diff --git a/cmd/backup.go b/cmd/backup.go index bfbec11..59026c2 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -1,18 +1,11 @@ package cmd import ( - "io" - "path/filepath" - wisski_distillery "github.com/FAU-CDI/wisski-distillery" "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" "github.com/FAU-CDI/wisski-distillery/internal/core" - "github.com/FAU-CDI/wisski-distillery/internal/models" - "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/logging" - "github.com/FAU-CDI/wisski-distillery/pkg/targz" "github.com/tkw1536/goprogram/exit" - "github.com/tkw1536/goprogram/status" ) // Backup is the 'backup' command @@ -53,13 +46,13 @@ func (bk backupC) Run(context wisski_distillery.Context) error { } // do the handling - err := handleSnapshotLike(context, SnapshotFlags{ + err := dis.SnapshotManager().HandleSnapshotLike(context.IOStream, snapshots.SnapshotFlags{ Dest: bk.Positionals.Dest, Slug: "", Title: "Backup", StagingOnly: bk.StagingOnly, - Do: func(dest string) SnapshotLike { + Do: func(dest string) snapshots.SnapshotLike { backup := dis.SnapshotManager().NewBackup(context.IOStream, snapshots.BackupDescription{ Dest: dest, ConcurrentSnapshots: bk.ConcurrentSnapshots, @@ -73,126 +66,3 @@ func (bk backupC) Run(context wisski_distillery.Context) error { } return nil } - -type SnapshotFlags struct { - Dest string - Slug string - Title string // "Backup" or "Snapshot" - StagingOnly bool - - Do func(dest string) SnapshotLike -} - -type SnapshotLike interface { - LogEntry() models.Snapshot - Report(w io.Writer) (int, error) -} - -func handleSnapshotLike(context wisski_distillery.Context, flags SnapshotFlags) (err error) { - dis := context.Environment - - // determine target paths - logging.LogMessage(context.IOStream, "Determining target paths") - var stagingDir, archivePath string - if flags.StagingOnly { - stagingDir = flags.Dest - } else { - archivePath = flags.Dest - } - if stagingDir == "" { - stagingDir, err = dis.SnapshotManager().NewStagingDir(flags.Slug) - if err != nil { - return err - } - } - if !flags.StagingOnly && archivePath == "" { - archivePath = dis.SnapshotManager().NewArchivePath(flags.Slug) - } - context.Printf("Staging Directory: %s\n", stagingDir) - context.Printf("Archive Path: %s\n", archivePath) - - // create the staging directory - logging.LogMessage(context.IOStream, "Creating staging directory") - err = dis.Core.Environment.Mkdir(stagingDir, environment.DefaultDirPerm) - if !environment.IsExist(err) && err != nil { - return err - } - - // if it was requested to not do staging only - // we need the staging directory to be deleted at the end - if !flags.StagingOnly { - defer func() { - logging.LogMessage(context.IOStream, "Removing staging directory") - dis.Environment.RemoveAll(stagingDir) - }() - } - - // create the actual snapshot or backup - // write out the report - // and retain a log entry - var entry models.Snapshot - logging.LogOperation(func() error { - // do the snapshot! - sl := flags.Do(stagingDir) - - // create a log entry - entry = sl.LogEntry() - - // find the report path - reportPath := filepath.Join(stagingDir, "report.txt") - context.Println(reportPath) - - // create the path - report, err := dis.Environment.Create(reportPath, environment.DefaultFilePerm) - if err != nil { - return err - } - - // and write out the report - { - _, err := sl.Report(report) - return err - } - }, context.IOStream, "Generating %s", flags.Title) - - // if we only requested staging - // all that is left is to write the log entry - if flags.StagingOnly { - logging.LogMessage(context.IOStream, "Writing Log Entry") - - // write out the log entry - entry.Path = stagingDir - entry.Packed = false - dis.Instances().AddSnapshotLog(entry) - - context.Printf("Wrote %s\n", stagingDir) - return nil - } - - // package everything up as an archive! - if err := logging.LogOperation(func() error { - var count int64 - defer func() { context.Printf("Wrote %d byte(s) to %s\n", count, archivePath) }() - - st := status.NewWithCompat(context.Stdout, 1) - st.Start() - defer st.Stop() - - count, err = targz.Package(dis.Core.Environment, archivePath, stagingDir, func(dst, src string) { - st.Set(0, dst) - }) - - return err - }, context.IOStream, "Writing archive"); err != nil { - return err - } - - // write out the log entry - logging.LogMessage(context.IOStream, "Writing Log Entry") - entry.Path = archivePath - entry.Packed = true - dis.Instances().AddSnapshotLog(entry) - - // and we're done! - return nil -} diff --git a/cmd/snapshot.go b/cmd/snapshot.go index 7dcd692..bde0a5a 100644 --- a/cmd/snapshot.go +++ b/cmd/snapshot.go @@ -45,13 +45,13 @@ func (bi snapshot) Run(context wisski_distillery.Context) error { } // do a snapshot of it! - err = handleSnapshotLike(context, SnapshotFlags{ + err = dis.SnapshotManager().HandleSnapshotLike(context.IOStream, snapshots.SnapshotFlags{ Dest: bi.Positionals.Dest, Slug: bi.Positionals.Slug, Title: "Snapshot", StagingOnly: bi.StagingOnly, - Do: func(dest string) SnapshotLike { + Do: func(dest string) snapshots.SnapshotLike { snapshot := dis.SnapshotManager().NewSnapshot(instance, context.IOStream, snapshots.SnapshotDescription{ Dest: dest, }) diff --git a/go.mod b/go.mod index b89a956..b89da52 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,9 @@ require ( github.com/alessio/shellescape v1.4.1 github.com/feiin/sqlstring v0.3.0 github.com/go-sql-driver/mysql v1.6.0 + github.com/gorilla/websocket v1.5.0 github.com/pkg/errors v0.9.1 - github.com/tkw1536/goprogram v0.1.0 + github.com/tkw1536/goprogram v0.1.1 golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 gorm.io/driver/mysql v1.3.6 diff --git a/go.sum b/go.sum index 05f70ca..6578251 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/feiin/sqlstring v0.3.0 h1:iyPEFijI2BxpY2M+AuhIvdNManzXa2OwGzuPaEMLUgo github.com/feiin/sqlstring v0.3.0/go.mod h1:xpZTjVUw1nD3hMgF9SMRdPiooKSikLf4PS5j2NTn3RI= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gosuri/uilive v0.0.4 h1:hUEBpQDj8D8jXgtCdBu7sWsy5sbW/5GhuO8KBwJ2jyY= github.com/gosuri/uilive v0.0.4/go.mod h1:V/epo5LjjlDE5RJUcqx8dbw+zc93y5Ya3yg8tfZ74VI= github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= @@ -25,6 +27,8 @@ github.com/tkw1536/goprogram v0.0.17 h1:SAD/rHtxm7CTEdUV1a37LUyE6G0MdcKhLzO8fZ8c github.com/tkw1536/goprogram v0.0.17/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE= github.com/tkw1536/goprogram v0.1.0 h1:cP+Z7VKgRF93JApsau1azKxGrH/nTwcGLnSYfZP9XjE= github.com/tkw1536/goprogram v0.1.0/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE= +github.com/tkw1536/goprogram v0.1.1 h1:gamK9OuRqoX2yQlA/nkgfVHHZWd/u2uUj6vJMYrYa70= +github.com/tkw1536/goprogram v0.1.1/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE= golang.org/x/exp v0.0.0-20220929160808-de9c53c655b9 h1:lNtcVz/3bOstm7Vebox+5m3nLh/BYWnhmc3AhXOW6oI= golang.org/x/exp v0.0.0-20220929160808-de9c53c655b9/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 h1:fGZugkZk2UgYBxtpKmvub51Yno1LJDeEsRp2xGD+0gY= diff --git a/internal/component/control/control/docker-compose.yml b/internal/component/control/control/docker-compose.yml index 1fbd8bd..c95fe95 100644 --- a/internal/component/control/control/docker-compose.yml +++ b/internal/component/control/control/docker-compose.yml @@ -19,7 +19,7 @@ services: # TODO: Mount docker socket properly! - "/var/run/docker.sock:/var/run/docker.sock" - "${CONFIG_PATH}:${CONFIG_PATH}:ro" - - "${DEPLOY_ROOT}:${DEPLOY_ROOT}:ro" + - "${DEPLOY_ROOT}:${DEPLOY_ROOT}:rw" - "${GLOBAL_AUTHORIZED_KEYS_FILE}:${GLOBAL_AUTHORIZED_KEYS_FILE}:ro" - "${SELF_OVERRIDES_FILE}:${SELF_OVERRIDES_FILE}:ro" - "${SELF_RESOLVER_BLOCK_FILE}:${SELF_RESOLVER_BLOCK_FILE}:ro" diff --git a/internal/component/control/extras_info.go b/internal/component/control/extras_info.go deleted file mode 100644 index 0203ffd..0000000 --- a/internal/component/control/extras_info.go +++ /dev/null @@ -1,207 +0,0 @@ -package control - -import ( - "context" - "html/template" - "net/http" - "strings" - "time" - - _ "embed" - - "github.com/FAU-CDI/wisski-distillery/internal/component" - "github.com/FAU-CDI/wisski-distillery/internal/component/instances" - "github.com/FAU-CDI/wisski-distillery/internal/config" - "github.com/FAU-CDI/wisski-distillery/internal/models" - "github.com/FAU-CDI/wisski-distillery/pkg/httpx" - "github.com/tkw1536/goprogram/stream" - "golang.org/x/sync/errgroup" -) - -type Info struct { - component.ComponentBase - - Instances *instances.Instances -} - -func (Info) Name() string { return "control-info" } - -func (*Info) Routes() []string { return []string{"/dis/"} } - -func (info *Info) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) { - mux := http.NewServeMux() - - // handle everything under /dis/! - mux.HandleFunc("/dis/", func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/dis/" { - http.Redirect(w, r, "/dis/index", http.StatusTemporaryRedirect) - return - } - http.NotFound(w, r) - }) - - // render everything - mux.Handle("/dis/index", httpx.HTMLHandler[disIndex]{ - Handler: info.disIndex, - Template: indexTemplate, - }) - - mux.Handle("/dis/instance/", httpx.HTMLHandler[disInstance]{ - Handler: info.disInstance, - Template: instanceTemplate, - }) - - // api -- for future usage - mux.Handle("/dis/api/v1/instance/get/", httpx.JSON(info.getinstance)) - mux.Handle("/dis/api/v1/instance/all", httpx.JSON(info.allinstances)) - - // ensure that everyone is logged in! - return httpx.BasicAuth(mux, "WissKI Distillery Admin", func(user, pass string) bool { - return user == info.Config.DisAdminUser && pass == info.Config.DisAdminPassword - }), nil -} - -// disIndex is the context of the "/dis/index" page -type disIndex struct { - Time time.Time - - Config *config.Config - - Instances []instances.WissKIInfo - - TotalCount int - RunningCount int - StoppedCount int - - Backups []models.Snapshot -} - -func (info *Info) disIndex(r *http.Request) (idx disIndex, err error) { - var group errgroup.Group - - group.Go(func() error { - // load instances - idx.Instances, err = info.allinstances(r) - if err != nil { - return err - } - - // count how many are running and how many are stopped - for _, i := range idx.Instances { - if i.Running { - idx.RunningCount++ - } else { - idx.StoppedCount++ - } - } - idx.TotalCount = len(idx.Instances) - - return nil - }) - - // get the log entries - group.Go(func() (err error) { - idx.Backups, err = info.Instances.SnapshotLogFor("") - return - }) - - // get the static properties - idx.Config = info.Config - - // current time - idx.Time = time.Now().UTC() - - // wait for everything! - group.Wait() - - return -} - -// disInstance is the context of the "/dis/instance/*" page -type disInstance struct { - Time time.Time - - Instance models.Instance - Info instances.WissKIInfo -} - -func (info *Info) disInstance(r *http.Request) (is disInstance, err error) { - // find the slug as the last component of path! - slug := strings.TrimSuffix(r.URL.Path, "/") - slug = slug[strings.LastIndex(slug, "/")+1:] - - // find the instance itself! - instance, err := info.Instances.WissKI(slug) - if err == instances.ErrWissKINotFound { - return is, httpx.ErrNotFound - } - if err != nil { - return is, err - } - is.Instance = instance.Instance - - // get some more info about the wisski - is.Info, err = instance.Info(false) - if err != nil { - return is, err - } - - // current time - is.Time = time.Now().UTC() - - return -} - -//go:embed "html/index.html" -var indexTemplateStr string -var indexTemplate = template.Must(template.New("index.html").Parse(indexTemplateStr)) - -//go:embed "html/instance.html" -var instanceTemplateString string -var instanceTemplate = template.Must(template.New("instance.html").Parse(instanceTemplateString)) - -func (info *Info) getinstance(r *http.Request) (iinfo instances.WissKIInfo, err error) { - // find the slug as the last component of path! - slug := strings.TrimSuffix(r.URL.Path, "/") - slug = slug[strings.LastIndex(slug, "/")+1:] - - // load the wisski instance! - wisski, err := info.Instances.WissKI(strings.TrimSuffix(slug, "/")) - if err == instances.ErrWissKINotFound { - return iinfo, httpx.ErrNotFound - } - if err != nil { - return iinfo, err - } - - // get info about it! - return wisski.Info(false) -} - -func (info *Info) allinstances(*http.Request) (infos []instances.WissKIInfo, err error) { - var errgroup errgroup.Group - - // list all the instances - all, err := info.Instances.All() - if err != nil { - return nil, err - } - - // get all of their info! - infos = make([]instances.WissKIInfo, len(all)) - for i, instance := range all { - { - i := i - instance := instance - - errgroup.Go(func() (err error) { - infos[i], err = instance.Info(true) - return err - }) - } - } - - // wait for the results, and return - err = errgroup.Wait() - return -} diff --git a/internal/component/home/public.go b/internal/component/home/public.go index 7bd3643..a356ea4 100644 --- a/internal/component/home/public.go +++ b/internal/component/home/public.go @@ -16,7 +16,7 @@ import ( func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) { timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) { - io.Printf("[%s]: reloading instance list", t.String()) + io.Printf("[%s]: reloading instance list\n", t.Format(time.Stamp)) names, _ := home.instanceMap() home.instanceNames.Set(names) @@ -38,7 +38,7 @@ func (home *Home) instanceMap() (map[string]struct{}, error) { func (home *Home) updateRender(ctx context.Context, io stream.IOStream) { timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) { - io.Printf("[%s]: reloading home render", t.String()) + io.Printf("[%s]: reloading home render\n", t.Format(time.Stamp)) bytes, _ := home.homeRender() home.homeBytes.Set(bytes) diff --git a/internal/component/home/redirect.go b/internal/component/home/redirect.go index 9c652b2..36882d6 100644 --- a/internal/component/home/redirect.go +++ b/internal/component/home/redirect.go @@ -13,7 +13,7 @@ import ( func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) { timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) { - io.Printf("[%s]: reloading overrides", t.String()) + io.Printf("[%s]: reloading overrides\n", t.Format(time.Stamp)) redirect, _ := home.loadRedirect() home.redirect.Set(&redirect) }) @@ -95,6 +95,10 @@ func (redirect Redirect) Redirect(r *http.Request) string { func (redirect Redirect) ServeHTTP(w http.ResponseWriter, r *http.Request) { dest := redirect.Redirect(r) if dest == "" { + if redirect.Fallback == nil { + http.NotFound(w, r) + return + } redirect.Fallback.ServeHTTP(w, r) return } diff --git a/internal/component/control/html/index.html b/internal/component/info/html/index.html similarity index 100% rename from internal/component/control/html/index.html rename to internal/component/info/html/index.html diff --git a/internal/component/control/html/instance.html b/internal/component/info/html/instance.html similarity index 91% rename from internal/component/control/html/instance.html rename to internal/component/info/html/instance.html index 3e73289..8e5952a 100644 --- a/internal/component/control/html/instance.html +++ b/internal/component/info/html/instance.html @@ -65,6 +65,11 @@

