wdcli: Use progress writer instead of IOStream

This commit is contained in:
Tom Wiesing 2022-11-30 11:39:29 +01:00
parent 890022ae64
commit 3b78b06fff
No known key found for this signature in database
49 changed files with 396 additions and 393 deletions

View file

@ -2,6 +2,7 @@ package component
import (
"context"
"fmt"
"io"
"path/filepath"
@ -9,7 +10,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/FAU-CDI/wisski-distillery/pkg/fsx"
"github.com/pkg/errors"
"github.com/tkw1536/goprogram/stream"
)
// Backupable represents a component with a Backup method
@ -39,8 +39,8 @@ type Snapshotable interface {
// StagingContext represents a context for [Backupable] and [Snapshotable]
type StagingContext interface {
// IO returns the input output stream belonging to this backup file
IO() stream.IOStream
// Progress returns a writer to write progress information to.
Progress() io.Writer
// Name creates a new directory inside the destination.
// Passing the empty path creates the destination as a directory.
@ -66,11 +66,11 @@ type StagingContext interface {
}
// NewStagingContext returns a new [StagingContext]
func NewStagingContext(ctx context.Context, env environment.Environment, io stream.IOStream, path string, manifest chan<- string) StagingContext {
func NewStagingContext(ctx context.Context, env environment.Environment, progress io.Writer, path string, manifest chan<- string) StagingContext {
return &stagingContext{
ctx: ctx,
env: env,
io: io,
progress: progress,
path: path,
manifest: manifest,
}
@ -80,7 +80,7 @@ func NewStagingContext(ctx context.Context, env environment.Environment, io stre
type stagingContext struct {
ctx context.Context
env environment.Environment // environment
io stream.IOStream // context the files are sent to
progress io.Writer // writer to direct progress to
path string // path to send files to
manifest chan<- string // channel the manifest is sent to
}
@ -93,12 +93,12 @@ func (bc *stagingContext) sendPath(path string) {
return
}
bc.io.Println(dst)
fmt.Fprintln(bc.progress, dst)
bc.manifest <- dst
}
func (bc *stagingContext) IO() stream.IOStream {
return bc.io
func (bc *stagingContext) Progress() io.Writer {
return bc.progress
}
var errResolveAbsolute = errors.New("resolve: path must be relative")

View file

@ -3,13 +3,13 @@ package home
import (
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/instances"
"github.com/FAU-CDI/wisski-distillery/pkg/lazy"
"github.com/tkw1536/goprogram/stream"
)
type Home struct {
@ -30,10 +30,10 @@ var (
func (*Home) Routes() []string { return []string{"/"} }
func (home *Home) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
home.updateRedirect(ctx, io)
home.updateInstances(ctx, io)
home.updateRender(ctx, io)
func (home *Home) Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error) {
home.updateRedirect(ctx, progress)
home.updateInstances(ctx, progress)
home.updateRender(ctx, progress)
return home, nil
}

View file

@ -3,6 +3,8 @@ package home
import (
"bytes"
"context"
"fmt"
"io"
"time"
_ "embed"
@ -10,14 +12,13 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/control/static"
"github.com/FAU-CDI/wisski-distillery/internal/status"
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
"github.com/tkw1536/goprogram/stream"
"golang.org/x/sync/errgroup"
)
func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) {
func (home *Home) updateInstances(ctx context.Context, progress io.Writer) {
go func() {
for t := range timex.TickContext(ctx, home.RefreshInterval) {
io.Printf("[%s]: reloading instance list\n", t.Format(time.Stamp))
fmt.Fprintf(progress, "[%s]: reloading instance list\n", t.Format(time.Stamp))
err := (func() error {
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
@ -32,7 +33,7 @@ func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) {
return nil
})()
if err != nil {
io.EPrintf("error reloading instances: ", err.Error())
fmt.Fprintf(progress, "error reloading instances: %s", err.Error())
}
}
}()
@ -51,10 +52,10 @@ func (home *Home) instanceMap(ctx context.Context) (map[string]struct{}, error)
return names, nil
}
func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
func (home *Home) updateRender(ctx context.Context, progress io.Writer) {
go func() {
for t := range timex.TickContext(ctx, home.RefreshInterval) {
io.Printf("[%s]: reloading home render list\n", t.Format(time.Stamp))
fmt.Fprintf(progress, "[%s]: reloading home render list\n", t.Format(time.Stamp))
err := (func() error {
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
@ -69,7 +70,7 @@ func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
return nil
})()
if err != nil {
io.EPrintf("error reloading instances: ", err.Error())
fmt.Fprintf(progress, "error reloading instances: %s", err.Error())
}
}
}()

View file

@ -3,18 +3,19 @@ package home
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
"github.com/tkw1536/goprogram/stream"
)
func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) {
func (home *Home) updateRedirect(ctx context.Context, progress io.Writer) {
go func() {
for t := range timex.TickContext(ctx, home.RefreshInterval) {
io.Printf("[%s]: reloading overrides\n", t.Format(time.Stamp))
fmt.Fprintf(progress, "[%s]: reloading overrides\n", t.Format(time.Stamp))
err := (func() error {
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
@ -29,7 +30,7 @@ func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) {
return nil
})()
if err != nil {
io.EPrintf("error reloading overrides: ", err.Error())
fmt.Fprintf(progress, "error reloading overrides: %s", err.Error())
}
}

View file

@ -2,6 +2,7 @@ package info
import (
"context"
"io"
"net/http"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
@ -12,7 +13,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/instances"
"github.com/FAU-CDI/wisski-distillery/pkg/httpx"
"github.com/FAU-CDI/wisski-distillery/pkg/lazy"
"github.com/tkw1536/goprogram/stream"
)
type Info struct {
@ -33,7 +33,7 @@ var (
func (*Info) Routes() []string { return []string{"/dis/"} }
func (info *Info) Handler(ctx context.Context, route string, io stream.IOStream) (handler http.Handler, err error) {
func (info *Info) Handler(ctx context.Context, route string, progress io.Writer) (handler http.Handler, err error) {
router := mux.NewRouter()
{
socket := &httpx.WebSocket{

View file

@ -3,28 +3,29 @@ package info
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/exporter"
"github.com/FAU-CDI/wisski-distillery/internal/wisski"
"github.com/FAU-CDI/wisski-distillery/pkg/httpx"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
type InstanceAction struct {
NumParams int
HandleInteractive func(ctx context.Context, info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error
HandleInteractive func(ctx context.Context, info *Info, instance *wisski.WissKI, out io.Writer, params ...string) error
HandleResult func(ctx context.Context, info *Info, instance *wisski.WissKI, params ...string) (value any, err error)
}
var socketInstanceActions = map[string]InstanceAction{
"snapshot": {
HandleInteractive: func(ctx context.Context, info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
HandleInteractive: func(ctx context.Context, info *Info, instance *wisski.WissKI, out io.Writer, params ...string) error {
return info.Exporter.MakeExport(
ctx,
str,
out,
exporter.ExportTask{
Dest: "",
Instance: instance,
@ -35,17 +36,17 @@ var socketInstanceActions = map[string]InstanceAction{
},
},
"rebuild": {
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
return instance.Barrel().Build(ctx, str, true)
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, out io.Writer, params ...string) error {
return instance.Barrel().Build(ctx, out, true)
},
},
"update": {
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
return instance.Drush().Update(ctx, str)
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, out io.Writer, params ...string) error {
return instance.Drush().Update(ctx, out)
},
},
"cron": {
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str io.Writer, params ...string) error {
return instance.Drush().Cron(ctx, str)
},
},
@ -108,33 +109,30 @@ func (info *Info) handleInstanceAction(conn httpx.WebSocketConnection, action In
}
defer writer.Close()
str := stream.NewIOStream(writer, writer, nil, 0)
// handle the interactive action
if action.HandleInteractive != nil {
err := action.HandleInteractive(conn.Context(), info, instance, str, params...)
err := action.HandleInteractive(conn.Context(), info, instance, writer, params...)
if err != nil {
str.EPrintln(err)
fmt.Fprintln(writer, err)
return
}
str.Println("done")
fmt.Fprintln(writer, "done")
}
// handle the result computation
if action.HandleResult != nil {
result, err := action.HandleResult(conn.Context(), info, instance, params...)
if err != nil {
str.Println("false")
fmt.Fprintln(writer, "false")
return
}
data, err := json.Marshal(result)
if err != nil {
str.Println("false")
fmt.Fprintln(writer, "false")
return
}
data = append(data, "\n"...)
str.Println("true")
str.Stdout.Write(data)
fmt.Fprintln(writer, "true")
fmt.Fprintln(writer, data)
}
}

View file

@ -2,24 +2,24 @@ package control
import (
"context"
"fmt"
"io"
"net/http"
"github.com/tkw1536/goprogram/stream"
)
// Server returns an http.Mux that implements the main server instance.
// The server may spawn background tasks, but these should be terminated once context closes.
//
// Logging messages are directed to io.
func (control *Control) Server(ctx context.Context, io stream.IOStream) (*http.ServeMux, error) {
// Logging messages are directed to progress
func (control *Control) Server(ctx context.Context, progress io.Writer) (*http.ServeMux, error) {
// create a new mux
mux := http.NewServeMux()
// add all the servable routes!
for _, s := range control.Servables {
for _, route := range s.Routes() {
io.Printf("mounting %s\n", route)
handler, err := s.Handler(ctx, route, io)
fmt.Fprintf(progress, "mounting %s\n", route)
handler, err := s.Handler(ctx, route, progress)
if err != nil {
return nil, err
}

View file

@ -4,11 +4,11 @@ package static
import (
"context"
"embed"
"io"
"io/fs"
"net/http"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/tkw1536/goprogram/stream"
)
type Static struct {
@ -24,7 +24,7 @@ func (*Static) Routes() []string { return []string{"/static/"} }
//go:embed dist
var staticFS embed.FS
func (static *Static) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
func (static *Static) Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error) {
// take the filesystem
fs, err := fs.Sub(staticFS, "dist")
if err != nil {

View file

@ -13,7 +13,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
"golang.org/x/exp/slices"
)
@ -50,7 +49,7 @@ type BackupDescription struct {
}
// New create a new Backup
func (exporter *Exporter) NewBackup(ctx context.Context, io stream.IOStream, description BackupDescription) (backup Backup) {
func (exporter *Exporter) NewBackup(ctx context.Context, progress io.Writer, description BackupDescription) (backup Backup) {
backup.Description = description
// catch anything critical that happened during the snapshot
@ -61,16 +60,16 @@ func (exporter *Exporter) NewBackup(ctx context.Context, io stream.IOStream, des
// do the create keeping track of time!
logging.LogOperation(func() error {
backup.StartTime = time.Now().UTC()
backup.run(ctx, io, exporter)
backup.run(ctx, progress, exporter)
backup.EndTime = time.Now().UTC()
return nil
}, io, "Writing backup files")
}, progress, "Writing backup files")
return
}
func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Exporter) {
func (backup *Backup) run(ctx context.Context, progress io.Writer, exporter *Exporter) {
// create a manifest
manifest, done := backup.handleManifest(backup.Description.Dest)
defer done()
@ -81,7 +80,7 @@ func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Ex
// Component backup tasks
logging.LogOperation(func() error {
st := status.NewWithCompat(ios.Stdout, 0)
st := status.NewWithCompat(progress, 0)
st.Start()
defer st.Stop()
@ -96,7 +95,7 @@ func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Ex
component.NewStagingContext(
ctx,
exporter.Environment,
stream.NewIOStream(writer, writer, nil, 0),
writer,
filepath.Join(backup.Description.Dest, bc.BackupName()),
manifest,
),
@ -111,11 +110,11 @@ func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Ex
}
return nil
}, ios, "Backing up core components")
}, progress, "Backing up core components")
// backup instances
logging.LogOperation(func() error {
st := status.NewWithCompat(ios.Stdout, 0)
st := status.NewWithCompat(progress, 0)
st.Start()
defer st.Stop()
@ -149,7 +148,7 @@ func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Ex
manifest <- dir
return exporter.NewSnapshot(ctx, instance, stream.NewIOStream(writer, writer, nil, 0), SnapshotDescription{
return exporter.NewSnapshot(ctx, instance, writer, SnapshotDescription{
Dest: dir,
})
},
@ -166,6 +165,6 @@ func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Ex
})
return nil
}, ios, "Creating instance snapshots")
}, progress, "Creating instance snapshots")
}

View file

@ -2,6 +2,7 @@ package exporter
import (
"context"
"fmt"
"io"
"path/filepath"
@ -11,7 +12,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/FAU-CDI/wisski-distillery/pkg/targz"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
// ExportTask describes a task that makes either a [Backup] or a [Snapshot].
@ -43,7 +43,7 @@ type export interface {
// MakeExport performs an export task as described by flags.
// Output is directed to the provided io.
func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, task ExportTask) (err error) {
func (exporter *Exporter) MakeExport(ctx context.Context, progress io.Writer, task ExportTask) (err error) {
// extract parameters
Title := "Backup"
Slug := ""
@ -53,7 +53,7 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
}
// determine target paths
logging.LogMessage(io, "Determining target paths")
logging.LogMessage(progress, "Determining target paths")
var stagingDir, archivePath string
if task.StagingOnly {
stagingDir = task.Dest
@ -69,11 +69,11 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
if !task.StagingOnly && archivePath == "" {
archivePath = exporter.NewArchivePath(Slug)
}
io.Printf("Staging Directory: %s\n", stagingDir)
io.Printf("Archive Path: %s\n", archivePath)
fmt.Fprintf(progress, "Staging Directory: %s\n", stagingDir)
fmt.Fprintf(progress, "Archive Path: %s\n", archivePath)
// create the staging directory
logging.LogMessage(io, "Creating staging directory")
logging.LogMessage(progress, "Creating staging directory")
err = exporter.Environment.Mkdir(stagingDir, environment.DefaultDirPerm)
if !environment.IsExist(err) && err != nil {
return err
@ -83,7 +83,7 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
// we need the staging directory to be deleted at the end
if !task.StagingOnly {
defer func() {
logging.LogMessage(io, "Removing staging directory")
logging.LogMessage(progress, "Removing staging directory")
exporter.Environment.RemoveAll(stagingDir)
}()
}
@ -96,11 +96,11 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
var sl export
if task.Instance == nil {
task.BackupDescription.Dest = stagingDir
backup := exporter.NewBackup(ctx, io, task.BackupDescription)
backup := exporter.NewBackup(ctx, progress, task.BackupDescription)
sl = &backup
} else {
task.SnapshotDescription.Dest = stagingDir
snapshot := exporter.NewSnapshot(ctx, task.Instance, io, task.SnapshotDescription)
snapshot := exporter.NewSnapshot(ctx, task.Instance, progress, task.SnapshotDescription)
sl = &snapshot
}
@ -109,7 +109,7 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
// find the report path
reportPath := filepath.Join(stagingDir, "report.txt")
io.Println(reportPath)
fmt.Fprintln(progress, reportPath)
// create the path
report, err := exporter.Environment.Create(reportPath, environment.DefaultFilePerm)
@ -122,28 +122,28 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
_, err := sl.Report(report)
return err
}
}, io, "Generating %s", Title)
}, progress, "Generating %s", Title)
// if we only requested staging
// all that is left is to write the log entry
if task.StagingOnly {
logging.LogMessage(io, "Writing Log Entry")
logging.LogMessage(progress, "Writing Log Entry")
// write out the log entry
entry.Path = stagingDir
entry.Packed = false
exporter.ExporterLogger.Add(ctx, entry)
io.Printf("Wrote %s\n", stagingDir)
fmt.Fprintf(progress, "Wrote %s\n", stagingDir)
return nil
}
// package everything up as an archive!
if err := logging.LogOperation(func() error {
var count int64
defer func() { io.Printf("Wrote %d byte(s) to %s\n", count, archivePath) }()
defer func() { fmt.Fprintf(progress, "Wrote %d byte(s) to %s\n", count, archivePath) }()
st := status.NewWithCompat(io.Stdout, 1)
st := status.NewWithCompat(progress, 1)
st.Start()
defer st.Stop()
@ -152,12 +152,12 @@ func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, ta
})
return err
}, io, "Writing archive"); err != nil {
}, progress, "Writing archive"); err != nil {
return err
}
// write out the log entry
logging.LogMessage(io, "Writing Log Entry")
logging.LogMessage(progress, "Writing Log Entry")
entry.Path = archivePath
entry.Packed = true
exporter.ExporterLogger.Add(ctx, entry)

View file

@ -2,10 +2,10 @@ package exporter
import (
"context"
"fmt"
"io"
"path/filepath"
"time"
"github.com/tkw1536/goprogram/stream"
)
// ShouldPrune determines if a file with the provided modification time should be
@ -15,7 +15,7 @@ func (exporter *Exporter) ShouldPrune(modtime time.Time) bool {
}
// Prune prunes all old exports
func (exporter *Exporter) PruneExports(ctx context.Context, io stream.IOStream) error {
func (exporter *Exporter) PruneExports(ctx context.Context, progress io.Writer) error {
sPath := exporter.ArchivePath()
// list all the files
@ -43,7 +43,7 @@ func (exporter *Exporter) PruneExports(ctx context.Context, io stream.IOStream)
// assemble path, and then remove the file!
path := filepath.Join(sPath, entry.Name())
io.Printf("Removing %s cause it is older than %d days\n", path, exporter.Config.MaxBackupAge)
fmt.Fprintf(progress, "Removing %s cause it is older than %d days\n", path, exporter.Config.MaxBackupAge)
if err := exporter.Still.Environment.Remove(path); err != nil {
return err

View file

@ -14,7 +14,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/tkw1536/goprogram/lib/collection"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
"golang.org/x/exp/slices"
)
@ -45,20 +44,20 @@ type Snapshot struct {
}
// Snapshot creates a new snapshot of this instance into dest
func (snapshots *Exporter) NewSnapshot(ctx context.Context, instance *wisski.WissKI, io stream.IOStream, desc SnapshotDescription) (snapshot Snapshot) {
func (snapshots *Exporter) NewSnapshot(ctx context.Context, instance *wisski.WissKI, progress io.Writer, desc SnapshotDescription) (snapshot Snapshot) {
logging.LogMessage(io, "Locking instance")
logging.LogMessage(progress, "Locking instance")
if !instance.Locker().TryLock(ctx) {
err := locker.Locked
io.EPrintln(err)
logging.LogMessage(io, "Aborting snapshot creation")
fmt.Fprintln(progress, err)
logging.LogMessage(progress, "Aborting snapshot creation")
return Snapshot{
ErrPanic: err,
}
}
defer func() {
logging.LogMessage(io, "Unlocking instance")
logging.LogMessage(progress, "Unlocking instance")
instance.Locker().Unlock(ctx)
}()
@ -75,27 +74,27 @@ func (snapshots *Exporter) NewSnapshot(ctx context.Context, instance *wisski.Wis
logging.LogOperation(func() error {
snapshot.StartTime = time.Now().UTC()
snapshot.ErrWhitebox = snapshot.makeParts(ctx, io, snapshots, instance, false)
snapshot.ErrBlackbox = snapshot.makeParts(ctx, io, snapshots, instance, true)
snapshot.ErrWhitebox = snapshot.makeParts(ctx, progress, snapshots, instance, false)
snapshot.ErrBlackbox = snapshot.makeParts(ctx, progress, snapshots, instance, true)
snapshot.EndTime = time.Now().UTC()
return nil
}, io, "Writing snapshot files")
}, progress, "Writing snapshot files")
slices.Sort(snapshot.Manifest)
return
}
func (snapshot *Snapshot) makeParts(ctx context.Context, ios stream.IOStream, snapshots *Exporter, instance *wisski.WissKI, needsRunning bool) map[string]error {
func (snapshot *Snapshot) makeParts(ctx context.Context, progress io.Writer, snapshots *Exporter, instance *wisski.WissKI, needsRunning bool) map[string]error {
if !needsRunning && !snapshot.Description.Keepalive {
stack := instance.Barrel().Stack()
logging.LogMessage(ios, "Stopping instance")
snapshot.ErrStop = stack.Down(ctx, ios)
logging.LogMessage(progress, "Stopping instance")
snapshot.ErrStop = stack.Down(ctx, progress)
defer func() {
logging.LogMessage(ios, "Starting instance")
snapshot.ErrStart = stack.Up(ctx, ios)
logging.LogMessage(progress, "Starting instance")
snapshot.ErrStart = stack.Up(ctx, progress)
}()
}
// handle writing the manifest!
@ -103,7 +102,7 @@ func (snapshot *Snapshot) makeParts(ctx context.Context, ios stream.IOStream, sn
defer done()
// create a new status
st := status.NewWithCompat(ios.Stdout, 0)
st := status.NewWithCompat(progress, 0)
st.Start()
defer st.Stop()
@ -126,7 +125,7 @@ func (snapshot *Snapshot) makeParts(ctx context.Context, ios stream.IOStream, sn
component.NewStagingContext(
ctx,
snapshots.Environment,
stream.NewIOStream(writer, writer, nil, 0),
writer,
filepath.Join(snapshot.Description.Dest, sc.SnapshotName()),
manifest,
),

View file

@ -2,9 +2,9 @@ package component
import (
"context"
"io"
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/tkw1536/goprogram/stream"
)
// Installable implements an installable component.
@ -39,8 +39,8 @@ type Updatable interface {
// Update updates or initializes the provided components.
// It is called after the component has been installed (if applicable).
//
// It may send output to the provided stream.
// It may send progress to the provided stream.
//
// Updating should be idempotent, meaning running it multiple times must not break the existing system.
Update(ctx context.Context, stream stream.IOStream) error
Update(ctx context.Context, progress io.Writer) error
}

View file

@ -3,10 +3,11 @@ package instances
import (
"context"
"embed"
"fmt"
"io"
"github.com/FAU-CDI/wisski-distillery/pkg/unpack"
"github.com/tkw1536/goprogram/exit"
"github.com/tkw1536/goprogram/stream"
)
var errBootstrapFailedRuntime = exit.Error{
@ -20,9 +21,9 @@ var errBootstrapFailedRuntime = exit.Error{
var runtimeResources embed.FS
// Update installs or updates runtime components needed by this component.
func (instances *Instances) Update(ctx context.Context, stream stream.IOStream) error {
func (instances *Instances) Update(ctx context.Context, progress io.Writer) error {
err := unpack.InstallDir(instances.Still.Environment, instances.Config.RuntimeDir(), "runtime", runtimeResources, func(dst, src string) {
stream.Printf("[copy] %s\n", dst)
fmt.Fprintf(progress, "[copy] %s\n", dst)
})
if err != nil {
return errBootstrapFailedRuntime.Wrap(err)

View file

@ -2,17 +2,18 @@ package resolver
import (
"context"
"fmt"
"io"
"time"
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
"github.com/tkw1536/goprogram/stream"
)
// updatePrefixes starts updating prefixes
func (resolver *Resolver) updatePrefixes(ctx context.Context, io stream.IOStream) {
func (resolver *Resolver) updatePrefixes(ctx context.Context, progress io.Writer) {
go func() {
for t := range timex.TickContext(ctx, resolver.RefreshInterval) {
io.Printf("[%s]: reloading prefixes\n", t.Format(time.Stamp))
fmt.Fprintf(progress, "[%s]: reloading prefixes\n", t.Format(time.Stamp))
err := (func() (err error) {
ctx, cancel := context.WithTimeout(ctx, resolver.RefreshInterval)
@ -27,7 +28,7 @@ func (resolver *Resolver) updatePrefixes(ctx context.Context, io stream.IOStream
return nil
})()
if err != nil {
io.EPrintf("error reloading prefixes: ", err.Error())
fmt.Fprintf(progress, "error reloading prefixes: %s", err.Error())
}
}
}()

View file

@ -3,6 +3,7 @@ package resolver
import (
"context"
"fmt"
"io"
"net/http"
"regexp"
"time"
@ -12,7 +13,6 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/instances"
"github.com/FAU-CDI/wisski-distillery/pkg/lazy"
"github.com/tkw1536/goprogram/stream"
)
type Resolver struct {
@ -32,7 +32,7 @@ var (
func (resolver *Resolver) Routes() []string { return []string{"/go/", "/wisski/get/"} }
func (resolver *Resolver) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
func (resolver *Resolver) Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error) {
var err error
return resolver.handler.Get(func() (p wdresolve.ResolveHandler) {
p.TrustXForwardedProto = true
@ -45,17 +45,17 @@ func (resolver *Resolver) Handler(ctx context.Context, route string, io stream.I
domainName := resolver.Config.DefaultDomain
if domainName != "" {
fallback.Data[fmt.Sprintf("^https?://(.*)\\.%s", regexp.QuoteMeta(domainName))] = fmt.Sprintf("https://$1.%s", domainName)
io.Printf("registering default domain %s\n", domainName)
fmt.Fprintf(progress, "registering default domain %s\n", domainName)
}
// handle the extra domains!
for _, domain := range resolver.Config.SelfExtraDomains {
fallback.Data[fmt.Sprintf("^https?://(.*)\\.%s", regexp.QuoteMeta(domain))] = fmt.Sprintf("https://$1.%s", domainName)
io.Printf("registering legacy domain %s\n", domain)
fmt.Fprintf(progress, "registering legacy domain %s\n", domain)
}
// start updating prefixes
resolver.updatePrefixes(ctx, io)
resolver.updatePrefixes(ctx, progress)
// resolve the prefixes
p.Resolver = resolvers.InOrder{

View file

@ -2,9 +2,8 @@ package component
import (
"context"
"io"
"net/http"
"github.com/tkw1536/goprogram/stream"
)
// Servable is a component that is servable
@ -15,5 +14,5 @@ type Servable interface {
Routes() []string
// Handler returns the handler for the requested route
Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error)
Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error)
}

View file

@ -6,6 +6,7 @@ import (
"io"
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
"github.com/tkw1536/goprogram/stream"
)
var errSQLBackup = errors.New("SQLBackup: Mysqldump returned non-zero exit code")
@ -17,8 +18,7 @@ func (*SQL) BackupName() string {
// Backup makes a backup of all SQL databases into the path dest.
func (sql *SQL) Backup(scontext component.StagingContext) error {
return scontext.AddFile("", func(ctx context.Context, file io.Writer) error {
io := scontext.IO().Streams(file, nil, nil, 0).NonInteractive()
code, err := sql.Stack(sql.Environment).Exec(ctx, io, "sql", "mysqldump", "--all-databases")
code, err := sql.Stack(sql.Environment).Exec(ctx, stream.NonInteractive(scontext.Progress()), "sql", "mysqldump", "--all-databases")
if err != nil {
return err
}

View file

@ -16,16 +16,14 @@ func (*SQL) SnapshotName() string { return "sql" }
func (sql *SQL) Snapshot(wisski models.Instance, scontext component.StagingContext) error {
return scontext.AddDirectory(".", func(ctx context.Context) error {
return scontext.AddFile(wisski.SqlDatabase+".sql", func(ctx context.Context, file io.Writer) error {
return sql.SnapshotDB(ctx, scontext.IO(), file, wisski.SqlDatabase)
return sql.SnapshotDB(ctx, scontext.Progress(), file, wisski.SqlDatabase)
})
})
}
// SnapshotDB makes a backup of the sql database into dest.
func (sql *SQL) SnapshotDB(ctx context.Context, io stream.IOStream, dest io.Writer, database string) error {
io = io.Streams(dest, nil, nil, 0).NonInteractive()
code, err := sql.Stack(sql.Environment).Exec(ctx, io, "sql", "mysqldump", "--databases", database)
func (sql *SQL) SnapshotDB(ctx context.Context, progress io.Writer, dest io.Writer, database string) error {
code, err := sql.Stack(sql.Environment).Exec(ctx, stream.NonInteractive(progress), "sql", "mysqldump", "--databases", database)
if err != nil {
return err
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/FAU-CDI/wisski-distillery/internal/models"
@ -44,14 +45,14 @@ var errSQLUnableToMigrate = exit.Error{
}
// Update initializes or updates the SQL database.
func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
func (sql *SQL) Update(ctx context.Context, progress io.Writer) error {
// unsafely create the admin user!
{
if err := sql.unsafeWaitShell(ctx); err != nil {
return err
}
logging.LogMessage(io, "Creating administrative user")
logging.LogMessage(progress, "Creating administrative user")
{
username := sql.Config.MysqlAdminUser
password := sql.Config.MysqlAdminPassword
@ -62,7 +63,7 @@ func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
}
// create the admin user
logging.LogMessage(io, "Creating sql database")
logging.LogMessage(progress, "Creating sql database")
{
if !sqle.IsSafeDatabaseLiteral(sql.Config.DistilleryDatabase) {
return errSQLUnsafeDatabaseName
@ -74,7 +75,7 @@ func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
}
// wait for the database to come up
logging.LogMessage(io, "Waiting for database update to be complete")
logging.LogMessage(progress, "Waiting for database update to be complete")
sql.WaitQueryTable(ctx)
tables := []struct {
@ -107,7 +108,7 @@ func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
// migrate all of the tables!
return logging.LogOperation(func() error {
for _, table := range tables {
logging.LogMessage(io, "migrating %q table", table.name)
logging.LogMessage(progress, "migrating %q table", table.name)
db, err := sql.QueryTable(ctx, false, table.table)
if err != nil {
return errSQLUnableToMigrate.WithMessageF(table.name, "unable to access table")
@ -118,5 +119,5 @@ func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
}
}
return nil
}, io, "migrating database tables")
}, progress, "migrating database tables")
}

View file

@ -2,9 +2,9 @@ package ssh2
import (
"context"
"io"
"github.com/gliderlabs/ssh"
"github.com/tkw1536/goprogram/stream"
)
const (
@ -13,10 +13,10 @@ const (
)
// Server returns an ssh server that implements the main ssh server
func (ssh2 *SSH2) Server(context context.Context, privateKeyPath string, io stream.IOStream) (*ssh.Server, error) {
func (ssh2 *SSH2) Server(context context.Context, privateKeyPath string, progress io.Writer) (*ssh.Server, error) {
var server ssh.Server
if err := ssh2.setupHostKeys(io, privateKeyPath, &server); err != nil {
if err := ssh2.setupHostKeys(progress, privateKeyPath, &server); err != nil {
return nil, err
}

View file

@ -6,32 +6,32 @@ import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/gliderlabs/ssh"
"github.com/pkg/errors"
"github.com/tkw1536/goprogram/stream"
gossh "golang.org/x/crypto/ssh"
)
func (ssh2 *SSH2) setupHostKeys(io stream.IOStream, privateKeyPath string, server *ssh.Server) error {
return ssh2.UseOrMakeHostKeys(io, server, privateKeyPath, nil)
func (ssh2 *SSH2) setupHostKeys(progress io.Writer, privateKeyPath string, server *ssh.Server) error {
return ssh2.UseOrMakeHostKeys(progress, server, privateKeyPath, nil)
}
// UseOrMakeHostKeys is like UseOrMakeHostKey except that it accepts multiple HostKeyAlgorithms.
// For each key algorithm, the privateKeyPath is appended with "_" + the name of the algorithm in question.
//
// When algorithms is nil, picks a reasonable set of default algorithms.
func (ssh2 *SSH2) UseOrMakeHostKeys(io stream.IOStream, server *ssh.Server, privateKeyPath string, algorithms []HostKeyAlgorithm) error {
func (ssh2 *SSH2) UseOrMakeHostKeys(progress io.Writer, server *ssh.Server, privateKeyPath string, algorithms []HostKeyAlgorithm) error {
if algorithms == nil {
algorithms = []HostKeyAlgorithm{RSAAlgorithm, ED25519Algorithm}
}
for _, algorithm := range algorithms {
path := privateKeyPath + "_" + string(algorithm)
if err := ssh2.UseOrMakeHostKey(io, server, path, algorithm); err != nil {
if err := ssh2.UseOrMakeHostKey(progress, server, path, algorithm); err != nil {
return err
}
}
@ -44,8 +44,8 @@ func (ssh2 *SSH2) UseOrMakeHostKeys(io stream.IOStream, server *ssh.Server, priv
//
// All parameters except the server are passed to ReadOrMakeHostKey.
// Please see the appropriate documentation for that function.
func (ssh2 *SSH2) UseOrMakeHostKey(io stream.IOStream, server *ssh.Server, privateKeyPath string, algorithm HostKeyAlgorithm) error {
key, err := ssh2.ReadOrMakeHostKey(io, privateKeyPath, algorithm)
func (ssh2 *SSH2) UseOrMakeHostKey(progress io.Writer, server *ssh.Server, privateKeyPath string, algorithm HostKeyAlgorithm) error {
key, err := ssh2.ReadOrMakeHostKey(progress, privateKeyPath, algorithm)
if err != nil {
return err
}
@ -60,17 +60,17 @@ func (ssh2 *SSH2) UseOrMakeHostKey(io stream.IOStream, server *ssh.Server, priva
//
// This function assumes that if there is a host key in privateKeyPath it uses the provided HostKeyAlgorithm.
// It makes no attempt at verifiying this; the key mail fail to load and return an error, or it may load incorrect data.
func (ssh2 *SSH2) ReadOrMakeHostKey(io stream.IOStream, privateKeyPath string, algorithm HostKeyAlgorithm) (key gossh.Signer, err error) {
func (ssh2 *SSH2) ReadOrMakeHostKey(progress io.Writer, privateKeyPath string, algorithm HostKeyAlgorithm) (key gossh.Signer, err error) {
hostKey := NewHostKey(algorithm)
if _, e := ssh2.Environment.Lstat(privateKeyPath); environment.IsNotExist(e) { // path doesn't exist => generate a new key there!
err = ssh2.makeHostKey(io, hostKey, privateKeyPath)
err = ssh2.makeHostKey(progress, hostKey, privateKeyPath)
if err != nil {
err = errors.Wrap(err, "Unable to generate new host key")
return
}
}
err = ssh2.loadHostKey(io, hostKey, privateKeyPath)
err = ssh2.loadHostKey(progress, hostKey, privateKeyPath)
if err != nil {
return nil, err
}
@ -78,8 +78,8 @@ func (ssh2 *SSH2) ReadOrMakeHostKey(io stream.IOStream, privateKeyPath string, a
}
// loadHostKey loadsa host key
func (ssh2 *SSH2) loadHostKey(io stream.IOStream, key HostKey, path string) (err error) {
io.EPrintf("Loading hostkey (algorithm %s) from %q", key.Algorithm(), path)
func (ssh2 *SSH2) loadHostKey(progress io.Writer, key HostKey, path string) (err error) {
fmt.Fprintf(progress, "Loading hostkey (algorithm %s) from %q", key.Algorithm(), path)
// read all the bytes from the file
privateKeyBytes, err := environment.ReadFile(ssh2.Environment, path)
@ -104,8 +104,8 @@ func (ssh2 *SSH2) loadHostKey(io stream.IOStream, key HostKey, path string) (err
}
// makeHostKey makes a new host key
func (ssh2 *SSH2) makeHostKey(io stream.IOStream, key HostKey, path string) error {
io.EPrintf("Writing hostkey (algorithm %s) to %q", key.Algorithm(), path)
func (ssh2 *SSH2) makeHostKey(progress io.Writer, key HostKey, path string) error {
fmt.Fprintf(progress, "Writing hostkey (algorithm %s) to %q", key.Algorithm(), path)
if err := key.Generate(0, nil); err != nil {
return errors.Wrap(err, "Failed to generate key")

View file

@ -5,6 +5,8 @@ import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"io/fs"
"path/filepath"
@ -34,9 +36,9 @@ var errStackUpdateBuild = errors.New("Stack.Update: Build returned non-zero exit
// This does not have a direct 'docker compose' shell equivalent.
//
// See also Up.
func (ds Stack) Update(ctx context.Context, io stream.IOStream, start bool) error {
func (ds Stack) Update(ctx context.Context, progress io.Writer, start bool) error {
{
code, err := ds.compose(ctx, io, "pull")
code, err := ds.compose(ctx, stream.NonInteractive(progress), "pull")
if err != nil {
return err
}
@ -46,7 +48,7 @@ func (ds Stack) Update(ctx context.Context, io stream.IOStream, start bool) erro
}
{
code, err := ds.compose(ctx, io, "build", "--pull")
code, err := ds.compose(ctx, stream.NonInteractive(progress), "build", "--pull")
if err != nil {
return err
}
@ -55,7 +57,7 @@ func (ds Stack) Update(ctx context.Context, io stream.IOStream, start bool) erro
}
}
if start {
return ds.Up(ctx, io)
return ds.Up(ctx, progress)
}
return nil
}
@ -64,8 +66,8 @@ var errStackUp = errors.New("Stack.Up: Up returned non-zero exit code")
// Up creates and starts the containers in this Stack.
// It is equivalent to 'docker compose up --remove-orphans --detach' on the shell.
func (ds Stack) Up(ctx context.Context, io stream.IOStream) error {
code, err := ds.compose(ctx, io, "up", "--remove-orphans", "--detach")
func (ds Stack) Up(ctx context.Context, progress io.Writer) error {
code, err := ds.compose(ctx, stream.NonInteractive(progress), "up", "--remove-orphans", "--detach")
if err != nil {
return err
}
@ -116,8 +118,8 @@ var errStackRestart = errors.New("Stack.Restart: Restart returned non-zero exit
// Restart restarts all containers in this Stack.
// It is equivalent to 'docker compose restart' on the shell.
func (ds Stack) Restart(ctx context.Context, io stream.IOStream) error {
code, err := ds.compose(ctx, io, "restart")
func (ds Stack) Restart(ctx context.Context, progress io.Writer) error {
code, err := ds.compose(ctx, stream.NonInteractive(progress), "restart")
if err != nil {
return err
}
@ -130,12 +132,12 @@ func (ds Stack) Restart(ctx context.Context, io stream.IOStream) error {
var errStackPs = errors.New("Stack.Ps: Down returned non-zero exit code")
// Ps returns the ids of the containers currently running
func (ds Stack) Ps(ctx context.Context, io stream.IOStream) ([]string, error) {
func (ds Stack) Ps(ctx context.Context, progress io.Writer) ([]string, error) {
// create a buffer
var buffer bytes.Buffer
// read the ids from the command!
code, err := ds.compose(ctx, io.Streams(&buffer, nil, nil, 0), "ps", "-q")
code, err := ds.compose(ctx, stream.NewIOStream(&buffer, progress, nil, 0), "ps", "-q")
if err != nil {
return nil, err
}
@ -163,8 +165,8 @@ var errStackDown = errors.New("Stack.Down: Down returned non-zero exit code")
// Down stops and removes all containers in this Stack.
// It is equivalent to 'docker compose down -v' on the shell.
func (ds Stack) Down(ctx context.Context, io stream.IOStream) error {
code, err := ds.compose(ctx, io, "down", "-v")
func (ds Stack) Down(ctx context.Context, progress io.Writer) error {
code, err := ds.compose(ctx, stream.NonInteractive(progress), "down", "-v")
if err != nil {
return err
}
@ -219,7 +221,7 @@ type InstallationContext map[string]string
//
// Installation is non-interactive, but will provide debugging output onto io.
// InstallationContext
func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, context InstallationContext) error {
func (is StackWithResources) Install(ctx context.Context, progress io.Writer, context InstallationContext) error {
env := is.Stack.Env
if is.ContextPath != "" {
// setup the base files
@ -229,7 +231,7 @@ func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, co
is.ContextPath,
is.Resources,
func(dst, src string) {
io.Printf("[install] %s\n", dst)
fmt.Fprintf(progress, "[install] %s\n", dst)
},
); err != nil {
return err
@ -239,7 +241,7 @@ func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, co
// configure .env
envDest := filepath.Join(is.Dir, ".env")
if is.EnvPath != "" && is.EnvContext != nil {
io.Printf("[config] %s\n", envDest)
fmt.Fprintf(progress, "[config] %s\n", envDest)
if err := unpack.InstallTemplate(
env,
envDest,
@ -256,7 +258,7 @@ func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, co
// find the destination!
dst := filepath.Join(is.Dir, name)
io.Printf("[make] %s\n", dst)
fmt.Fprintf(progress, "[make] %s\n", dst)
if is.MakeDirsPerm == fs.FileMode(0) {
is.MakeDirsPerm = environment.DefaultDirPerm
}
@ -277,7 +279,7 @@ func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, co
dst := filepath.Join(is.Dir, name)
// copy over file from context
io.Printf("[copy] %s (from %s)\n", dst, src)
fmt.Fprintf(progress, "[copy] %s (from %s)\n", dst, src)
if err := fsx.CopyFile(ctx, env, dst, src); err != nil {
return errors.Wrapf(err, "Unable to copy file %s", src)
}
@ -288,7 +290,7 @@ func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, co
// find the destination!
dst := filepath.Join(is.Dir, name)
io.Printf("[touch] %s\n", dst)
fmt.Fprintf(progress, "[touch] %s\n", dst)
if err := fsx.Touch(env, dst, is.TouchFilesPerm); err != nil {
return err
}

View file

@ -3,22 +3,22 @@ package triplestore
import (
"context"
"fmt"
"io"
"net/http"
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/pkg/errors"
"github.com/tkw1536/goprogram/stream"
)
var errTriplestoreFailedSecurity = errors.New("failed to enable triplestore security: request did not succeed with HTTP 200 OK")
func (ts Triplestore) Update(ctx context.Context, io stream.IOStream) error {
logging.LogMessage(io, "Waiting for Triplestore")
func (ts Triplestore) Update(ctx context.Context, progress io.Writer) error {
logging.LogMessage(progress, "Waiting for Triplestore")
if err := ts.Wait(ctx); err != nil {
return err
}
logging.LogMessage(io, "Resetting admin user password")
logging.LogMessage(progress, "Resetting admin user password")
{
res, err := ts.OpenRaw(ctx, "PUT", "/rest/security/users/"+ts.Config.TriplestoreAdminUser, TriplestoreUserPayload{
Password: ts.Config.TriplestoreAdminPassword,
@ -43,14 +43,14 @@ func (ts Triplestore) Update(ctx context.Context, io stream.IOStream) error {
case http.StatusUnauthorized:
// a password is needed => security is already enabled.
// the password may or may not work, but that's a problem for later
logging.LogMessage(io, "Security is already enabled")
logging.LogMessage(progress, "Security is already enabled")
return nil
default:
return fmt.Errorf("failed to create triplestore user: %s", err)
}
}
logging.LogMessage(io, "Enabling Triplestore security")
logging.LogMessage(progress, "Enabling Triplestore security")
{
res, err := ts.OpenRaw(ctx, "POST", "/rest/security", true, "", "")
if err != nil {