remote: Allow protocol input & cancellation

This commit reworks the protocol being used on top of websockets. It now
permits sending input to the server, and interrupting the remote
process.
This commit is contained in:
Tom Wiesing 2023-02-28 21:34:58 +01:00
parent 746ebcd9e3
commit c19215068e
No known key found for this signature in database
12 changed files with 383 additions and 217 deletions

View file

@ -11,11 +11,11 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/wisski"
)
// non-instance specific actions
var actions = map[string]SocketAction{
"backup": {
HandleInteractive: func(ctx context.Context, socket *Sockets, out io.Writer, params ...string) error {
return socket.Dependencies.Exporter.MakeExport(
func (sockets *Sockets) Actions() ActionMap {
return map[string]Action{
// generic actions
"backup": sockets.Generic(0, func(ctx context.Context, sockets *Sockets, in io.Reader, out io.Writer, params ...string) error {
return sockets.Dependencies.Exporter.MakeExport(
ctx,
out,
exporter.ExportTask{
@ -25,12 +25,8 @@ var actions = map[string]SocketAction{
StagingOnly: false,
},
)
},
},
"provision": {
NumParams: 1,
HandleInteractive: func(ctx context.Context, sockets *Sockets, out io.Writer, params ...string) error {
}),
"provision": sockets.Generic(1, func(ctx context.Context, sockets *Sockets, in io.Reader, out io.Writer, params ...string) error {
// read the flags of the instance to be provisioned
var flags provision.ProvisionFlags
if err := json.Unmarshal([]byte(params[0]), &flags); err != nil {
@ -51,14 +47,11 @@ var actions = map[string]SocketAction{
fmt.Fprintf(out, "Password: %s\n", instance.DrupalPassword)
return nil
},
},
}
}),
// socket specific actions
var iActions = map[string]IAction{
"snapshot": {
HandleInteractive: func(ctx context.Context, socket *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
// instance-specific actions!
"snapshot": sockets.Instance(0, func(ctx context.Context, socket *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return socket.Dependencies.Exporter.MakeExport(
ctx,
out,
@ -69,44 +62,24 @@ var iActions = map[string]IAction{
StagingOnly: false,
},
)
},
},
"rebuild": {
HandleInteractive: func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
}),
"rebuild": sockets.Instance(0, func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return instance.Barrel().Build(ctx, out, true)
},
},
"update": {
HandleInteractive: func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
}),
"update": sockets.Instance(0, func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return instance.Drush().Update(ctx, out)
},
},
"cron": {
HandleInteractive: func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, str io.Writer, params ...string) error {
}),
"cron": sockets.Instance(0, func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, in io.Reader, str io.Writer, params ...string) error {
return instance.Drush().Cron(ctx, str)
},
},
"start": {
HandleInteractive: func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
}),
"start": sockets.Instance(0, func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return instance.Barrel().Stack().Up(ctx, out)
},
},
"stop": {
HandleInteractive: func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
}),
"stop": sockets.Instance(0, func(ctx context.Context, _ *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return instance.Barrel().Stack().Down(ctx, out)
},
},
"purge": {
HandleInteractive: func(ctx context.Context, sockets *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error {
}),
"purge": sockets.Instance(0, func(ctx context.Context, sockets *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error {
return sockets.Dependencies.Purger.Purge(ctx, out, instance.Slug)
},
},
}
var igActions = func() map[string]SocketAction {
generics := make(map[string]SocketAction, len(iActions))
for n, a := range iActions {
generics[n] = a.AsGenericAction()
}),
}
return generics
}()
}

View file