+

+ +


+

+ diff --git a/internal/component/info/index.go b/internal/component/info/index.go new file mode 100644 index 0000000..14f94ab --- /dev/null +++ b/internal/component/info/index.go @@ -0,0 +1,85 @@ +package info + +import ( + "html/template" + "net/http" + "time" + + _ "embed" + + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/config" + "github.com/FAU-CDI/wisski-distillery/internal/models" + "golang.org/x/sync/errgroup" +) + +//go:embed "html/index.html" +var indexTemplateStr string +var indexTemplate = template.Must(template.New("index.html").Parse(indexTemplateStr)) + +type indexPageContext struct { + Time time.Time + + Config *config.Config + + Instances []instances.WissKIInfo + + TotalCount int + RunningCount int + StoppedCount int + + Backups []models.Snapshot +} + +func (info *Info) indexPageAPI(r *http.Request) (idx indexPageContext, err error) { + var group errgroup.Group + + group.Go(func() error { + // list all the instances + all, err := info.Instances.All() + if err != nil { + return err + } + + // get all of their info! + idx.Instances = make([]instances.WissKIInfo, len(all)) + for i, instance := range all { + { + i := i + instance := instance + + // store the info for this group! + group.Go(func() (err error) { + idx.Instances[i], err = instance.Info(true) + return err + }) + } + } + + return nil + }) + + // get the log entries + group.Go(func() (err error) { + idx.Backups, err = info.Instances.SnapshotLogFor("") + return + }) + + // get the static properties + idx.Config = info.Config + idx.Time = time.Now().UTC() + + group.Wait() + + // count how many are running and how many are stopped + for _, i := range idx.Instances { + if i.Running { + idx.RunningCount++ + } else { + idx.StoppedCount++ + } + } + idx.TotalCount = len(idx.Instances) + + return +} diff --git a/internal/component/info/info.go b/internal/component/info/info.go new file mode 100644 index 0000000..515321c --- /dev/null +++ b/internal/component/info/info.go @@ -0,0 +1,59 @@ +package info + +import ( + "context" + "net/http" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" + "github.com/FAU-CDI/wisski-distillery/pkg/httpx" + "github.com/tkw1536/goprogram/stream" +) + +type Info struct { + component.ComponentBase + + SnapshotManager *snapshots.Manager + Instances *instances.Instances +} + +func (Info) Name() string { return "control-info" } + +func (*Info) Routes() []string { return []string{"/dis/"} } + +func (info *Info) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) { + mux := http.NewServeMux() + + // handle everything + mux.HandleFunc(route, func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == route { + http.Redirect(w, r, route+"/index", http.StatusTemporaryRedirect) + return + } + http.NotFound(w, r) + }) + + // add a handler for the index page + mux.Handle(route+"index", httpx.HTMLHandler[indexPageContext]{ + Handler: info.indexPageAPI, + Template: indexTemplate, + }) + + // add a handler for the instance page + mux.Handle(route+"instance/", httpx.HTMLHandler[instancePageContext]{ + Handler: info.instancePageAPI, + Template: instanceTemplate, + }) + + handler := &httpx.WebSocket{ + Context: context, + Fallback: mux, + Handler: info.serveSocket, + } + + // ensure that everyone is logged in! + return httpx.BasicAuth(handler, "WissKI Distillery Admin", func(user, pass string) bool { + return user == info.Config.DisAdminUser && pass == info.Config.DisAdminPassword + }), nil +} diff --git a/internal/component/info/instance.go b/internal/component/info/instance.go new file mode 100644 index 0000000..cd880c1 --- /dev/null +++ b/internal/component/info/instance.go @@ -0,0 +1,51 @@ +package info + +import ( + _ "embed" + "html/template" + "net/http" + "strings" + "time" + + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/models" + "github.com/FAU-CDI/wisski-distillery/pkg/httpx" +) + +//go:embed "html/instance.html" +var instanceTemplateString string +var instanceTemplate = template.Must(template.New("instance.html").Parse(instanceTemplateString)) + +type instancePageContext struct { + Time time.Time + + Instance models.Instance + Info instances.WissKIInfo +} + +func (info *Info) instancePageAPI(r *http.Request) (is instancePageContext, err error) { + // find the slug as the last component of path! + slug := strings.TrimSuffix(r.URL.Path, "/") + slug = slug[strings.LastIndex(slug, "/")+1:] + + // find the instance itself! + instance, err := info.Instances.WissKI(slug) + if err == instances.ErrWissKINotFound { + return is, httpx.ErrNotFound + } + if err != nil { + return is, err + } + is.Instance = instance.Instance + + // get some more info about the wisski + is.Info, err = instance.Info(false) + if err != nil { + return is, err + } + + // current time + is.Time = time.Now().UTC() + + return +} diff --git a/internal/component/info/socket.go b/internal/component/info/socket.go new file mode 100644 index 0000000..a23ebce --- /dev/null +++ b/internal/component/info/socket.go @@ -0,0 +1,73 @@ +package info + +import ( + "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" + "github.com/FAU-CDI/wisski-distillery/pkg/httpx" + "github.com/tkw1536/goprogram/status" + "github.com/tkw1536/goprogram/stream" +) + +func (info *Info) serveSocket(conn httpx.WebSocketConnection) { + // read the next message to act on + message, ok := <-conn.Read() + if !ok { + return + } + + switch string(message.Bytes) { + case "snapshot": + slug, ok := <-conn.Read() + if !ok { + return + } + info.serverSocketSnapshot(string(slug.Bytes), info.socketWriter(conn)) + } +} + +func (*Info) socketWriter(conn httpx.WebSocketConnection) *status.LineBuffer { + return &status.LineBuffer{ + Line: func(line string) { + <-conn.WriteText(line) + }, + FlushLineOnClose: true, + } +} + +func (info *Info) serverSocketSnapshot(slug string, writer *status.LineBuffer) { + stream := stream.NewIOStream(writer, writer, nil, 0) + + // get the wisski + wissKI, err := info.Instances.WissKI(slug) + if err != nil { + stream.EPrintln(err) + return + } + + { + err := info.SnapshotManager.HandleSnapshotLike( + stream, + snapshots.SnapshotFlags{ + Dest: "", + Slug: slug, + Title: "Snapshot", + StagingOnly: false, + Do: func(dest string) snapshots.SnapshotLike { + snapshot := info.SnapshotManager.NewSnapshot( + wissKI, + stream, + snapshots.SnapshotDescription{ + Dest: dest, + }, + ) + return &snapshot + }, + }, + ) + if err != nil { + stream.EPrintln(err) + return + } + } + stream.Println("Done") + +} diff --git a/internal/component/resolver/prefixes.go b/internal/component/resolver/prefixes.go index 689ea53..89b2e44 100644 --- a/internal/component/resolver/prefixes.go +++ b/internal/component/resolver/prefixes.go @@ -11,7 +11,7 @@ import ( // updatePrefixes starts updating prefixes func (resolver *Resolver) updatePrefixes(io stream.IOStream, ctx context.Context) { timex.SetInterval(ctx, resolver.RefreshInterval, func(t time.Time) { - io.Printf("[%s]: reloading prefixes", t.String()) + io.Printf("[%s]: reloading prefixes\n", t.Format(time.Stamp)) prefixes, _ := resolver.AllPrefixes() resolver.prefixes.Set(prefixes) }) diff --git a/internal/component/snapshots/iface.go b/internal/component/snapshots/iface.go new file mode 100644 index 0000000..cb5f265 --- /dev/null +++ b/internal/component/snapshots/iface.go @@ -0,0 +1,134 @@ +package snapshots + +import ( + "io" + "path/filepath" + + "github.com/FAU-CDI/wisski-distillery/internal/models" + "github.com/FAU-CDI/wisski-distillery/pkg/environment" + "github.com/FAU-CDI/wisski-distillery/pkg/logging" + "github.com/FAU-CDI/wisski-distillery/pkg/targz" + "github.com/tkw1536/goprogram/status" + "github.com/tkw1536/goprogram/stream" +) + +type SnapshotFlags struct { + Dest string + Slug string + Title string // "Backup" or "Snapshot" + StagingOnly bool + + Do func(dest string) SnapshotLike +} + +type SnapshotLike interface { + LogEntry() models.Snapshot + Report(w io.Writer) (int, error) +} + +func (manager *Manager) HandleSnapshotLike(context stream.IOStream, flags SnapshotFlags) (err error) { + // determine target paths + logging.LogMessage(context, "Determining target paths") + var stagingDir, archivePath string + if flags.StagingOnly { + stagingDir = flags.Dest + } else { + archivePath = flags.Dest + } + if stagingDir == "" { + stagingDir, err = manager.NewStagingDir(flags.Slug) + if err != nil { + return err + } + } + if !flags.StagingOnly && archivePath == "" { + archivePath = manager.NewArchivePath(flags.Slug) + } + context.Printf("Staging Directory: %s\n", stagingDir) + context.Printf("Archive Path: %s\n", archivePath) + + // create the staging directory + logging.LogMessage(context, "Creating staging directory") + err = manager.Environment.Mkdir(stagingDir, environment.DefaultDirPerm) + if !environment.IsExist(err) && err != nil { + return err + } + + // if it was requested to not do staging only + // we need the staging directory to be deleted at the end + if !flags.StagingOnly { + defer func() { + logging.LogMessage(context, "Removing staging directory") + manager.Environment.RemoveAll(stagingDir) + }() + } + + // create the actual snapshot or backup + // write out the report + // and retain a log entry + var entry models.Snapshot + logging.LogOperation(func() error { + // do the snapshot! + sl := flags.Do(stagingDir) + + // create a log entry + entry = sl.LogEntry() + + // find the report path + reportPath := filepath.Join(stagingDir, "report.txt") + context.Println(reportPath) + + // create the path + report, err := manager.Environment.Create(reportPath, environment.DefaultFilePerm) + if err != nil { + return err + } + + // and write out the report + { + _, err := sl.Report(report) + return err + } + }, context, "Generating %s", flags.Title) + + // if we only requested staging + // all that is left is to write the log entry + if flags.StagingOnly { + logging.LogMessage(context, "Writing Log Entry") + + // write out the log entry + entry.Path = stagingDir + entry.Packed = false + manager.Instances.AddSnapshotLog(entry) + + context.Printf("Wrote %s\n", stagingDir) + return nil + } + + // package everything up as an archive! + if err := logging.LogOperation(func() error { + var count int64 + defer func() { context.Printf("Wrote %d byte(s) to %s\n", count, archivePath) }() + + st := status.NewWithCompat(context.Stdout, 1) + st.Start() + defer st.Stop() + + count, err = targz.Package(manager.Environment, archivePath, stagingDir, func(dst, src string) { + st.Set(0, dst) + }) + + return err + }, context, "Writing archive"); err != nil { + return err + } + + // write out the log entry + logging.LogMessage(context, "Writing Log Entry") + entry.Path = archivePath + entry.Packed = true + manager.Instances.AddSnapshotLog(entry) + + // and we're done! + return nil +} diff --git a/internal/component/snapshots/snapshot.go b/internal/component/snapshots/snapshot.go index 0e4027f..c83f92c 100644 --- a/internal/component/snapshots/snapshot.go +++ b/internal/component/snapshots/snapshot.go @@ -19,7 +19,6 @@ import ( // SnapshotDescription is a description for a snapshot type SnapshotDescription struct { Dest string // destination path - Log bool // should we log the creation of this snapshot? Keepalive bool // should we keep the instance alive while making the snapshot? } diff --git a/internal/component/static/dist/control/index.css b/internal/component/static/dist/control/index.css index ebf019c..4e11381 100644 --- a/internal/component/static/dist/control/index.css +++ b/internal/component/static/dist/control/index.css @@ -1 +1 @@ -html{color:#1a1a1a;background-color:#fdfdfd;font-family:Roboto;font-size:20px;line-height:1.5}body{max-width:36em;-webkit-hyphens:auto;hyphens:auto;overflow-wrap:break-word;text-rendering:optimizelegibility;font-kerning:normal;margin:0 auto;padding:50px}@media (max-width:600px){body{padding:1em;font-size:.9em}}h1{margin-top:1.4em}h2,h3{margin-top:1em}code{color:#00f;font-family:Roboto Mono}p{text-align:justify;margin:1em 0}a,a:visited{color:#1a1a1a}footer{text-align:center;border-top:1px solid #1a1a1a;font-size:small}.header-link{opacity:0;font-size:.8em;text-decoration:none;transition:opacity .2s ease-in-out .1s;position:relative;left:.5em}h2:hover .header-link,h3:hover .header-link,h4:hover .header-link,h5:hover .header-link,h6:hover .header-link{opacity:1}.wisski{padding-left:5px}.wisski.running{background-color:#9ada07}.wisski.stopped{background-color:#ff7a7a} \ No newline at end of file +html{color:#1a1a1a;background-color:#fdfdfd;font-family:Roboto;font-size:20px;line-height:1.5}body{max-width:36em;-webkit-hyphens:auto;hyphens:auto;overflow-wrap:break-word;text-rendering:optimizelegibility;font-kerning:normal;margin:0 auto;padding:50px}@media (max-width:600px){body{padding:1em;font-size:.9em}}h1{margin-top:1.4em}h2,h3{margin-top:1em}code{color:#00f;font-family:Roboto Mono}p{text-align:justify;margin:1em 0}a,a:visited{color:#1a1a1a}footer{text-align:center;border-top:1px solid #1a1a1a;font-size:small}.header-link{opacity:0;font-size:.8em;text-decoration:none;transition:opacity .2s ease-in-out .1s;position:relative;left:.5em}h2:hover .header-link,h3:hover .header-link,h4:hover .header-link,h5:hover .header-link,h6:hover .header-link{opacity:1}.wisski{padding-left:5px}.wisski.running{background-color:#9ada07}.wisski.stopped{background-color:#ff7a7a}.remote-action-out{font-size:small} \ No newline at end of file diff --git a/internal/component/static/dist/control/index.js b/internal/component/static/dist/control/index.js index d6b9f86..7f0a620 100644 --- a/internal/component/static/dist/control/index.js +++ b/internal/component/static/dist/control/index.js @@ -1 +1 @@ -(()=>{const e=e=>{const t=document.getElementsByTagName("h"+e);Array.from(t).forEach((e=>{void 0!==e.id&&""!==e.id&&e.appendChild((e=>{const t=document.createElement("a");return t.className="header-link",t.href="#"+e,t.innerHTML="#",t})(e.id))}))};new Array(6).fill(0).forEach(((t,n)=>e(n+1)));const t={date:e=>new Date(e.innerText).toISOString(),path:e=>{const t=e.innerText.split("/");return t[t.length-1]},pathbuilders:()=>{const e=window.pathbuilders,t=document.createElement("span");let o=!1;if(Object.keys(e).forEach((r=>{o=!0;const a=r+".xml",c=e[r];t.append(n(a,r,c,"application/xml")),t.append(document.createTextNode(" "))})),!o)return"(none)";const r=document.createElement("small");return r.append(document.createTextNode("(click to download)")),t.append(r),t}},n=(e,t,n,o)=>{const r=new Blob([n],{type:o??"text/plain"}),a=document.createElement("a");return a.target="_blank",a.download=e,a.href=URL.createObjectURL(r),a.append(document.createTextNode(t)),a};Object.keys(t).forEach((e=>{const n=t[e];document.querySelectorAll("code."+e).forEach((e=>{const t=n(e);if("string"==typeof t)return e.innerHTML="",void e.appendChild(document.createTextNode(t));e.parentNode.replaceChild(t,e)}))}))})(); \ No newline at end of file +(()=>{const e=e=>{const t=document.getElementsByTagName("h"+e);Array.from(t).forEach((e=>{void 0!==e.id&&""!==e.id&&e.appendChild((e=>{const t=document.createElement("a");return t.className="header-link",t.href="#"+e,t.innerHTML="#",t})(e.id))}))};new Array(6).fill(0).forEach(((t,n)=>e(n+1)));const t={date:e=>new Date(e.innerText).toISOString(),path:e=>{const t=e.innerText.split("/");return t[t.length-1]},pathbuilders:()=>{const e=window.pathbuilders,t=document.createElement("span");let o=!1;if(Object.keys(e).forEach((r=>{o=!0;const a=r+".xml",c=e[r];t.append(n(a,r,c,"application/xml")),t.append(document.createTextNode(" "))})),!o)return"(none)";const r=document.createElement("small");return r.append(document.createTextNode("(click to download)")),t.append(r),t}},n=(e,t,n,o)=>{const r=new Blob([n],{type:o??"text/plain"}),a=document.createElement("a");return a.target="_blank",a.download=e,a.href=URL.createObjectURL(r),a.append(document.createTextNode(t)),a};Object.keys(t).forEach((e=>{const n=t[e];document.querySelectorAll("code."+e).forEach((e=>{const t=n(e);if("string"==typeof t)return e.innerHTML="",void e.appendChild(document.createTextNode(t));e.parentNode.replaceChild(t,e)}))}));const o=document.getElementsByClassName("remote-action");Array.from(o).forEach((e=>{const t=e.getAttribute("data-action"),n=e.getAttribute("data-param"),o=document.querySelector(e.getAttribute("data-target")),r=function(){const t=parseInt(e.getAttribute("data-buffer")??"",10)??0;return isFinite(t)&&t>0?t:0}();let a=!1;e.addEventListener("click",(function(c){if(c.preventDefault(),a)return;a=!0,e.setAttribute("disabled","disabled");const d=function(){e.removeAttribute("disabled"),a=!1};o.innerText="";const i=[],s=function(e){0!==r?(i.push(e),i.length>r&&i.splice(0,i.length-r),o.innerText=i.join("\n")):o.innerText+=e+"\n"};var l,u;s("Connecting ..."),(l=e=>{s("Connected"),e.send(t),"string"==typeof n&&e.send(n)},u=e=>{s(e)},new Promise(((e,t)=>{const n=new WebSocket(location.href.replace("http","ws"));n.onclose=e,n.onerror=t,n.onmessage=e=>u(e.data),n.onopen=()=>l(n)}))).then((()=>{s("Connection closed.\n"),d()})).catch((()=>{s("Connection errored.\n"),d()}))}))}))})(); \ No newline at end of file diff --git a/internal/component/static/src/control/highlight.ts b/internal/component/static/src/control/highlight.ts new file mode 100644 index 0000000..5c16d9c --- /dev/null +++ b/internal/component/static/src/control/highlight.ts @@ -0,0 +1,65 @@ + +const types: Record HTMLElement | string> = { + "date": (element) => { + return (new Date(element.innerText)).toISOString() + }, + "path": (element) => { + const text = element.innerText.split("/"); + return text[text.length - 1]; + }, + "pathbuilders": () => { + const pathbuilders: {[name: string]: string} = (window as any).pathbuilders; // must be declared globally on page! + const wrapper = document.createElement("span"); + + let found_one = false + Object.keys(pathbuilders).forEach(name => { + found_one = true + + const filename = name + ".xml" + const data = pathbuilders[name] + const mime = "application/xml" + wrapper.append(make_download_link(filename, name, data, mime)) + wrapper.append(document.createTextNode(" ")) + }) + + if (!found_one) return '(none)'; + + const small = document.createElement('small') + small.append(document.createTextNode("(click to download)")) + wrapper.append(small) + + return wrapper + } +} + +const make_download_link = (filename: string, title: string, content: string, type: string) => { + const blob = new Blob( + [content], + { + type: type ?? "text/plain" + } + ); + + const link = document.createElement("a") + link.target = "_blank" + link.download = filename + link.href = URL.createObjectURL(blob) + link.append(document.createTextNode(title)) + + return link +} + +Object.keys(types).forEach(key => { + const f = types[key]; + const elements = document.querySelectorAll("code." + key) as NodeListOf + elements.forEach(element => { + const newElement = f(element) + if (typeof newElement === 'string') { + element.innerHTML = "" + element.appendChild(document.createTextNode(newElement)) + return + } + + element.parentNode!.replaceChild(newElement, element) + }) +}) \ No newline at end of file diff --git a/internal/component/static/src/control/index.css b/internal/component/static/src/control/index.css index e740a16..0de8706 100644 --- a/internal/component/static/src/control/index.css +++ b/internal/component/static/src/control/index.css @@ -11,3 +11,7 @@ .wisski.stopped { background-color: #ff7a7a; } + +.remote-action-out { + font-size: small; +} diff --git a/internal/component/static/src/control/index.ts b/internal/component/static/src/control/index.ts index 03bd0c2..2f1489f 100644 --- a/internal/component/static/src/control/index.ts +++ b/internal/component/static/src/control/index.ts @@ -1,68 +1,5 @@ import '../global.ts'; import './index.css'; - -const types: Record HTMLElement | string> = { - "date": (element) => { - return (new Date(element.innerText)).toISOString() - }, - "path": (element) => { - const text = element.innerText.split("/"); - return text[text.length - 1]; - }, - "pathbuilders": () => { - const pathbuilders: {[name: string]: string} = (window as any).pathbuilders; // must be declared globally on page! - const wrapper = document.createElement("span"); - - let found_one = false - Object.keys(pathbuilders).forEach(name => { - found_one = true - - const filename = name + ".xml" - const data = pathbuilders[name] - const mime = "application/xml" - wrapper.append(make_download_link(filename, name, data, mime)) - wrapper.append(document.createTextNode(" ")) - }) - - if (!found_one) return '(none)'; - - const small = document.createElement('small') - small.append(document.createTextNode("(click to download)")) - wrapper.append(small) - - return wrapper - } -} - -const make_download_link = (filename: string, title: string, content: string, type: string) => { - const blob = new Blob( - [content], - { - type: type ?? "text/plain" - } - ); - - const link = document.createElement("a") - link.target = "_blank" - link.download = filename - link.href = URL.createObjectURL(blob) - link.append(document.createTextNode(title)) - - return link -} - -Object.keys(types).forEach(key => { - const f = types[key]; - const elements = document.querySelectorAll("code." + key) as NodeListOf - elements.forEach(element => { - const newElement = f(element) - if (typeof newElement === 'string') { - element.innerHTML = "" - element.appendChild(document.createTextNode(newElement)) - return - } - - element.parentNode!.replaceChild(newElement, element) - }) -}) \ No newline at end of file +import './highlight.ts'; +import './remote.ts'; diff --git a/internal/component/static/src/control/remote.ts b/internal/component/static/src/control/remote.ts new file mode 100644 index 0000000..7200eb8 --- /dev/null +++ b/internal/component/static/src/control/remote.ts @@ -0,0 +1,62 @@ +import connectSocket from '../socket/socket'; + +const elements = document.getElementsByClassName('remote-action') +Array.from(elements).forEach((element) => { + const action = element.getAttribute('data-action') as string; + const param = element.getAttribute('data-param') as string | undefined; + const target = document.querySelector(element.getAttribute('data-target')!) as HTMLElement; + const bufferSize = (function() { + const number = parseInt(element.getAttribute('data-buffer') ?? "", 10) ?? 0; + return (isFinite(number) && number > 0) ? number : 0; + })() + + let running = false + element.addEventListener('click', function(ev) { + ev.preventDefault(); + + // already running + if (running) return + + running = true + element.setAttribute('disabled', 'disabled'); + const close = function() { + element.removeAttribute('disabled'); + running = false; + } + + target.innerText = ""; + + const buffer: Array = []; + const println = function(line: string) { + if(bufferSize === 0) { + target.innerText += line + "\n"; + return; + } + + buffer.push(line); + if(buffer.length > bufferSize) { + buffer.splice(0, buffer.length - bufferSize) + } + target.innerText = buffer.join("\n"); + } + + println("Connecting ...") + + // connect to the socket and send the action + connectSocket((socket) => { + println("Connected") + socket.send(action); + if (typeof param === 'string') { + socket.send(param); + } + }, (data) => { + println(data); + }).then(() => { + println("Connection closed.\n") + close(); + }).catch(() => { + println("Connection errored.\n") + close(); + }); + }); +}) \ No newline at end of file diff --git a/internal/component/static/src/socket/socket.ts b/internal/component/static/src/socket/socket.ts new file mode 100644 index 0000000..39b9649 --- /dev/null +++ b/internal/component/static/src/socket/socket.ts @@ -0,0 +1,11 @@ +export default function connectSocket(onOpen: (socket: WebSocket) => void, onData: (data: any) => void): Promise { + return new Promise((rs, rj) => { + const socket = new WebSocket(location.href.replace('http', 'ws')); + + socket.onclose = rs; + socket.onerror = rj; + + socket.onmessage = (ev) => onData(ev.data) + socket.onopen = () => onOpen(socket); + }); +} \ No newline at end of file diff --git a/internal/dis/component.go b/internal/dis/component.go index 751c7e9..fc44e33 100644 --- a/internal/dis/component.go +++ b/internal/dis/component.go @@ -6,6 +6,7 @@ import ( "github.com/FAU-CDI/wisski-distillery/internal/component" "github.com/FAU-CDI/wisski-distillery/internal/component/control" "github.com/FAU-CDI/wisski-distillery/internal/component/home" + "github.com/FAU-CDI/wisski-distillery/internal/component/info" "github.com/FAU-CDI/wisski-distillery/internal/component/instances" "github.com/FAU-CDI/wisski-distillery/internal/component/resolver" "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" @@ -52,7 +53,7 @@ func (dis *Distillery) register(context *component.PoolContext) []component.Comp r(dis, context, func(resolver *resolver.Resolver) { resolver.RefreshInterval = time.Minute }), - ra[*control.Info](dis, context), + ra[*info.Info](dis, context), } } diff --git a/pkg/httpx/socket.go b/pkg/httpx/socket.go new file mode 100644 index 0000000..620c762 --- /dev/null +++ b/pkg/httpx/socket.go @@ -0,0 +1,315 @@ +package httpx + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/FAU-CDI/wisski-distillery/pkg/lazy" + "github.com/gorilla/websocket" +) + +// WebSocket implements serving a WebSocket +type WebSocket struct { + Context context.Context // context which closes all connections + Limits WebSocketLimits // limits for websocket operations + + Handler func(ws WebSocketConnection) + Fallback http.Handler + + pool lazy.Lazy[*sync.Pool] // pool holds *WebSocketConn objects + upgrader websocket.Upgrader // upgrades upgrades connections +} + +type WebSocketLimits struct { + WriteWait time.Duration // maximum time to wait for writing + PongWait time.Duration // time to wait for pong responses + PingInterval time.Duration // interval to send pings to the client + MaxMessageSize int64 // maximal message size in bytes +} + +func (limits *WebSocketLimits) SetDefaults() { + if limits.WriteWait == 0 { + limits.WriteWait = 10 * time.Second + } + if limits.PongWait == 0 { + limits.PongWait = time.Minute + } + if limits.PingInterval <= 0 { + limits.PingInterval = (limits.PongWait * 9) / 10 + } + if limits.MaxMessageSize <= 0 { + limits.MaxMessageSize = 2048 + } +} + +// makePoolSocket creates a new socket and makes sure that the pool is initialized +func (h *WebSocket) makePoolSocket() *webSocketConn { + return h.pool.Get(func() *sync.Pool { + return &sync.Pool{ + New: func() any { return new(webSocketConn) }, + } + }).Get().(*webSocketConn) +} + +func (h *WebSocket) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // if the user did not request a websocket, go to the fallbacjk handler + if !websocket.IsWebSocketUpgrade(r) { + h.serveFallback(w, r) + return + } + + // else deal with the websocket! + h.serveWebsocket(w, r) +} + +func (h *WebSocket) serveFallback(w http.ResponseWriter, r *http.Request) { + if h.Fallback == nil { + http.NotFound(w, r) + return + } + + h.Fallback.ServeHTTP(w, r) +} + +func (h *WebSocket) serveWebsocket(w http.ResponseWriter, r *http.Request) { + // upgrade the connection or bail out! + conn, err := h.upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + // get a new socket from the pool + socket := h.makePoolSocket() + socket.Serve(h.Context, h.Limits, conn, h.Handler) + + // return a reset socket to the pool + socket.reset() + h.pool.Get(nil).Put(socket) +} + +// WebSocketConnection represents a connected Websocket +type WebSocketConnection interface { + // Context returns a context that is closed once this websocket is closed. + Context() context.Context + + // Read returns a channel that receives message. + // The channel is closed once no more messags are available. + Read() <-chan WebSocketMessage + + // Write queues the provided message for sending + // and returns a channel that is closed once the message has been sent. + Write(WebSocketMessage) <-chan struct{} + + // WriteText is a convenience method to send a TextMessage + WriteText(text string) <-chan struct{} + + // Close closes the underlying connection + Close() +} + +// WebSocketMessage represents a connected Websocket +type WebSocketMessage struct { + Type int + Bytes []byte +} + +type outWebSocketMessage struct { + WebSocketMessage + done chan<- struct{} // done should be closed when finished +} + +// webSocketConn implements [WebSocketConnection] +type webSocketConn struct { + conn *websocket.Conn // underlying connection + limits WebSocketLimits + + context context.Context // context to cancel the connection + cancel context.CancelFunc + + wg sync.WaitGroup // blocks all the ongoing tasks + + // incoming and outgoing tasks + incoming chan WebSocketMessage + outgoing chan outWebSocketMessage +} + +// Serve serves the provided connection +func (h *webSocketConn) Serve(ctx context.Context, limits WebSocketLimits, conn *websocket.Conn, handler func(ws WebSocketConnection)) { + // use the connection! + h.conn = conn + + // setup limits + h.limits = limits + h.limits.SetDefaults() + + // create a context for the connection + if ctx == nil { + ctx = context.Background() + } + h.context, h.cancel = context.WithCancel(ctx) + + // start receiving and sending messages + h.wg.Add(2) + h.sendMessages() + h.recvMessages() + + // wait for the context to be cancelled, then close the connection + h.wg.Add(1) + go func() { + defer h.wg.Done() + <-h.context.Done() + h.conn.Close() + }() + + // start the application logic + h.wg.Add(1) + go h.handle(handler) + + // wait for closing operations + h.wg.Wait() +} + +func (h *webSocketConn) handle(handler func(ws WebSocketConnection)) { + defer func() { + h.wg.Done() + h.cancel() + }() + + handler(h) +} + +func (h *webSocketConn) sendMessages() { + h.outgoing = make(chan outWebSocketMessage) + + go func() { + // close connection when done! + defer func() { + h.wg.Done() + h.cancel() + }() + + // setup a timer for pings! + ticker := time.NewTicker(h.limits.PingInterval) + defer ticker.Stop() + + for { + select { + // everything is done! + case <-h.context.Done(): + return + + // send outgoing messages + case message := <-h.outgoing: + (func() { + defer close(message.done) + + err := h.writeRaw(message.Type, message.Bytes) + if err != nil { + return + } + message.done <- struct{}{} + })() + // send a ping message + case <-ticker.C: + if err := h.writeRaw(websocket.PingMessage, []byte{}); err != nil { + return + } + } + } + }() + +} + +// writeRaw writes to the underlying socket +func (h *webSocketConn) writeRaw(messageType int, data []byte) error { + h.conn.SetWriteDeadline(time.Now().Add(h.limits.WriteWait)) + return h.conn.WriteMessage(messageType, data) +} + +// Write writes a message to the websocket connection. +func (sh *webSocketConn) Write(message WebSocketMessage) <-chan struct{} { + callback := make(chan struct{}, 1) + go func() { + select { + // write an outgoing message + case sh.outgoing <- outWebSocketMessage{ + WebSocketMessage: message, + done: callback, + }: + // context + case <-sh.context.Done(): + close(callback) + } + }() + return callback +} + +func (sh *webSocketConn) WriteText(text string) <-chan struct{} { + return sh.Write(WebSocketMessage{ + Type: websocket.TextMessage, + Bytes: []byte(text), + }) +} + +func (h *webSocketConn) recvMessages() { + h.incoming = make(chan WebSocketMessage) + + // set a read handler + h.conn.SetReadLimit(h.limits.MaxMessageSize) + + // configure a pong handler + h.conn.SetReadDeadline(time.Now().Add(h.limits.PongWait)) + h.conn.SetPongHandler(func(string) error { h.conn.SetReadDeadline(time.Now().Add(h.limits.PongWait)); return nil }) + + // handle incoming messages + go func() { + // close connection when done! + defer func() { + h.wg.Done() + h.cancel() + }() + + for { + messageType, messageBytes, err := h.conn.ReadMessage() + if err != nil { + return + } + + // try to send a message to the incoming message channel + select { + case h.incoming <- WebSocketMessage{ + Type: messageType, + Bytes: messageBytes, + }: + case <-h.context.Done(): + return + } + } + }() +} + +// Read returns a channel that receives incoming messages. +// The channel is close once no more messages are available, or the context is canceled. +func (h *webSocketConn) Read() <-chan WebSocketMessage { + return h.incoming +} + +// Context returns a context that is closed once this connection is closed. +func (h *webSocketConn) Context() context.Context { + return h.context +} + +func (h *webSocketConn) Close() { + h.cancel() +} + +// reset resets this websocket +func (h *webSocketConn) reset() { + h.limits = WebSocketLimits{} + h.conn = nil + h.incoming = nil + h.outgoing = nil + h.context, h.cancel = nil, nil +} diff --git a/pkg/logging/log.go b/pkg/logging/log.go index 477ad08..e3bd269 100644 --- a/pkg/logging/log.go +++ b/pkg/logging/log.go @@ -22,8 +22,8 @@ func LogMessage(io stream.IOStream, format string, args ...interface{}) (int, er func logOperation(io stream.IOStream, indent int, format string, args ...interface{}) (int, error) { message := "\033[1m" + strings.Repeat(" ", indent+1) + "=> " + format + "\033[0m\n" - if !io.StdinIsATerminal() { - message = " => " + format + if !io.StdoutIsATerminal() { + message = " => " + format + "\n" } return io.Printf(message, args...)