websocket actions: Refactor registration

This commit is contained in:
Tom Wiesing 2023-11-09 14:58:19 +01:00
parent 08ab7b4383
commit 7b5f8a9882
No known key found for this signature in database
4 changed files with 102 additions and 87 deletions

View file

@ -1,246 +0,0 @@
package socket
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/auth"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/auth/scopes"
"github.com/gorilla/websocket"
"github.com/tkw1536/pkglib/httpx"
)
// ActionMap handles a set of WebSocket actions
type ActionMap map[string]Action
var errReadParamsTimeout = errors.New("timeout reading the first message")
var errUnknownAction = errors.New("unknown action call")
var errIncorrectParams = errors.New("invalid number of parameters")
type errPanic struct{ value any }
func (err errPanic) Error() string {
return fmt.Sprintf("fatal error: %v", err.value)
}
// Handle handles a new incoming websocket connection using the given authentication.
//
// There are two kinds of messages:
//
// - text messages, which are used to send input and output.
// - binary messages, which are json-encoded and used for control flow.
//
// To call an action, a client should send a CallMessage struct.
// The server will then start handling input and output (via text messages).
// If the client sends a SignalMessage, the signal is propagnated to the underlying context.
// Finally it will send a ResultMessage once handling is complete.
//
// A corresponding client implementation of this can be found in ..../remote/proto.ts
func (am ActionMap) Handle(auth *auth.Auth, conn httpx.WebSocketConnection) (name string, err error) {
var wg sync.WaitGroup
// once we have finished executing send a binary message (indicating success) to the client.
defer func() {
// close the underlying connection, and then wait for everything to finish!
defer wg.Wait()
defer conn.Close()
// recover from any errors
if v := recover(); v != nil {
err = errPanic{value: v}
}
// generate a result message
var result ResultMessage
if err == nil {
result.Success = true
} else {
result.Success = false
result.Message = err.Error()
if result.Message == "" {
result.Message = "unspecified error"
}
}
// encode the result message to json!
var message httpx.WebSocketMessage
message.Type = websocket.BinaryMessage
message.Bytes, err = json.Marshal(result)
// silently fail if the message fails to encode
// although this should not happen
if err != nil {
return
}
// and tell the client about it!
<-conn.Write(message)
}()
// create channels to receive text and bytes messages
textMessages := make(chan string, 10)
binaryMessages := make(chan []byte, 10)
// start reading text and binary messages
// and redirect everything to the right channels
wg.Add(1)
go func() {
defer wg.Done()
defer close(textMessages)
defer close(binaryMessages)
for {
select {
case msg := <-conn.Read():
if msg.Type == websocket.TextMessage {
textMessages <- string(msg.Bytes)
}
if msg.Type == websocket.BinaryMessage {
binaryMessages <- msg.Bytes
}
case <-conn.Context().Done():
return
}
}
}()
var call CallMessage
select {
case buffer := <-binaryMessages:
if err := json.Unmarshal(buffer, &call); err != nil {
return "", errUnknownAction
}
case <-time.After(1 * time.Second):
return "", errReadParamsTimeout
}
// check that the given action exists!
// and has the right number of parameters!
action, ok := am[call.Call]
if !ok || action.Handle == nil {
return call.Call, errUnknownAction
}
if action.NumParams != len(call.Params) {
return call.Call, errIncorrectParams
}
// check that we have the given permission
if err := auth.CheckScope(action.ScopeParam, action.scope(), conn.Request()); err != nil {
return call.Call, err
}
// create a context to be canceled once done
ctx, cancel := context.WithCancel(conn.Context())
defer cancel()
// handle any signal messages
wg.Add(1)
go func() {
defer wg.Done()
var signal SignalMessage
for binary := range binaryMessages {
signal.Signal = ""
// read the signal message
if err := json.Unmarshal(binary, &signal); err != nil {
continue
}
// if we got a cancel message, do the cancellation!
if signal.Signal == SignalCancel {
cancel()
}
}
}()
// create a pipe to handle the input
// and start handling it
var inputR, inputW = io.Pipe()
defer inputW.Close()
wg.Add(1)
go func() {
defer wg.Done()
for text := range textMessages {
inputW.Write([]byte(text))
}
}()
// write the output to the client as it comes in!
// NOTE(twiesing): We may eventually need buffering here ...
output := WriteFunc(func(b []byte) (int, error) {
<-conn.WriteText(string(b))
return len(b), nil
})
// handle the actual
return call.Call, action.Handle(ctx, inputR, output, call.Params...)
}
// WriteFunc implements io.Writer using a function.
type WriteFunc func([]byte) (int, error)
func (wf WriteFunc) Write(b []byte) (int, error) {
return wf(b)
}
// Action is something that can be handled via a WebSocket connection.
type Action struct {
// NumPara
NumParams int
// Scope and ScopeParam indicate the scope required by the caller.
// TODO(twiesing): Once we actually include scopes, make them dynamic
Scope component.Scope
ScopeParam string
// Handle handles this action.
//
// ctx is closed once the underlying connection is closed.
// out is an io.Writer that is automatically sent to the client.
// params holds exactly NumParams parameters.
Handle func(ctx context.Context, in io.Reader, out io.Writer, params ...string) error
}
// scope returns the actual scope required by this action.
// If the caller did not provide an actual scope, uses ScopeNever
func (action Action) scope() component.Scope {
if action.Scope == "" {
return scopes.ScopeNever
}
return action.Scope
}
// CallMessage is sent by the client to the server to invoke a remote procedure
type CallMessage struct {
Call string `json:"call"`
Params []string `json:"params,omitempty"`
}
// CancelMessage is sent from the client to the server to stop the current procedure
type SignalMessage struct {
Signal Signal `json:"signal"`
}
type Signal string
const (
SignalCancel = "cancel"
)
// ResultMessage is sent by the server to the client to report the success of a remote procedure
type ResultMessage struct {
Success bool `json:"success"`
Message string `json:"message,omitempty"`
}