@ -0,0 +1,217 @@
package socket
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/pkglib/httpx"
)
// ActionMap handles a set of WebSocket actions
type ActionMap map[string]Action
var errReadParamsTimeout = errors.New("timeout reading the first message")
var errUnknownAction = errors.New("unknown action call")
var errIncorrectParams = errors.New("invalid number of parameters")
type errPanic struct{ value any }
func (err errPanic) Error() string {
return fmt.Sprintf("fatal error: %v", err.value)
}
// Handle handles a new incoming websocket connection.
//
// There are two kinds of messages:
//
// - text messages, which are used to send input and output.
// - binary messages, which are json-encoded and used for control flow.
//
// To call an action, a client should send a CallMessage struct.
// The server will then start handling input and output (via text messages).
// If the client sends a SignalMessage, the signal is propagnated to the underlying context.
// Finally it will send a ResultMessage once handling is complete.
//
// A corresponding client implementation of this can be found in ..../remote/proto.ts
func (am ActionMap) Handle(conn httpx.WebSocketConnection) (name string, err error) {
var wg sync.WaitGroup
// once we have finished executing send a binary message (indicating success) to the client.
defer func() {
// close the underlying connection, and then wait for everything to finish!
defer wg.Wait()
defer conn.Close()
// recover from any errors
if v := recover(); v != nil {
err = errPanic{value: v}
}
// generate a result message
var result ResultMessage
if err == nil {
result.Success = true
} else {
result.Success = false
result.Message = err.Error()
}
// encode the result message to json!
var message httpx.WebSocketMessage
message.Type = websocket.BinaryMessage
message.Bytes, err = json.Marshal(result)
// silently fail if the message fails to encode
// although this should not happen
if err != nil {
return
}
// and tell the client about it!
<-conn.Write(message)
}()
// create channels to receive text and bytes messages
textMessages := make(chan string, 10)
binaryMessages := make(chan []byte, 10)
// start reading text and binary messages
// and redirect everything to the right channels
wg.Add(1)
go func() {
defer wg.Done()
defer close(textMessages)
defer close(binaryMessages)
for {
select {
case msg := <-conn.Read():
if msg.Type == websocket.TextMessage {
textMessages <- string(msg.Bytes)
}
if msg.Type == websocket.BinaryMessage {
binaryMessages <- msg.Bytes
}
case <-conn.Context().Done():
return
}
}
}()
var call CallMessage
select {
case buffer := <-binaryMessages:
if err := json.Unmarshal(buffer, &call); err != nil {
return "", errUnknownAction
}
case <-time.After(1 * time.Second):
return "", errReadParamsTimeout
}
// check that the given action exists!
// and has the right number of parameters!
action, ok := am[call.Name]
if !ok || action.Handle == nil {
return call.Name, errUnknownAction
}
if action.NumParams != len(call.Params) {
return call.Name, errIncorrectParams
}
// create a context to be canceled once done
ctx, cancel := context.WithCancel(conn.Context())
defer cancel()
// handle any signal messages
wg.Add(1)
go func() {
defer wg.Done()
var signal SignalMessage
for binary := range binaryMessages {
signal.Signal = ""
// read the signal message
if err := json.Unmarshal(binary, &signal); err != nil {
continue
}
// if we got a cancel message, do the cancellation!
if signal.Signal == SignalCancel {
cancel()
}
}
}()
// create a pipe to handle the input
// and start handling it
var inputR, inputW = io.Pipe()
defer inputW.Close()
wg.Add(1)
go func() {
defer wg.Done()
for text := range textMessages {
inputW.Write([]byte(text))
}
}()
// create a linebuffer to write the output line by line
output := &status.LineBuffer{
Line: func(line string) {
<-conn.WriteText(line)
},
FlushLineOnClose: true,
}
defer output.Close()
// handle the actual
return call.Name, action.Handle(ctx, inputR, output, call.Params...)
}
// Action is something that can be handled via a WebSocket connection.
type Action struct {
// NumPara
NumParams int
// Handle handles this action.
//
// ctx is closed once the underlying connection is closed.
// out is an io.Writer that is automatically sent to the client.
// params holds exactly NumParams parameters.
Handle func(ctx context.Context, in io.Reader, out io.Writer, params ...string) error
}
// CallMessage is sent by the client to the server to invoke a remote procedure
type CallMessage struct {
Name string `json:"name"`
Params []string `json:"params,omitempty"`
}
// CancelMessage is sent from the client to the server to stop the current procedure
type SignalMessage struct {
Signal Signal `json:"signal"`
}
type Signal string
const (
SignalCancel = "cancel"
)
// ResultMessage is sent by the server to the client to report the success of a remote procedure
type ResultMessage struct {
Success bool `json:"success"`
Message string `json:"message,omitempty"`
}

View file

@ -2,10 +2,7 @@ package socket
import (
"context"
"encoding/json"
"errors"
"io"
"time"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/exporter"
@ -13,14 +10,16 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/instances/purger"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/provision"
"github.com/FAU-CDI/wisski-distillery/internal/wisski"
"github.com/gorilla/websocket"
"github.com/tkw1536/goprogram/status"
"github.com/rs/zerolog"
"github.com/tkw1536/pkglib/httpx"
"github.com/tkw1536/pkglib/lazy"
)
type Sockets struct {
component.Base
actions lazy.Lazy[ActionMap]
Dependencies struct {
Provision *provision.Provision
Instances *instances.Instances
@ -31,115 +30,33 @@ type Sockets struct {
// Serve handles a connection to the websocket api
func (socket *Sockets) Serve(conn httpx.WebSocketConnection) {
// read the next message to act on
message, ok := <-conn.Read()
if !ok {
return
}
name := string(message.Bytes)
// perform a generic action first
if action, ok := actions[name]; ok {
socket.Handle(conn, action)
return
}
// then do the socket actions
if action, ok := igActions[name]; ok {
socket.Handle(conn, action)
}
}
var instanceParamsTimeout = time.Second
type actionResult struct {
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
func (*Sockets) reportErrorToClient(conn httpx.WebSocketConnection, err error) {
// create an action result
var result actionResult
if err == nil {
result.Success = true
} else {
result.Success = false
result.Error = err.Error()
}
// marshal the result, ignoring any error silently
data, err := json.Marshal(result)
// handle the websocket connection!
name, err := socket.actions.Get(socket.Actions).Handle(conn)
if err != nil {
return
zerolog.Ctx(conn.Context()).Err(err).Str("name", name).Msg("Error handling websocket")
}
// and send it as a binary message to the client
<-conn.Write(httpx.WebSocketMessage{Type: websocket.BinaryMessage, Bytes: data})
}
var errInsufficientParams = errors.New("insufficient parameters")
var errParameterTimeout = errors.New("timed out reading parameters")
func (socket *Sockets) Handle(conn httpx.WebSocketConnection, action SocketAction) (err error) {
// report the error to the client
defer func() {
// NOTE: the closure is needed here!
socket.reportErrorToClient(conn, err)
}()
// read the parameters
params := make([]string, action.NumParams)
for i := range params {
select {
case message, ok := <-conn.Read():
if !ok {
return errInsufficientParams
}
params[i] = string(message.Bytes)
case <-time.After(instanceParamsTimeout):
return errParameterTimeout
}
}
// build a stream
writer := &status.LineBuffer{
Line: func(line string) {
<-conn.WriteText(line)
// Generic returns a new action that calls handler with the provided number of parameters
func (sockets *Sockets) Generic(numParams int, handler func(ctx context.Context, socket *Sockets, in io.Reader, out io.Writer, params ...string) error) Action {
return Action{
NumParams: numParams,
Handle: func(ctx context.Context, in io.Reader, out io.Writer, params ...string) error {
return handler(ctx, sockets, in, out, params...)
},
FlushLineOnClose: true,
}
defer writer.Close()
// handle the interactive action
return action.HandleInteractive(conn.Context(), socket, writer, params...)
}
// IAction is like SocketAction, but takes the slug of an instance (runnning or not) as the first parameter
type IAction struct {
NumParams int
HandleInteractive func(ctx context.Context, sockets *Sockets, instance *wisski.WissKI, out io.Writer, params ...string) error
}
// AsGenericAction turns this InstanceAction into a generic action
func (ia IAction) AsGenericAction() SocketAction {
return SocketAction{
NumParams: ia.NumParams + 1,
HandleInteractive: func(ctx context.Context, sockets *Sockets, out io.Writer, params ...string) error {
// Insstance returns a new action that calls handler with a specific WissKI instance
func (sockets *Sockets) Instance(numParams int, handler func(ctx context.Context, sockets *Sockets, instance *wisski.WissKI, in io.Reader, out io.Writer, params ...string) error) Action {
return Action{
NumParams: numParams + 1,
Handle: func(ctx context.Context, in io.Reader, out io.Writer, params ...string) error {
instance, err := sockets.Dependencies.Instances.WissKI(ctx, params[0])
if err != nil {
return err
}
return ia.HandleInteractive(ctx, sockets, instance, out, params[1:]...)
return handler(ctx, sockets, instance, in, out, params[1:]...)
},
}
}
// SocketAction represents an action handled via socket
type SocketAction struct {
NumParams int
HandleInteractive func(ctx context.Context, sockets *Sockets, out io.Writer, params ...string) error
}