Add context
This commit adds and passes context around to (almost) every function. This allows cancelling (almost) every function call globally.
This commit is contained in:
parent
996ecb9f80
commit
3455f491ca
104 changed files with 836 additions and 511 deletions
|
|
@ -1,6 +1,7 @@
|
|||
package component
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
|
|
@ -45,7 +46,7 @@ type StagingContext interface {
|
|||
// Passing the empty path creates the destination as a directory.
|
||||
//
|
||||
// It then allows op to fill the file.
|
||||
AddDirectory(path string, op func() error) error
|
||||
AddDirectory(path string, op func(context.Context) error) error
|
||||
|
||||
// CopyFile copies a file from src to dst.
|
||||
CopyFile(dst, src string) error
|
||||
|
|
@ -61,12 +62,13 @@ type StagingContext interface {
|
|||
// The op function must not retain file.
|
||||
// The underlying file does not need to be closed.
|
||||
// AddFile will not return before op has returned.
|
||||
AddFile(path string, op func(file io.Writer) error) error
|
||||
AddFile(path string, op func(ctx context.Context, file io.Writer) error) error
|
||||
}
|
||||
|
||||
// NewStagingContext returns a new [StagingContext]
|
||||
func NewStagingContext(env environment.Environment, io stream.IOStream, path string, manifest chan<- string) StagingContext {
|
||||
func NewStagingContext(ctx context.Context, env environment.Environment, io stream.IOStream, path string, manifest chan<- string) StagingContext {
|
||||
return &stagingContext{
|
||||
ctx: ctx,
|
||||
env: env,
|
||||
io: io,
|
||||
path: path,
|
||||
|
|
@ -76,6 +78,7 @@ func NewStagingContext(env environment.Environment, io stream.IOStream, path str
|
|||
|
||||
// stagingContext implements [components.StagingContext]
|
||||
type stagingContext struct {
|
||||
ctx context.Context
|
||||
env environment.Environment // environment
|
||||
io stream.IOStream // context the files are sent to
|
||||
path string // path to send files to
|
||||
|
|
@ -110,7 +113,12 @@ func (bc *stagingContext) resolve(path string) (dest string, err error) {
|
|||
return filepath.Join(bc.path, path), nil
|
||||
}
|
||||
|
||||
func (sc *stagingContext) AddDirectory(path string, op func() error) error {
|
||||
func (sc *stagingContext) AddDirectory(path string, op func(context.Context) error) error {
|
||||
// check if we are already done
|
||||
if err, ok := sc.ctxdone(); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
// resolve the path!
|
||||
dst, err := sc.resolve(path)
|
||||
if err != nil {
|
||||
|
|
@ -126,30 +134,43 @@ func (sc *stagingContext) AddDirectory(path string, op func() error) error {
|
|||
sc.sendPath(path)
|
||||
|
||||
// and run the files!
|
||||
return op()
|
||||
return op(sc.ctx)
|
||||
}
|
||||
|
||||
func (sc *stagingContext) CopyFile(dst, src string) error {
|
||||
if err, ok := sc.ctxdone(); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
dstPath, err := sc.resolve(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sc.sendPath(dst)
|
||||
return fsx.CopyFile(sc.env, dstPath, src)
|
||||
return fsx.CopyFile(sc.ctx, sc.env, dstPath, src)
|
||||
}
|
||||
|
||||
func (sc *stagingContext) CopyDirectory(dst, src string) error {
|
||||
if err, ok := sc.ctxdone(); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
dstPath, err := sc.resolve(dst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return fsx.CopyDirectory(sc.env, dstPath, src, func(dst, src string) {
|
||||
return fsx.CopyDirectory(sc.ctx, sc.env, dstPath, src, func(dst, src string) {
|
||||
sc.sendPath(dst)
|
||||
})
|
||||
}
|
||||
|
||||
func (sc *stagingContext) AddFile(path string, op func(file io.Writer) error) error {
|
||||
func (sc *stagingContext) AddFile(path string, op func(ctx context.Context, file io.Writer) error) error {
|
||||
// check if we're already done
|
||||
if err, ok := sc.ctxdone(); ok {
|
||||
return err
|
||||
}
|
||||
|
||||
// resolve the path!
|
||||
dst, err := sc.resolve(path)
|
||||
if err != nil {
|
||||
|
|
@ -167,5 +188,11 @@ func (sc *stagingContext) AddFile(path string, op func(file io.Writer) error) er
|
|||
sc.sendPath(path)
|
||||
|
||||
// and do whatever they wanted to do
|
||||
return op(file)
|
||||
return op(sc.ctx, file)
|
||||
}
|
||||
|
||||
func (sc *stagingContext) ctxdone() (err error, done bool) {
|
||||
err = sc.ctx.Err()
|
||||
done = (err != nil)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,10 +26,10 @@ type Home struct {
|
|||
|
||||
func (*Home) Routes() []string { return []string{"/"} }
|
||||
|
||||
func (home *Home) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
|
||||
home.updateRedirect(context, io)
|
||||
home.updateInstances(context, io)
|
||||
home.updateRender(context, io)
|
||||
func (home *Home) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
|
||||
home.updateRedirect(ctx, io)
|
||||
home.updateInstances(ctx, io)
|
||||
home.updateRender(ctx, io)
|
||||
return home, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,14 +19,27 @@ func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) {
|
|||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
io.Printf("[%s]: reloading instance list\n", t.Format(time.Stamp))
|
||||
|
||||
names, _ := home.instanceMap()
|
||||
home.instanceNames.Set(names)
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
names, err := home.instanceMap(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.instanceNames.Set(names)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
io.EPrintf("error reloading instances: ", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (home *Home) instanceMap() (map[string]struct{}, error) {
|
||||
wissKIs, err := home.Instances.All()
|
||||
func (home *Home) instanceMap(ctx context.Context) (map[string]struct{}, error) {
|
||||
wissKIs, err := home.Instances.All(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -41,10 +54,23 @@ func (home *Home) instanceMap() (map[string]struct{}, error) {
|
|||
func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
|
||||
go func() {
|
||||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
io.Printf("[%s]: reloading home render\n", t.Format(time.Stamp))
|
||||
io.Printf("[%s]: reloading home render list\n", t.Format(time.Stamp))
|
||||
|
||||
bytes, _ := home.homeRender()
|
||||
home.homeBytes.Set(bytes)
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
bytes, err := home.homeRender(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.homeBytes.Set(bytes)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
io.EPrintf("error reloading instances: ", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -53,7 +79,7 @@ func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
|
|||
var homeHTMLStr string
|
||||
var homeTemplate = static.AssetsHomeHome.MustParseShared("home.html", homeHTMLStr)
|
||||
|
||||
func (home *Home) homeRender() ([]byte, error) {
|
||||
func (home *Home) homeRender(ctx context.Context) ([]byte, error) {
|
||||
var context HomeContext
|
||||
|
||||
// setup a couple of static things
|
||||
|
|
@ -61,7 +87,7 @@ func (home *Home) homeRender() ([]byte, error) {
|
|||
context.SelfRedirect = home.Config.SelfRedirect.String()
|
||||
|
||||
// find all the WissKIs
|
||||
wissKIs, err := home.Instances.All()
|
||||
wissKIs, err := home.Instances.All(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -73,7 +99,7 @@ func (home *Home) homeRender() ([]byte, error) {
|
|||
i := i
|
||||
wissKI := instance
|
||||
eg.Go(func() (err error) {
|
||||
context.Instances[i], err = wissKI.Info().Information(false)
|
||||
context.Instances[i], err = wissKI.Info().Information(ctx, false)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,13 +16,27 @@ func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) {
|
|||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
io.Printf("[%s]: reloading overrides\n", t.Format(time.Stamp))
|
||||
|
||||
redirect, _ := home.loadRedirect()
|
||||
home.redirect.Set(&redirect)
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
redirect, err := home.loadRedirect(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.redirect.Set(&redirect)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
io.EPrintf("error reloading overrides: ", err.Error())
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (home *Home) loadRedirect() (redirect Redirect, err error) {
|
||||
func (home *Home) loadRedirect(ctx context.Context) (redirect Redirect, err error) {
|
||||
if redirect.Overrides == nil {
|
||||
redirect.Overrides = make(map[string]string)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ func (info *Info) ingredients(r *http.Request) (cp ingredientsContext, err error
|
|||
cp.Time = time.Now().UTC()
|
||||
|
||||
// find the instance itself!
|
||||
instance, err := info.Instances.WissKI(mux.Vars(r)["slug"])
|
||||
instance, err := info.Instances.WissKI(r.Context(), mux.Vars(r)["slug"])
|
||||
if err == instances.ErrWissKINotFound {
|
||||
return cp, httpx.ErrNotFound
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package info
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
|
@ -25,18 +26,18 @@ type indexContext struct {
|
|||
}
|
||||
|
||||
func (info *Info) index(r *http.Request) (idx indexContext, err error) {
|
||||
idx.Distillery, idx.Instances, err = info.Status(true)
|
||||
idx.Distillery, idx.Instances, err = info.Status(r.Context(), true)
|
||||
return
|
||||
}
|
||||
|
||||
// Status produces a new observation of the distillery, and a new information of all instances
|
||||
// The information on all instances is passed the given quick flag.
|
||||
func (info *Info) Status(QuickInformation bool) (target status.Distillery, information []status.WissKI, err error) {
|
||||
func (info *Info) Status(ctx context.Context, QuickInformation bool) (target status.Distillery, information []status.WissKI, err error) {
|
||||
var group errgroup.Group
|
||||
|
||||
group.Go(func() error {
|
||||
// list all the instances
|
||||
all, err := info.Instances.All()
|
||||
all, err := info.Instances.All(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -50,7 +51,7 @@ func (info *Info) Status(QuickInformation bool) (target status.Distillery, infor
|
|||
|
||||
// store the info for this group!
|
||||
group.Go(func() (err error) {
|
||||
information[i], err = instance.Info().Information(true)
|
||||
information[i], err = instance.Info().Information(ctx, true)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
|
@ -59,7 +60,9 @@ func (info *Info) Status(QuickInformation bool) (target status.Distillery, infor
|
|||
})
|
||||
|
||||
// gather all the observations
|
||||
var flags component.FetcherFlags
|
||||
flags := component.FetcherFlags{
|
||||
Context: ctx,
|
||||
}
|
||||
for _, o := range info.Fetchers {
|
||||
o := o
|
||||
group.Go(func() error {
|
||||
|
|
|
|||
|
|
@ -28,11 +28,11 @@ type Info struct {
|
|||
|
||||
func (*Info) Routes() []string { return []string{"/dis/"} }
|
||||
|
||||
func (info *Info) Handler(route string, context context.Context, io stream.IOStream) (handler http.Handler, err error) {
|
||||
func (info *Info) Handler(ctx context.Context, route string, io stream.IOStream) (handler http.Handler, err error) {
|
||||
router := mux.NewRouter()
|
||||
{
|
||||
socket := &httpx.WebSocket{
|
||||
Context: context,
|
||||
Context: ctx,
|
||||
Fallback: router,
|
||||
Handler: info.serveSocket,
|
||||
}
|
||||
|
|
@ -82,12 +82,12 @@ func (info *Info) Handler(route string, context context.Context, io stream.IOStr
|
|||
}
|
||||
|
||||
// get the instance
|
||||
instance, err := info.Instances.WissKI(r.PostFormValue("slug"))
|
||||
instance, err := info.Instances.WissKI(r.Context(), r.PostFormValue("slug"))
|
||||
if err != nil {
|
||||
return "", httpx.ErrNotFound
|
||||
}
|
||||
|
||||
target, err := instance.Users().Login(nil, r.PostFormValue("user"))
|
||||
target, err := instance.Users().Login(r.Context(), nil, r.PostFormValue("user"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ type instanceContext struct {
|
|||
|
||||
func (info *Info) instance(r *http.Request) (is instanceContext, err error) {
|
||||
// find the instance itself!
|
||||
instance, err := info.Instances.WissKI(mux.Vars(r)["slug"])
|
||||
instance, err := info.Instances.WissKI(r.Context(), mux.Vars(r)["slug"])
|
||||
if err == instances.ErrWissKINotFound {
|
||||
return is, httpx.ErrNotFound
|
||||
}
|
||||
|
|
@ -39,7 +39,7 @@ func (info *Info) instance(r *http.Request) (is instanceContext, err error) {
|
|||
is.Instance = instance.Instance
|
||||
|
||||
// get some more info about the wisski
|
||||
is.Info, err = instance.Info().Information(false)
|
||||
is.Info, err = instance.Info().Information(r.Context(), false)
|
||||
if err != nil {
|
||||
return is, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package info
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
|
|
@ -14,14 +15,15 @@ import (
|
|||
type InstanceAction struct {
|
||||
NumParams int
|
||||
|
||||
HandleInteractive func(info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error
|
||||
HandleResult func(info *Info, instance *wisski.WissKI, params ...string) (value any, err error)
|
||||
HandleInteractive func(ctx context.Context, info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error
|
||||
HandleResult func(ctx context.Context, info *Info, instance *wisski.WissKI, params ...string) (value any, err error)
|
||||
}
|
||||
|
||||
var socketInstanceActions = map[string]InstanceAction{
|
||||
"snapshot": {
|
||||
HandleInteractive: func(info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
HandleInteractive: func(ctx context.Context, info *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return info.Exporter.MakeExport(
|
||||
ctx,
|
||||
str,
|
||||
exporter.ExportTask{
|
||||
Dest: "",
|
||||
|
|
@ -33,18 +35,18 @@ var socketInstanceActions = map[string]InstanceAction{
|
|||
},
|
||||
},
|
||||
"rebuild": {
|
||||
HandleInteractive: func(_ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Barrel().Build(str, true)
|
||||
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Barrel().Build(ctx, str, true)
|
||||
},
|
||||
},
|
||||
"update": {
|
||||
HandleInteractive: func(_ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Drush().Update(str)
|
||||
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Drush().Update(ctx, str)
|
||||
},
|
||||
},
|
||||
"cron": {
|
||||
HandleInteractive: func(_ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Drush().Cron(str)
|
||||
HandleInteractive: func(ctx context.Context, _ *Info, instance *wisski.WissKI, str stream.IOStream, params ...string) error {
|
||||
return instance.Drush().Cron(ctx, str)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -75,7 +77,7 @@ func (info *Info) handleInstanceAction(conn httpx.WebSocketConnection, action In
|
|||
}
|
||||
|
||||
// resolve the instance
|
||||
instance, err := info.Instances.WissKI(string(slug.Bytes))
|
||||
instance, err := info.Instances.WissKI(conn.Context(), string(slug.Bytes))
|
||||
if err != nil {
|
||||
<-conn.WriteText("Instance not found")
|
||||
return
|
||||
|
|
@ -110,7 +112,7 @@ func (info *Info) handleInstanceAction(conn httpx.WebSocketConnection, action In
|
|||
|
||||
// handle the interactive action
|
||||
if action.HandleInteractive != nil {
|
||||
err := action.HandleInteractive(info, instance, str, params...)
|
||||
err := action.HandleInteractive(conn.Context(), info, instance, str, params...)
|
||||
if err != nil {
|
||||
str.EPrintln(err)
|
||||
return
|
||||
|
|
@ -120,7 +122,7 @@ func (info *Info) handleInstanceAction(conn httpx.WebSocketConnection, action In
|
|||
|
||||
// handle the result computation
|
||||
if action.HandleResult != nil {
|
||||
result, err := action.HandleResult(info, instance, params...)
|
||||
result, err := action.HandleResult(conn.Context(), info, instance, params...)
|
||||
if err != nil {
|
||||
str.Println("false")
|
||||
return
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
// The server may spawn background tasks, but these should be terminated once context closes.
|
||||
//
|
||||
// Logging messages are directed to io.
|
||||
func (control *Control) Server(context context.Context, io stream.IOStream) (*http.ServeMux, error) {
|
||||
func (control *Control) Server(ctx context.Context, io stream.IOStream) (*http.ServeMux, error) {
|
||||
// create a new mux
|
||||
mux := http.NewServeMux()
|
||||
|
||||
|
|
@ -19,7 +19,7 @@ func (control *Control) Server(context context.Context, io stream.IOStream) (*ht
|
|||
for _, s := range control.Servables {
|
||||
for _, route := range s.Routes() {
|
||||
io.Printf("mounting %s\n", route)
|
||||
handler, err := s.Handler(route, context, io)
|
||||
handler, err := s.Handler(ctx, route, io)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ func (*Static) Routes() []string { return []string{"/static/"} }
|
|||
//go:embed dist
|
||||
var staticFS embed.FS
|
||||
|
||||
func (static *Static) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
|
||||
func (static *Static) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
|
||||
// take the filesystem
|
||||
fs, err := fs.Sub(staticFS, "dist")
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
|
@ -49,7 +50,7 @@ type BackupDescription struct {
|
|||
}
|
||||
|
||||
// New create a new Backup
|
||||
func (exporter *Exporter) NewBackup(io stream.IOStream, description BackupDescription) (backup Backup) {
|
||||
func (exporter *Exporter) NewBackup(ctx context.Context, io stream.IOStream, description BackupDescription) (backup Backup) {
|
||||
backup.Description = description
|
||||
|
||||
// catch anything critical that happened during the snapshot
|
||||
|
|
@ -60,7 +61,7 @@ func (exporter *Exporter) NewBackup(io stream.IOStream, description BackupDescri
|
|||
// do the create keeping track of time!
|
||||
logging.LogOperation(func() error {
|
||||
backup.StartTime = time.Now().UTC()
|
||||
backup.run(io, exporter)
|
||||
backup.run(ctx, io, exporter)
|
||||
backup.EndTime = time.Now().UTC()
|
||||
|
||||
return nil
|
||||
|
|
@ -69,7 +70,7 @@ func (exporter *Exporter) NewBackup(io stream.IOStream, description BackupDescri
|
|||
return
|
||||
}
|
||||
|
||||
func (backup *Backup) run(ios stream.IOStream, exporter *Exporter) {
|
||||
func (backup *Backup) run(ctx context.Context, ios stream.IOStream, exporter *Exporter) {
|
||||
// create a manifest
|
||||
manifest, done := backup.handleManifest(backup.Description.Dest)
|
||||
defer done()
|
||||
|
|
@ -93,6 +94,7 @@ func (backup *Backup) run(ios stream.IOStream, exporter *Exporter) {
|
|||
Handler: func(bc component.Backupable, index int, writer io.Writer) error {
|
||||
return bc.Backup(
|
||||
component.NewStagingContext(
|
||||
ctx,
|
||||
exporter.Environment,
|
||||
stream.NewIOStream(writer, writer, nil, 0),
|
||||
filepath.Join(backup.Description.Dest, bc.BackupName()),
|
||||
|
|
@ -124,7 +126,7 @@ func (backup *Backup) run(ios stream.IOStream, exporter *Exporter) {
|
|||
}
|
||||
|
||||
// list all instances
|
||||
wissKIs, err := exporter.Instances.All()
|
||||
wissKIs, err := exporter.Instances.All(ctx)
|
||||
if err != nil {
|
||||
backup.InstanceListErr = err
|
||||
return nil
|
||||
|
|
@ -147,7 +149,7 @@ func (backup *Backup) run(ios stream.IOStream, exporter *Exporter) {
|
|||
|
||||
manifest <- dir
|
||||
|
||||
return exporter.NewSnapshot(instance, stream.NewIOStream(writer, writer, nil, 0), SnapshotDescription{
|
||||
return exporter.NewSnapshot(ctx, instance, stream.NewIOStream(writer, writer, nil, 0), SnapshotDescription{
|
||||
Dest: dir,
|
||||
})
|
||||
},
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
|
|
@ -19,8 +20,8 @@ func (Bookkeeping) SnapshotNeedsRunning() bool { return false }
|
|||
func (Bookkeeping) SnapshotName() string { return "bookkeeping.txt" }
|
||||
|
||||
// Snapshot creates a snapshot of this instance
|
||||
func (*Bookkeeping) Snapshot(wisski models.Instance, context component.StagingContext) error {
|
||||
return context.AddFile(".", func(file io.Writer) error {
|
||||
func (*Bookkeeping) Snapshot(wisski models.Instance, scontext component.StagingContext) error {
|
||||
return scontext.AddFile(".", func(ctx context.Context, file io.Writer) error {
|
||||
_, err := fmt.Fprintf(file, "%#v\n", wisski)
|
||||
return err
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
|
|
@ -15,13 +16,13 @@ func (*Config) BackupName() string {
|
|||
return "config"
|
||||
}
|
||||
|
||||
func (control *Config) Backup(context component.StagingContext) error {
|
||||
func (control *Config) Backup(scontext component.StagingContext) error {
|
||||
files := control.backupFiles()
|
||||
|
||||
return context.AddDirectory("", func() error {
|
||||
return scontext.AddDirectory("", func(ctx context.Context) error {
|
||||
for _, src := range files {
|
||||
name := filepath.Base(src)
|
||||
if err := context.CopyFile(name, src); err != nil {
|
||||
if err := scontext.CopyFile(name, src); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
|
|
@ -17,15 +18,15 @@ func (Pathbuilders) SnapshotNeedsRunning() bool { return true }
|
|||
|
||||
func (Pathbuilders) SnapshotName() string { return "pathbuilders" }
|
||||
|
||||
func (pbs *Pathbuilders) Snapshot(wisski models.Instance, context component.StagingContext) error {
|
||||
return context.AddDirectory(".", func() error {
|
||||
builders, err := pbs.Instances.Instance(wisski).Pathbuilder().GetAll(nil)
|
||||
func (pbs *Pathbuilders) Snapshot(wisski models.Instance, scontext component.StagingContext) error {
|
||||
return scontext.AddDirectory(".", func(ctx context.Context) error {
|
||||
builders, err := pbs.Instances.Instance(ctx, wisski).Pathbuilder().GetAll(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for name, bytes := range builders {
|
||||
if err := context.AddFile(name+".xml", func(file io.Writer) error {
|
||||
if err := scontext.AddFile(name+".xml", func(ctx context.Context, file io.Writer) error {
|
||||
_, err := file.Write([]byte(bytes))
|
||||
return err
|
||||
}); err != nil {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
|
|
@ -42,7 +43,7 @@ type export interface {
|
|||
|
||||
// MakeExport performs an export task as described by flags.
|
||||
// Output is directed to the provided io.
|
||||
func (exporter *Exporter) MakeExport(io stream.IOStream, task ExportTask) (err error) {
|
||||
func (exporter *Exporter) MakeExport(ctx context.Context, io stream.IOStream, task ExportTask) (err error) {
|
||||
// extract parameters
|
||||
Title := "Backup"
|
||||
Slug := ""
|
||||
|
|
@ -95,11 +96,11 @@ func (exporter *Exporter) MakeExport(io stream.IOStream, task ExportTask) (err e
|
|||
var sl export
|
||||
if task.Instance == nil {
|
||||
task.BackupDescription.Dest = stagingDir
|
||||
backup := exporter.NewBackup(io, task.BackupDescription)
|
||||
backup := exporter.NewBackup(ctx, io, task.BackupDescription)
|
||||
sl = &backup
|
||||
} else {
|
||||
task.SnapshotDescription.Dest = stagingDir
|
||||
snapshot := exporter.NewSnapshot(task.Instance, io, task.SnapshotDescription)
|
||||
snapshot := exporter.NewSnapshot(ctx, task.Instance, io, task.SnapshotDescription)
|
||||
sl = &snapshot
|
||||
}
|
||||
|
||||
|
|
@ -131,7 +132,7 @@ func (exporter *Exporter) MakeExport(io stream.IOStream, task ExportTask) (err e
|
|||
// write out the log entry
|
||||
entry.Path = stagingDir
|
||||
entry.Packed = false
|
||||
exporter.ExporterLogger.Add(entry)
|
||||
exporter.ExporterLogger.Add(ctx, entry)
|
||||
|
||||
io.Printf("Wrote %s\n", stagingDir)
|
||||
return nil
|
||||
|
|
@ -159,7 +160,7 @@ func (exporter *Exporter) MakeExport(io stream.IOStream, task ExportTask) (err e
|
|||
logging.LogMessage(io, "Writing Log Entry")
|
||||
entry.Path = archivePath
|
||||
entry.Packed = true
|
||||
exporter.ExporterLogger.Add(entry)
|
||||
exporter.ExporterLogger.Add(ctx, entry)
|
||||
|
||||
// and we're done!
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/sql"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||
|
|
@ -19,8 +21,8 @@ type Logger struct {
|
|||
// For retrieves (and prunes) the ExportLog.
|
||||
// Slug determines if entries for Backups (empty slug)
|
||||
// or a specific Instance (non-empty slug) are returned.
|
||||
func (log *Logger) For(slug string) (exports []models.Export, err error) {
|
||||
exports, err = log.Log()
|
||||
func (log *Logger) For(ctx context.Context, slug string) (exports []models.Export, err error) {
|
||||
exports, err = log.Log(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -31,9 +33,9 @@ func (log *Logger) For(slug string) (exports []models.Export, err error) {
|
|||
}
|
||||
|
||||
// Log retrieves (and prunes) all entries in the snapshot log.
|
||||
func (log *Logger) Log() ([]models.Export, error) {
|
||||
func (log *Logger) Log(ctx context.Context) ([]models.Export, error) {
|
||||
// query the table!
|
||||
table, err := log.SQL.QueryTable(false, models.ExportTable)
|
||||
table, err := log.SQL.QueryTable(ctx, false, models.ExportTable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -63,9 +65,9 @@ func (log *Logger) Log() ([]models.Export, error) {
|
|||
}
|
||||
|
||||
// AddToExportLog adds the provided export to the log.
|
||||
func (log *Logger) Add(export models.Export) error {
|
||||
func (log *Logger) Add(ctx context.Context, export models.Export) error {
|
||||
// find the table
|
||||
table, err := log.SQL.QueryTable(false, models.ExportTable)
|
||||
table, err := log.SQL.QueryTable(ctx, false, models.ExportTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -79,7 +81,7 @@ func (log *Logger) Add(export models.Export) error {
|
|||
}
|
||||
|
||||
// Fetch writes the SnapshotLog into the given observation
|
||||
func (logger *Logger) Fetch(flags component.FetcherFlags, target *status.Distillery) (err error) {
|
||||
target.Backups, err = logger.For("")
|
||||
func (logger *Logger) Fetch(ctx context.Context, flags component.FetcherFlags, target *status.Distillery) (err error) {
|
||||
target.Backups, err = logger.For(ctx, "")
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
|
|
@ -14,7 +15,7 @@ func (exporter *Exporter) ShouldPrune(modtime time.Time) bool {
|
|||
}
|
||||
|
||||
// Prune prunes all old exports
|
||||
func (exporter *Exporter) PruneExports(io stream.IOStream) error {
|
||||
func (exporter *Exporter) PruneExports(ctx context.Context, io stream.IOStream) error {
|
||||
sPath := exporter.ArchivePath()
|
||||
|
||||
// list all the files
|
||||
|
|
@ -50,6 +51,6 @@ func (exporter *Exporter) PruneExports(io stream.IOStream) error {
|
|||
}
|
||||
|
||||
// prune the snapshot log!
|
||||
_, err = exporter.ExporterLogger.Log()
|
||||
_, err = exporter.ExporterLogger.Log(ctx)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package exporter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
|
@ -44,10 +45,10 @@ type Snapshot struct {
|
|||
}
|
||||
|
||||
// Snapshot creates a new snapshot of this instance into dest
|
||||
func (snapshots *Exporter) NewSnapshot(instance *wisski.WissKI, io stream.IOStream, desc SnapshotDescription) (snapshot Snapshot) {
|
||||
func (snapshots *Exporter) NewSnapshot(ctx context.Context, instance *wisski.WissKI, io stream.IOStream, desc SnapshotDescription) (snapshot Snapshot) {
|
||||
|
||||
logging.LogMessage(io, "Locking instance")
|
||||
if !instance.Locker().TryLock() {
|
||||
if !instance.Locker().TryLock(ctx) {
|
||||
err := locker.Locked
|
||||
io.EPrintln(err)
|
||||
logging.LogMessage(io, "Aborting snapshot creation")
|
||||
|
|
@ -58,7 +59,7 @@ func (snapshots *Exporter) NewSnapshot(instance *wisski.WissKI, io stream.IOStre
|
|||
}
|
||||
defer func() {
|
||||
logging.LogMessage(io, "Unlocking instance")
|
||||
instance.Locker().Unlock()
|
||||
instance.Locker().Unlock(ctx)
|
||||
}()
|
||||
|
||||
// setup the snapshot
|
||||
|
|
@ -74,8 +75,8 @@ func (snapshots *Exporter) NewSnapshot(instance *wisski.WissKI, io stream.IOStre
|
|||
logging.LogOperation(func() error {
|
||||
snapshot.StartTime = time.Now().UTC()
|
||||
|
||||
snapshot.ErrWhitebox = snapshot.makeParts(io, snapshots, instance, false)
|
||||
snapshot.ErrBlackbox = snapshot.makeParts(io, snapshots, instance, true)
|
||||
snapshot.ErrWhitebox = snapshot.makeParts(ctx, io, snapshots, instance, false)
|
||||
snapshot.ErrBlackbox = snapshot.makeParts(ctx, io, snapshots, instance, true)
|
||||
|
||||
snapshot.EndTime = time.Now().UTC()
|
||||
return nil
|
||||
|
|
@ -85,16 +86,16 @@ func (snapshots *Exporter) NewSnapshot(instance *wisski.WissKI, io stream.IOStre
|
|||
return
|
||||
}
|
||||
|
||||
func (snapshot *Snapshot) makeParts(ios stream.IOStream, snapshots *Exporter, instance *wisski.WissKI, needsRunning bool) map[string]error {
|
||||
func (snapshot *Snapshot) makeParts(ctx context.Context, ios stream.IOStream, snapshots *Exporter, instance *wisski.WissKI, needsRunning bool) map[string]error {
|
||||
if !needsRunning && !snapshot.Description.Keepalive {
|
||||
stack := instance.Barrel().Stack()
|
||||
|
||||
logging.LogMessage(ios, "Stopping instance")
|
||||
snapshot.ErrStop = stack.Down(ios)
|
||||
snapshot.ErrStop = stack.Down(ctx, ios)
|
||||
|
||||
defer func() {
|
||||
logging.LogMessage(ios, "Starting instance")
|
||||
snapshot.ErrStart = stack.Up(ios)
|
||||
snapshot.ErrStart = stack.Up(ctx, ios)
|
||||
}()
|
||||
}
|
||||
// handle writing the manifest!
|
||||
|
|
@ -123,6 +124,7 @@ func (snapshot *Snapshot) makeParts(ios stream.IOStream, snapshots *Exporter, in
|
|||
return sc.Snapshot(
|
||||
instance.Instance,
|
||||
component.NewStagingContext(
|
||||
ctx,
|
||||
snapshots.Environment,
|
||||
stream.NewIOStream(writer, writer, nil, 0),
|
||||
filepath.Join(snapshot.Description.Dest, sc.SnapshotName()),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
package component
|
||||
|
||||
import "github.com/FAU-CDI/wisski-distillery/internal/status"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/status"
|
||||
)
|
||||
|
||||
type DistilleryFetcher interface {
|
||||
Component
|
||||
|
|
@ -11,4 +15,6 @@ type DistilleryFetcher interface {
|
|||
}
|
||||
|
||||
// FetcherFlags describes options for a DistilleryFetcher
|
||||
type FetcherFlags struct{}
|
||||
type FetcherFlags struct {
|
||||
Context context.Context
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
package component
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
|
||||
"github.com/tkw1536/goprogram/stream"
|
||||
)
|
||||
|
|
@ -40,5 +42,5 @@ type Updatable interface {
|
|||
// It may send output to the provided stream.
|
||||
//
|
||||
// Updating should be idempotent, meaning running it multiple times must not break the existing system.
|
||||
Update(stream stream.IOStream) error
|
||||
Update(ctx context.Context, stream stream.IOStream) error
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package instances
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
|
||||
|
|
@ -42,13 +43,13 @@ func (instances *Instances) use(wisski *wisski.WissKI) {
|
|||
|
||||
// WissKI returns the WissKI with the provided slug, if it exists.
|
||||
// It the WissKI does not exist, returns ErrWissKINotFound.
|
||||
func (instances *Instances) WissKI(slug string) (wissKI *wisski.WissKI, err error) {
|
||||
func (instances *Instances) WissKI(ctx context.Context, slug string) (wissKI *wisski.WissKI, err error) {
|
||||
sql := instances.SQL
|
||||
if err := sql.WaitQueryTable(); err != nil {
|
||||
if err := sql.WaitQueryTable(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
table, err := sql.QueryTable(false, models.InstanceTable)
|
||||
table, err := sql.QueryTable(ctx, false, models.InstanceTable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -72,8 +73,8 @@ func (instances *Instances) WissKI(slug string) (wissKI *wisski.WissKI, err erro
|
|||
|
||||
// Instance is a convenience function to return an instance based on a model slug.
|
||||
// When the instance does not exist, returns nil.
|
||||
func (instances *Instances) Instance(instance models.Instance) *wisski.WissKI {
|
||||
wissKI, err := instances.WissKI(instance.Slug)
|
||||
func (instances *Instances) Instance(ctx context.Context, instance models.Instance) *wisski.WissKI {
|
||||
wissKI, err := instances.WissKI(ctx, instance.Slug)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -82,13 +83,13 @@ func (instances *Instances) Instance(instance models.Instance) *wisski.WissKI {
|
|||
|
||||
// Has checks if a WissKI with the provided slug exists inside the database.
|
||||
// It does not perform any checks on the WissKI itself.
|
||||
func (instances *Instances) Has(slug string) (ok bool, err error) {
|
||||
func (instances *Instances) Has(ctx context.Context, slug string) (ok bool, err error) {
|
||||
sql := instances.SQL
|
||||
if err := sql.WaitQueryTable(); err != nil {
|
||||
if err := sql.WaitQueryTable(ctx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
table, err := sql.QueryTable(false, models.InstanceTable)
|
||||
table, err := sql.QueryTable(ctx, false, models.InstanceTable)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
|
@ -103,37 +104,37 @@ func (instances *Instances) Has(slug string) (ok bool, err error) {
|
|||
// All returns all instances of the WissKI Distillery in consistent order.
|
||||
//
|
||||
// There is no guarantee that this order remains identical between different api releases; however subsequent invocations are guaranteed to return the same order.
|
||||
func (instances *Instances) All() ([]*wisski.WissKI, error) {
|
||||
return instances.find(true, func(table *gorm.DB) *gorm.DB {
|
||||
func (instances *Instances) All(ctx context.Context) ([]*wisski.WissKI, error) {
|
||||
return instances.find(ctx, true, func(table *gorm.DB) *gorm.DB {
|
||||
return table
|
||||
})
|
||||
}
|
||||
|
||||
// WissKIs returns the WissKI instances with the provides slugs.
|
||||
// If a slug does not exist, it is omitted from the result.
|
||||
func (instances *Instances) WissKIs(slugs ...string) ([]*wisski.WissKI, error) {
|
||||
return instances.find(true, func(table *gorm.DB) *gorm.DB {
|
||||
func (instances *Instances) WissKIs(ctx context.Context, slugs ...string) ([]*wisski.WissKI, error) {
|
||||
return instances.find(ctx, true, func(table *gorm.DB) *gorm.DB {
|
||||
return table.Where("slug IN ?", slugs)
|
||||
})
|
||||
}
|
||||
|
||||
// Load is like All, except that when no slugs are provided, it calls All.
|
||||
func (instances *Instances) Load(slugs ...string) ([]*wisski.WissKI, error) {
|
||||
func (instances *Instances) Load(ctx context.Context, slugs ...string) ([]*wisski.WissKI, error) {
|
||||
if len(slugs) == 0 {
|
||||
return instances.All()
|
||||
return instances.All(ctx)
|
||||
}
|
||||
return instances.WissKIs(slugs...)
|
||||
return instances.WissKIs(ctx, slugs...)
|
||||
}
|
||||
|
||||
// find finds instances based on the provided query
|
||||
func (instances *Instances) find(order bool, query func(table *gorm.DB) *gorm.DB) (results []*wisski.WissKI, err error) {
|
||||
func (instances *Instances) find(ctx context.Context, order bool, query func(table *gorm.DB) *gorm.DB) (results []*wisski.WissKI, err error) {
|
||||
sql := instances.SQL
|
||||
if err := sql.WaitQueryTable(); err != nil {
|
||||
if err := sql.WaitQueryTable(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open the bookkeeping table
|
||||
table, err := sql.QueryTable(false, models.InstanceTable)
|
||||
table, err := sql.QueryTable(ctx, false, models.InstanceTable)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package instances
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/unpack"
|
||||
|
|
@ -19,7 +20,7 @@ var errBootstrapFailedRuntime = exit.Error{
|
|||
var runtimeResources embed.FS
|
||||
|
||||
// Update installs or updates runtime components needed by this component.
|
||||
func (instances *Instances) Update(stream stream.IOStream) error {
|
||||
func (instances *Instances) Update(ctx context.Context, stream stream.IOStream) error {
|
||||
err := unpack.InstallDir(instances.Still.Environment, instances.Config.RuntimeDir(), "runtime", runtimeResources, func(dst, src string) {
|
||||
stream.Printf("[copy] %s\n", dst)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1,14 +1,18 @@
|
|||
package meta
|
||||
|
||||
import "github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||
)
|
||||
|
||||
// Provision provisions new meta storage for this instance.
|
||||
// NOTE(twiesing): This is a no-op, because we implement Purge.
|
||||
func (meta *Meta) Provision(instance models.Instance, domain string) error {
|
||||
func (meta *Meta) Provision(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Purge purges the storage for the given instance.
|
||||
func (meta *Meta) Purge(instance models.Instance, domain string) error {
|
||||
return meta.Storage(instance.Slug).Purge()
|
||||
func (meta *Meta) Purge(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return meta.Storage(instance.Slug).Purge(ctx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
||||
|
|
@ -24,8 +25,8 @@ type Storage struct {
|
|||
|
||||
// Get retrieves metadata with the provided key and deserializes the first one into target.
|
||||
// If no metadatum exists, returns [ErrMetadatumNotSet].
|
||||
func (s Storage) Get(key Key, target any) error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) Get(ctx context.Context, key Key, target any) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -53,8 +54,8 @@ func (s Storage) Get(key Key, target any) error {
|
|||
// The function is intended to return a target for deserialization.
|
||||
//
|
||||
// When no metadatum exists, targets is not called, and nil error is returned.
|
||||
func (s Storage) GetAll(key Key, target func(index, total int) any) error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) GetAll(ctx context.Context, key Key, target func(index, total int) any) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -80,8 +81,8 @@ func (s Storage) GetAll(key Key, target func(index, total int) any) error {
|
|||
}
|
||||
|
||||
// Delete deletes all metadata with the provided key.
|
||||
func (s Storage) Delete(key Key) error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) Delete(ctx context.Context, key Key) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -96,8 +97,8 @@ func (s Storage) Delete(key Key) error {
|
|||
|
||||
// Set serializes value and stores it with the provided key.
|
||||
// Any other metadata with the same key is deleted.
|
||||
func (s Storage) Set(key Key, value any) error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) Set(ctx context.Context, key Key, value any) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -131,8 +132,8 @@ func (s Storage) Set(key Key, value any) error {
|
|||
|
||||
// Set serializes values and stores them with the provided key.
|
||||
// Any other metadata with the same key is deleted.
|
||||
func (s Storage) SetAll(key Key, values ...any) error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) SetAll(ctx context.Context, key Key, values ...any) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -165,8 +166,8 @@ func (s Storage) SetAll(key Key, values ...any) error {
|
|||
}
|
||||
|
||||
// Purge removes all metadata, regardless of key.
|
||||
func (s Storage) Purge() error {
|
||||
table, err := s.sql.QueryTable(true, models.MetadataTable)
|
||||
func (s Storage) Purge(ctx context.Context) error {
|
||||
table, err := s.sql.QueryTable(ctx, true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -181,22 +182,22 @@ func (s Storage) Purge() error {
|
|||
// TypedKey represents a convenience wrapper for a given with a given value.
|
||||
type TypedKey[Value any] Key
|
||||
|
||||
func (f TypedKey[Value]) Get(s *Storage) (value Value, err error) {
|
||||
err = s.Get(Key(f), &value)
|
||||
func (f TypedKey[Value]) Get(ctx context.Context, s *Storage) (value Value, err error) {
|
||||
err = s.Get(ctx, Key(f), &value)
|
||||
return
|
||||
}
|
||||
|
||||
func (f TypedKey[Value]) GetOrSet(s *Storage, dflt Value) (value Value, err error) {
|
||||
value, err = f.Get(s)
|
||||
func (f TypedKey[Value]) GetOrSet(ctx context.Context, s *Storage, dflt Value) (value Value, err error) {
|
||||
value, err = f.Get(ctx, s)
|
||||
if err == ErrMetadatumNotSet {
|
||||
value = dflt
|
||||
err = f.Set(s, value)
|
||||
err = f.Set(ctx, s, value)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (f TypedKey[Value]) GetAll(m *Storage) (values []Value, err error) {
|
||||
err = m.GetAll(Key(f), func(index, total int) any {
|
||||
func (f TypedKey[Value]) GetAll(ctx context.Context, m *Storage) (values []Value, err error) {
|
||||
err = m.GetAll(ctx, Key(f), func(index, total int) any {
|
||||
if values == nil {
|
||||
values = make([]Value, total)
|
||||
}
|
||||
|
|
@ -205,14 +206,14 @@ func (f TypedKey[Value]) GetAll(m *Storage) (values []Value, err error) {
|
|||
return values, err
|
||||
}
|
||||
|
||||
func (f TypedKey[Value]) Set(m *Storage, value Value) error {
|
||||
return m.Set(Key(f), value)
|
||||
func (f TypedKey[Value]) Set(ctx context.Context, m *Storage, value Value) error {
|
||||
return m.Set(ctx, Key(f), value)
|
||||
}
|
||||
|
||||
func (f TypedKey[Value]) SetAll(m *Storage, values ...Value) error {
|
||||
return m.SetAll(Key(f), collection.AsAny(values)...)
|
||||
func (f TypedKey[Value]) SetAll(ctx context.Context, m *Storage, values ...Value) error {
|
||||
return m.SetAll(ctx, Key(f), collection.AsAny(values)...)
|
||||
}
|
||||
|
||||
func (f TypedKey[Value]) Delete(m *Storage) error {
|
||||
return m.Delete(Key(f))
|
||||
func (f TypedKey[Value]) Delete(ctx context.Context, m *Storage) error {
|
||||
return m.Delete(ctx, Key(f))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
package component
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||
)
|
||||
|
||||
|
|
@ -10,9 +12,9 @@ type Provisionable interface {
|
|||
|
||||
// Provision provisions resources specific to the provided instance.
|
||||
// Domain holds the full (unique) domain name of the given instance.
|
||||
Provision(instance models.Instance, domain string) error
|
||||
Provision(ctx context.Context, instance models.Instance, domain string) error
|
||||
|
||||
// Purge purges resources specific to the provided instance.
|
||||
// Domain holds the full (unique) domain name of the given instance.
|
||||
Purge(instance models.Instance, domain string) error
|
||||
Purge(ctx context.Context, instance models.Instance, domain string) error
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,20 +9,34 @@ import (
|
|||
)
|
||||
|
||||
// updatePrefixes starts updating prefixes
|
||||
func (resolver *Resolver) updatePrefixes(io stream.IOStream, ctx context.Context) {
|
||||
func (resolver *Resolver) updatePrefixes(ctx context.Context, io stream.IOStream) {
|
||||
go func() {
|
||||
for t := range timex.TickContext(ctx, resolver.RefreshInterval) {
|
||||
io.Printf("[%s]: reloading prefixes\n", t.Format(time.Stamp))
|
||||
prefixes, _ := resolver.AllPrefixes()
|
||||
resolver.prefixes.Set(prefixes)
|
||||
|
||||
err := (func() (err error) {
|
||||
ctx, cancel := context.WithTimeout(ctx, resolver.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
prefixes, err := resolver.AllPrefixes(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resolver.prefixes.Set(prefixes)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
io.EPrintf("error reloading prefixes: ", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// AllPrefixes returns a list of all prefixes from the server.
|
||||
// Prefixes may be cached on the server
|
||||
func (resolver *Resolver) AllPrefixes() (map[string]string, error) {
|
||||
instances, err := resolver.Instances.All()
|
||||
func (resolver *Resolver) AllPrefixes(ctx context.Context) (map[string]string, error) {
|
||||
instances, err := resolver.Instances.All(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -37,7 +51,7 @@ func (resolver *Resolver) AllPrefixes() (map[string]string, error) {
|
|||
|
||||
// failed to fetch prefixes for this particular instance
|
||||
// => skip it!
|
||||
prefixes, err := instance.Prefixes().AllCached()
|
||||
prefixes, err := instance.Prefixes().AllCached(ctx)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ type Resolver struct {
|
|||
|
||||
func (resolver *Resolver) Routes() []string { return []string{"/go/", "/wisski/get/"} }
|
||||
|
||||
func (resolver *Resolver) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
|
||||
func (resolver *Resolver) Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error) {
|
||||
var err error
|
||||
return resolver.handler.Get(func() (p wdresolve.ResolveHandler) {
|
||||
p.TrustXForwardedProto = true
|
||||
|
|
@ -51,7 +51,7 @@ func (resolver *Resolver) Handler(route string, context context.Context, io stre
|
|||
}
|
||||
|
||||
// start updating prefixes
|
||||
resolver.updatePrefixes(io, context)
|
||||
resolver.updatePrefixes(ctx, io)
|
||||
|
||||
// resolve the prefixes
|
||||
p.Resolver = resolvers.InOrder{
|
||||
|
|
|
|||
|
|
@ -15,5 +15,5 @@ type Servable interface {
|
|||
Routes() []string
|
||||
|
||||
// Handler returns the handler for the requested route
|
||||
Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error)
|
||||
Handler(ctx context.Context, route string, io stream.IOStream) (http.Handler, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package solr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
|
@ -15,8 +14,7 @@ type Solr struct {
|
|||
|
||||
BaseURL string // upstream solr url
|
||||
|
||||
PollContext context.Context // context to abort polling with
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
}
|
||||
|
||||
func (s *Solr) Path() string {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
|
|
@ -14,10 +15,10 @@ func (*SQL) BackupName() string {
|
|||
}
|
||||
|
||||
// Backup makes a backup of all SQL databases into the path dest.
|
||||
func (sql *SQL) Backup(context component.StagingContext) error {
|
||||
return context.AddFile("", func(file io.Writer) error {
|
||||
io := context.IO().Streams(file, nil, nil, 0).NonInteractive()
|
||||
code, err := sql.Stack(sql.Environment).Exec(io, "sql", "mysqldump", "--all-databases")
|
||||
func (sql *SQL) Backup(scontext component.StagingContext) error {
|
||||
return scontext.AddFile("", func(ctx context.Context, file io.Writer) error {
|
||||
io := scontext.IO().Streams(file, nil, nil, 0).NonInteractive()
|
||||
code, err := sql.Stack(sql.Environment).Exec(ctx, io, "sql", "mysqldump", "--all-databases")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,20 +40,12 @@ func (sql *SQL) Exec(query string, args ...interface{}) error {
|
|||
}
|
||||
}
|
||||
|
||||
// WaitExec waits for the query interface to be able to connect to the database
|
||||
func (sql *SQL) WaitExec() error {
|
||||
return timex.TickUntilFunc(func(time.Time) bool {
|
||||
err := sql.Exec("select 1;")
|
||||
return err == nil
|
||||
}, sql.PollContext, sql.PollInterval)
|
||||
}
|
||||
|
||||
//
|
||||
// ========== connection via gorm ==========
|
||||
//
|
||||
|
||||
// QueryTable returns a gorm.DB to connect to the provided distillery database table
|
||||
func (sql *SQL) QueryTable(silent bool, table string) (*gorm.DB, error) {
|
||||
func (sql *SQL) QueryTable(ctx context.Context, silent bool, table string) (*gorm.DB, error) {
|
||||
conn, err := sql.connect(sql.Config.DistilleryDatabase)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -79,7 +71,7 @@ func (sql *SQL) QueryTable(silent bool, table string) (*gorm.DB, error) {
|
|||
}
|
||||
|
||||
// set the table
|
||||
db = db.Table(table)
|
||||
db = db.WithContext(ctx).Table(table)
|
||||
|
||||
// check that nothing went wrong
|
||||
if db.Error != nil {
|
||||
|
|
@ -89,12 +81,12 @@ func (sql *SQL) QueryTable(silent bool, table string) (*gorm.DB, error) {
|
|||
}
|
||||
|
||||
// WaitQueryTable waits for a connection to succeed via QueryTable
|
||||
func (sql *SQL) WaitQueryTable() error {
|
||||
func (sql *SQL) WaitQueryTable(ctx context.Context) error {
|
||||
// TODO: Establish a convention on when to wait for this!
|
||||
return timex.TickUntilFunc(func(time.Time) bool {
|
||||
_, err := sql.QueryTable(true, models.InstanceTable)
|
||||
_, err := sql.QueryTable(ctx, true, models.InstanceTable)
|
||||
return err == nil
|
||||
}, sql.PollContext, sql.PollInterval)
|
||||
}, ctx, sql.PollInterval)
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||
|
|
@ -12,15 +13,15 @@ var errProvisionInvalidDatabaseParams = errors.New("Provision: Invalid parameter
|
|||
var errProvisionInvalidGrant = errors.New("Provision: Grant failed")
|
||||
|
||||
// Provision provisions sql-specific resource for the given instance
|
||||
func (sql *SQL) Provision(instance models.Instance, domain string) error {
|
||||
return sql.CreateDatabase(instance.SqlDatabase, instance.SqlUsername, instance.SqlPassword)
|
||||
func (sql *SQL) Provision(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return sql.CreateDatabase(ctx, instance.SqlDatabase, instance.SqlUsername, instance.SqlPassword)
|
||||
}
|
||||
|
||||
// Purge purges sql-specific resources for the given instance
|
||||
func (sql *SQL) Purge(instance models.Instance, domain string) error {
|
||||
func (sql *SQL) Purge(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return errorx.First(
|
||||
sql.PurgeDatabase(instance.SqlDatabase),
|
||||
sql.PurgeUser(instance.SqlUsername),
|
||||
sql.PurgeUser(ctx, instance.SqlUsername),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -28,7 +29,7 @@ func (sql *SQL) Purge(instance models.Instance, domain string) error {
|
|||
// It then generates a new user, with the name 'user' and the password 'password', that is then granted access to this database.
|
||||
//
|
||||
// Provision internally waits for the database to become available.
|
||||
func (sql *SQL) CreateDatabase(name, user, password string) error {
|
||||
func (sql *SQL) CreateDatabase(ctx context.Context, name, user, password string) error {
|
||||
|
||||
// NOTE(twiesing): We shouldn't use string concat to build sql queries.
|
||||
// But the driver doesn't support using query params for this particular query.
|
||||
|
|
@ -43,14 +44,14 @@ func (sql *SQL) CreateDatabase(name, user, password string) error {
|
|||
// Queries of the form "CREATE USER 'test'@'%' IDENTIFIED BY 'test'; FLUSH PRIVILEGES;" return error 1064 when using driver, but are fine with the shell.
|
||||
// This should be fixed eventually, but I have no idea how.
|
||||
|
||||
if err := sql.unsafeWaitShell(); err != nil {
|
||||
if err := sql.unsafeWaitShell(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
query := "CREATE DATABASE `" + name + "`;" +
|
||||
"CREATE USER '" + user + "'@'%' IDENTIFIED BY '" + password + "';" +
|
||||
"GRANT ALL PRIVILEGES ON `" + name + "`.* TO `" + user + "`@`%`; FLUSH PRIVILEGES;"
|
||||
if !sql.unsafeQueryShell(query) {
|
||||
if !sql.unsafeQueryShell(ctx, query) {
|
||||
return errProvisionInvalidGrant
|
||||
}
|
||||
|
||||
|
|
@ -63,7 +64,7 @@ var errCreateSuperuserGrant = errors.New("CreateSuperUser: Grant failed")
|
|||
// It then grants this user superuser status in the database.
|
||||
//
|
||||
// CreateSuperuser internally waits for the database to become available.
|
||||
func (sql *SQL) CreateSuperuser(user, password string, allowExisting bool) error {
|
||||
func (sql *SQL) CreateSuperuser(ctx context.Context, user, password string, allowExisting bool) error {
|
||||
// NOTE(twiesing): This function unsafely uses the shell directly to create a superuser.
|
||||
// This is for two reasons:
|
||||
// (1) this is used during bootstraping
|
||||
|
|
@ -74,7 +75,7 @@ func (sql *SQL) CreateSuperuser(user, password string, allowExisting bool) error
|
|||
return errProvisionInvalidDatabaseParams
|
||||
}
|
||||
|
||||
if err := sql.unsafeWaitShell(); err != nil {
|
||||
if err := sql.unsafeWaitShell(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -85,7 +86,7 @@ func (sql *SQL) CreateSuperuser(user, password string, allowExisting bool) error
|
|||
|
||||
query := "CREATE USER " + IfNotExists + " '" + user + "'@'%' IDENTIFIED BY '" + password + "';" +
|
||||
"GRANT ALL PRIVILEGES ON *.* TO '" + user + "'@'%' WITH GRANT OPTION; FLUSH PRIVILEGES;"
|
||||
if !sql.unsafeQueryShell(query) {
|
||||
if !sql.unsafeQueryShell(ctx, query) {
|
||||
return errCreateSuperuserGrant
|
||||
}
|
||||
|
||||
|
|
@ -95,14 +96,14 @@ func (sql *SQL) CreateSuperuser(user, password string, allowExisting bool) error
|
|||
var errPurgeUser = errors.New("PurgeUser: Failed to drop user")
|
||||
|
||||
// SQLPurgeUser deletes the specified user from the database
|
||||
func (sql *SQL) PurgeUser(user string) error {
|
||||
func (sql *SQL) PurgeUser(ctx context.Context, user string) error {
|
||||
if !sqle.IsSafeDatabaseSingleQuote(user) {
|
||||
return errPurgeUser
|
||||
}
|
||||
|
||||
query := "DROP USER IF EXISTS '" + user + "'@'%';" +
|
||||
"FLUSH PRIVILEGES;"
|
||||
if !sql.unsafeQueryShell(query) {
|
||||
if !sql.unsafeQueryShell(ctx, query) {
|
||||
return errPurgeUser
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
|
|
@ -12,19 +13,19 @@ func (*SQL) SnapshotNeedsRunning() bool { return false }
|
|||
|
||||
func (*SQL) SnapshotName() string { return "sql" }
|
||||
|
||||
func (sql *SQL) Snapshot(wisski models.Instance, context component.StagingContext) error {
|
||||
return context.AddDirectory(".", func() error {
|
||||
return context.AddFile(wisski.SqlDatabase+".sql", func(file io.Writer) error {
|
||||
return sql.SnapshotDB(context.IO(), file, wisski.SqlDatabase)
|
||||
func (sql *SQL) Snapshot(wisski models.Instance, scontext component.StagingContext) error {
|
||||
return scontext.AddDirectory(".", func(ctx context.Context) error {
|
||||
return scontext.AddFile(wisski.SqlDatabase+".sql", func(ctx context.Context, file io.Writer) error {
|
||||
return sql.SnapshotDB(ctx, scontext.IO(), file, wisski.SqlDatabase)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// SnapshotDB makes a backup of the sql database into dest.
|
||||
func (sql *SQL) SnapshotDB(io stream.IOStream, dest io.Writer, database string) error {
|
||||
func (sql *SQL) SnapshotDB(ctx context.Context, io stream.IOStream, dest io.Writer, database string) error {
|
||||
io = io.Streams(dest, nil, nil, 0).NonInteractive()
|
||||
|
||||
code, err := sql.Stack(sql.Environment).Exec(io, "sql", "mysqldump", "--databases", database)
|
||||
code, err := sql.Stack(sql.Environment).Exec(ctx, io, "sql", "mysqldump", "--databases", database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
|
@ -16,8 +15,7 @@ type SQL struct {
|
|||
|
||||
ServerURL string // upstream server url
|
||||
|
||||
PollContext context.Context // context to abort polling with
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
|
||||
lazyNetwork lazy.Lazy[string]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
|
@ -16,22 +17,22 @@ import (
|
|||
// Shell runs a mysql shell with the provided databases.
|
||||
//
|
||||
// NOTE(twiesing): This command should not be used to connect to the database or execute queries except in known situations.
|
||||
func (sql *SQL) Shell(io stream.IOStream, argv ...string) (int, error) {
|
||||
return sql.Stack(sql.Environment).Exec(io, "sql", "mysql", argv...)
|
||||
func (sql *SQL) Shell(ctx context.Context, io stream.IOStream, argv ...string) (int, error) {
|
||||
return sql.Stack(sql.Environment).Exec(ctx, io, "sql", "mysql", argv...)
|
||||
}
|
||||
|
||||
// unsafeWaitShell waits for a connection via the database shell to succeed
|
||||
func (sql *SQL) unsafeWaitShell() error {
|
||||
func (sql *SQL) unsafeWaitShell(ctx context.Context) error {
|
||||
n := stream.FromNil()
|
||||
return timex.TickUntilFunc(func(time.Time) bool {
|
||||
code, err := sql.Shell(n, "-e", "select 1;")
|
||||
code, err := sql.Shell(ctx, n, "-e", "select 1;")
|
||||
return err == nil && code == 0
|
||||
}, sql.PollContext, sql.PollInterval)
|
||||
}, ctx, sql.PollInterval)
|
||||
}
|
||||
|
||||
// unsafeQuery shell executes a raw database query.
|
||||
func (sql *SQL) unsafeQueryShell(query string) bool {
|
||||
code, err := sql.Shell(stream.FromNil(), "-e", query)
|
||||
func (sql *SQL) unsafeQueryShell(ctx context.Context, query string) bool {
|
||||
code, err := sql.Shell(ctx, stream.FromNil(), "-e", query)
|
||||
return err == nil && code == 0
|
||||
}
|
||||
|
||||
|
|
@ -43,18 +44,18 @@ var errSQLUnableToMigrate = exit.Error{
|
|||
}
|
||||
|
||||
// Update initializes or updates the SQL database.
|
||||
func (sql *SQL) Update(io stream.IOStream) error {
|
||||
func (sql *SQL) Update(ctx context.Context, io stream.IOStream) error {
|
||||
|
||||
// unsafely create the admin user!
|
||||
{
|
||||
if err := sql.unsafeWaitShell(); err != nil {
|
||||
if err := sql.unsafeWaitShell(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
logging.LogMessage(io, "Creating administrative user")
|
||||
{
|
||||
username := sql.Config.MysqlAdminUser
|
||||
password := sql.Config.MysqlAdminPassword
|
||||
if err := sql.CreateSuperuser(username, password, true); err != nil {
|
||||
if err := sql.CreateSuperuser(ctx, username, password, true); err != nil {
|
||||
return errSQLUnableToCreateUser
|
||||
}
|
||||
}
|
||||
|
|
@ -74,7 +75,7 @@ func (sql *SQL) Update(io stream.IOStream) error {
|
|||
|
||||
// wait for the database to come up
|
||||
logging.LogMessage(io, "Waiting for database update to be complete")
|
||||
sql.WaitQueryTable()
|
||||
sql.WaitQueryTable(ctx)
|
||||
|
||||
tables := []struct {
|
||||
name string
|
||||
|
|
@ -107,7 +108,7 @@ func (sql *SQL) Update(io stream.IOStream) error {
|
|||
return logging.LogOperation(func() error {
|
||||
for _, table := range tables {
|
||||
logging.LogMessage(io, "migrating %q table", table.name)
|
||||
db, err := sql.QueryTable(false, table.table)
|
||||
db, err := sql.QueryTable(ctx, false, table.table)
|
||||
if err != nil {
|
||||
return errSQLUnableToMigrate.WithMessageF(table.name, "unable to access table")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ func (ssh2 *SSH2) handleAuth(ctx ssh.Context, key ssh.PublicKey) bool {
|
|||
|
||||
// grab permissions for each instance
|
||||
{
|
||||
instances, err := ssh2.Instances.All()
|
||||
instances, err := ssh2.Instances.All(ctx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ package component
|
|||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"io/fs"
|
||||
"path/filepath"
|
||||
|
||||
|
|
@ -33,9 +34,9 @@ var errStackUpdateBuild = errors.New("Stack.Update: Build returned non-zero exit
|
|||
// This does not have a direct 'docker compose' shell equivalent.
|
||||
//
|
||||
// See also Up.
|
||||
func (ds Stack) Update(io stream.IOStream, start bool) error {
|
||||
func (ds Stack) Update(ctx context.Context, io stream.IOStream, start bool) error {
|
||||
{
|
||||
code, err := ds.compose(io, "pull")
|
||||
code, err := ds.compose(ctx, io, "pull")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -45,7 +46,7 @@ func (ds Stack) Update(io stream.IOStream, start bool) error {
|
|||
}
|
||||
|
||||
{
|
||||
code, err := ds.compose(io, "build", "--pull")
|
||||
code, err := ds.compose(ctx, io, "build", "--pull")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -54,7 +55,7 @@ func (ds Stack) Update(io stream.IOStream, start bool) error {
|
|||
}
|
||||
}
|
||||
if start {
|
||||
return ds.Up(io)
|
||||
return ds.Up(ctx, io)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -63,8 +64,8 @@ var errStackUp = errors.New("Stack.Up: Up returned non-zero exit code")
|
|||
|
||||
// Up creates and starts the containers in this Stack.
|
||||
// It is equivalent to 'docker compose up --remove-orphans --detach' on the shell.
|
||||
func (ds Stack) Up(io stream.IOStream) error {
|
||||
code, err := ds.compose(io, "up", "--remove-orphans", "--detach")
|
||||
func (ds Stack) Up(ctx context.Context, io stream.IOStream) error {
|
||||
code, err := ds.compose(ctx, io, "up", "--remove-orphans", "--detach")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -78,7 +79,7 @@ func (ds Stack) Up(io stream.IOStream) error {
|
|||
// It is equivalent to 'docker compose exec $service $executable $args...'.
|
||||
//
|
||||
// It returns the exit code of the process.
|
||||
func (ds Stack) Exec(io stream.IOStream, service, executable string, args ...string) (int, error) {
|
||||
func (ds Stack) Exec(ctx context.Context, io stream.IOStream, service, executable string, args ...string) (int, error) {
|
||||
compose := []string{"exec"}
|
||||
if io.StdinIsATerminal() {
|
||||
compose = append(compose, "-ti")
|
||||
|
|
@ -86,14 +87,14 @@ func (ds Stack) Exec(io stream.IOStream, service, executable string, args ...str
|
|||
compose = append(compose, service)
|
||||
compose = append(compose, executable)
|
||||
compose = append(compose, args...)
|
||||
return ds.compose(io, compose...)
|
||||
return ds.compose(ctx, io, compose...)
|
||||
}
|
||||
|
||||
// Run runs a command in a running container with the given executable.
|
||||
// It is equivalent to 'docker compose run [--rm] $service $executable $args...'.
|
||||
//
|
||||
// It returns the exit code of the process.
|
||||
func (ds Stack) Run(io stream.IOStream, autoRemove bool, service, command string, args ...string) (int, error) {
|
||||
func (ds Stack) Run(ctx context.Context, io stream.IOStream, autoRemove bool, service, command string, args ...string) (int, error) {
|
||||
compose := []string{"run"}
|
||||
if autoRemove {
|
||||
compose = append(compose, "--rm")
|
||||
|
|
@ -104,7 +105,7 @@ func (ds Stack) Run(io stream.IOStream, autoRemove bool, service, command string
|
|||
compose = append(compose, service, command)
|
||||
compose = append(compose, args...)
|
||||
|
||||
code, err := ds.compose(io, compose...)
|
||||
code, err := ds.compose(ctx, io, compose...)
|
||||
if err != nil {
|
||||
return environment.ExecCommandError, nil
|
||||
}
|
||||
|
|
@ -115,8 +116,8 @@ var errStackRestart = errors.New("Stack.Restart: Restart returned non-zero exit
|
|||
|
||||
// Restart restarts all containers in this Stack.
|
||||
// It is equivalent to 'docker compose restart' on the shell.
|
||||
func (ds Stack) Restart(io stream.IOStream) error {
|
||||
code, err := ds.compose(io, "restart")
|
||||
func (ds Stack) Restart(ctx context.Context, io stream.IOStream) error {
|
||||
code, err := ds.compose(ctx, io, "restart")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -129,12 +130,12 @@ func (ds Stack) Restart(io stream.IOStream) error {
|
|||
var errStackPs = errors.New("Stack.Ps: Down returned non-zero exit code")
|
||||
|
||||
// Ps returns the ids of the containers currently running
|
||||
func (ds Stack) Ps(io stream.IOStream) ([]string, error) {
|
||||
func (ds Stack) Ps(ctx context.Context, io stream.IOStream) ([]string, error) {
|
||||
// create a buffer
|
||||
var buffer bytes.Buffer
|
||||
|
||||
// read the ids from the command!
|
||||
code, err := ds.compose(io.Streams(&buffer, nil, nil, 0), "ps", "-q")
|
||||
code, err := ds.compose(ctx, io.Streams(&buffer, nil, nil, 0), "ps", "-q")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -162,8 +163,8 @@ var errStackDown = errors.New("Stack.Down: Down returned non-zero exit code")
|
|||
|
||||
// Down stops and removes all containers in this Stack.
|
||||
// It is equivalent to 'docker compose down -v' on the shell.
|
||||
func (ds Stack) Down(io stream.IOStream) error {
|
||||
code, err := ds.compose(io, "down", "-v")
|
||||
func (ds Stack) Down(ctx context.Context, io stream.IOStream) error {
|
||||
code, err := ds.compose(ctx, io, "down", "-v")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -177,7 +178,7 @@ func (ds Stack) Down(io stream.IOStream) error {
|
|||
//
|
||||
// NOTE(twiesing): Check if this can be replaced by an internal call to libcompose.
|
||||
// But probably not.
|
||||
func (ds Stack) compose(io stream.IOStream, args ...string) (int, error) {
|
||||
func (ds Stack) compose(ctx context.Context, io stream.IOStream, args ...string) (int, error) {
|
||||
if ds.DockerExecutable == "" {
|
||||
var err error
|
||||
ds.DockerExecutable, err = ds.Env.LookPathAbs("docker")
|
||||
|
|
@ -185,7 +186,7 @@ func (ds Stack) compose(io stream.IOStream, args ...string) (int, error) {
|
|||
return environment.ExecCommandError, err
|
||||
}
|
||||
}
|
||||
return ds.Env.Exec(io, ds.Dir, ds.DockerExecutable, append([]string{"compose"}, args...)...), nil
|
||||
return ds.Env.Exec(ctx, io, ds.Dir, ds.DockerExecutable, append([]string{"compose"}, args...)...), nil
|
||||
}
|
||||
|
||||
// StackWithResources represents a Stack that can be automatically installed from a set of resources.
|
||||
|
|
@ -218,7 +219,7 @@ type InstallationContext map[string]string
|
|||
//
|
||||
// Installation is non-interactive, but will provide debugging output onto io.
|
||||
// InstallationContext
|
||||
func (is StackWithResources) Install(io stream.IOStream, context InstallationContext) error {
|
||||
func (is StackWithResources) Install(ctx context.Context, io stream.IOStream, context InstallationContext) error {
|
||||
env := is.Stack.Env
|
||||
if is.ContextPath != "" {
|
||||
// setup the base files
|
||||
|
|
@ -277,7 +278,7 @@ func (is StackWithResources) Install(io stream.IOStream, context InstallationCon
|
|||
|
||||
// copy over file from context
|
||||
io.Printf("[copy] %s (from %s)\n", dst, src)
|
||||
if err := fsx.CopyFile(env, dst, src); err != nil {
|
||||
if err := fsx.CopyFile(ctx, env, dst, src); err != nil {
|
||||
return errors.Wrapf(err, "Unable to copy file %s", src)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package triplestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
|
|
@ -10,19 +11,17 @@ import (
|
|||
func (ts *Triplestore) BackupName() string { return "triplestore" }
|
||||
|
||||
// Backup makes a backup of all Triplestore repositories databases into the path dest.
|
||||
func (ts *Triplestore) Backup(context component.StagingContext) error {
|
||||
func (ts *Triplestore) Backup(scontext component.StagingContext) error {
|
||||
return scontext.AddDirectory("", func(ctx context.Context) error {
|
||||
// list all the directories
|
||||
repos, err := ts.listRepositories(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// list all the directories
|
||||
repos, err := ts.listRepositories()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// then backup each file separatly
|
||||
return context.AddDirectory("", func() error {
|
||||
for _, repo := range repos {
|
||||
if err := context.AddFile(repo.ID+".nq", func(file io.Writer) error {
|
||||
_, err := ts.SnapshotDB(file, repo.ID)
|
||||
if err := scontext.AddFile(repo.ID+".nq", func(ctx context.Context, file io.Writer) error {
|
||||
_, err := ts.SnapshotDB(ctx, file, repo.ID)
|
||||
return err
|
||||
}); err != nil {
|
||||
return err
|
||||
|
|
@ -32,8 +31,8 @@ func (ts *Triplestore) Backup(context component.StagingContext) error {
|
|||
})
|
||||
}
|
||||
|
||||
func (ts Triplestore) listRepositories() (repos []Repository, err error) {
|
||||
res, err := ts.OpenRaw("GET", "/rest/repositories", nil, "", "application/json")
|
||||
func (ts Triplestore) listRepositories(ctx context.Context) (repos []Repository, err error) {
|
||||
res, err := ts.OpenRaw(ctx, "GET", "/rest/repositories", nil, "", "application/json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package triplestore
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
|
|
@ -30,7 +31,7 @@ type TriplestoreUserAppSettings struct {
|
|||
//
|
||||
// When bodyName is non-empty, expect body to be a byte slice representing a multipart/form-data upload with the given name.
|
||||
// When bodyName is empty, simply marshal body as application/json
|
||||
func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName string, accept string) (*http.Response, error) {
|
||||
func (ts Triplestore) OpenRaw(ctx context.Context, method, url string, body interface{}, bodyName string, accept string) (*http.Response, error) {
|
||||
var reader io.Reader
|
||||
|
||||
var contentType string
|
||||
|
|
@ -66,7 +67,7 @@ func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName str
|
|||
DisableKeepAlives: true,
|
||||
},
|
||||
}
|
||||
req, err := http.NewRequest(method, ts.BaseURL+url, reader)
|
||||
req, err := http.NewRequestWithContext(ctx, method, ts.BaseURL+url, reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -86,23 +87,23 @@ func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName str
|
|||
|
||||
// Wait waits for the connection to the Triplestore to succeed.
|
||||
// This is achieved using a polling strategy.
|
||||
func (ts Triplestore) Wait() error {
|
||||
func (ts Triplestore) Wait(ctx context.Context) error {
|
||||
n := stream.FromNil()
|
||||
return timex.TickUntilFunc(func(time.Time) bool {
|
||||
res, err := ts.OpenRaw("GET", "/rest/repositories", nil, "", "")
|
||||
res, err := ts.OpenRaw(ctx, "GET", "/rest/repositories", nil, "", "")
|
||||
n.EPrintf("[Triplestore.Wait]: %s\n", err)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer res.Body.Close()
|
||||
return true
|
||||
}, ts.PollContext, ts.PollInterval)
|
||||
}, ctx, ts.PollInterval)
|
||||
}
|
||||
|
||||
// PurgeUser deletes the specified user from the triplestore.
|
||||
// When the user does not exist, returns no error.
|
||||
func (ts Triplestore) PurgeUser(user string) error {
|
||||
res, err := ts.OpenRaw("DELETE", "/rest/security/users/"+user, nil, "", "")
|
||||
func (ts Triplestore) PurgeUser(ctx context.Context, user string) error {
|
||||
res, err := ts.OpenRaw(ctx, "DELETE", "/rest/security/users/"+user, nil, "", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -114,8 +115,8 @@ func (ts Triplestore) PurgeUser(user string) error {
|
|||
|
||||
// PurgeRepo deletes the specified repo from the triplestore.
|
||||
// When the repo does not exist, returns no error.
|
||||
func (ts Triplestore) PurgeRepo(repo string) error {
|
||||
res, err := ts.OpenRaw("DELETE", "/rest/repositories/"+repo, nil, "", "")
|
||||
func (ts Triplestore) PurgeRepo(ctx context.Context, repo string) error {
|
||||
res, err := ts.OpenRaw(ctx, "DELETE", "/rest/repositories/"+repo, nil, "", "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package triplestore
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
_ "embed"
|
||||
|
|
@ -20,19 +21,19 @@ var errTripleStoreFailedRepository = exit.Error{
|
|||
//go:embed create-repo.ttl
|
||||
var createRepoTTL []byte
|
||||
|
||||
func (ts *Triplestore) Provision(instance models.Instance, domain string) error {
|
||||
return ts.CreateRepository(instance.GraphDBRepository, domain, instance.GraphDBUsername, instance.GraphDBPassword)
|
||||
func (ts *Triplestore) Provision(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return ts.CreateRepository(ctx, instance.GraphDBRepository, domain, instance.GraphDBUsername, instance.GraphDBPassword)
|
||||
}
|
||||
|
||||
func (ts *Triplestore) Purge(instance models.Instance, domain string) error {
|
||||
func (ts *Triplestore) Purge(ctx context.Context, instance models.Instance, domain string) error {
|
||||
return errorx.First(
|
||||
ts.PurgeRepo(instance.GraphDBRepository),
|
||||
ts.PurgeUser(instance.GraphDBUsername),
|
||||
ts.PurgeRepo(ctx, instance.GraphDBRepository),
|
||||
ts.PurgeUser(ctx, instance.GraphDBUsername),
|
||||
)
|
||||
}
|
||||
|
||||
func (ts *Triplestore) CreateRepository(name, domain, user, password string) error {
|
||||
if err := ts.Wait(); err != nil {
|
||||
func (ts *Triplestore) CreateRepository(ctx context.Context, name, domain, user, password string) error {
|
||||
if err := ts.Wait(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +49,7 @@ func (ts *Triplestore) CreateRepository(name, domain, user, password string) err
|
|||
|
||||
// do the create!
|
||||
{
|
||||
res, err := ts.OpenRaw("POST", "/rest/repositories", createRepo.Bytes(), "config", "")
|
||||
res, err := ts.OpenRaw(ctx, "POST", "/rest/repositories", createRepo.Bytes(), "config", "")
|
||||
if err != nil {
|
||||
return errTripleStoreFailedRepository.WithMessageF(err)
|
||||
}
|
||||
|
|
@ -60,7 +61,7 @@ func (ts *Triplestore) CreateRepository(name, domain, user, password string) err
|
|||
|
||||
// create the user and grant them access
|
||||
{
|
||||
res, err := ts.OpenRaw("POST", "/rest/security/users/"+user, TriplestoreUserPayload{
|
||||
res, err := ts.OpenRaw(ctx, "POST", "/rest/security/users/"+user, TriplestoreUserPayload{
|
||||
Password: password,
|
||||
AppSettings: TriplestoreUserAppSettings{
|
||||
DefaultInference: true,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package triplestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
|
|
@ -13,10 +14,10 @@ func (Triplestore) SnapshotNeedsRunning() bool { return false }
|
|||
|
||||
func (Triplestore) SnapshotName() string { return "triplestore" }
|
||||
|
||||
func (ts *Triplestore) Snapshot(wisski models.Instance, context component.StagingContext) error {
|
||||
return context.AddDirectory(".", func() error {
|
||||
return context.AddFile(wisski.GraphDBRepository+".nq", func(file io.Writer) error {
|
||||
_, err := ts.SnapshotDB(file, wisski.GraphDBRepository)
|
||||
func (ts *Triplestore) Snapshot(wisski models.Instance, scontext component.StagingContext) error {
|
||||
return scontext.AddDirectory(".", func(ctx context.Context) error {
|
||||
return scontext.AddFile(wisski.GraphDBRepository+".nq", func(ctx context.Context, file io.Writer) error {
|
||||
_, err := ts.SnapshotDB(ctx, file, wisski.GraphDBRepository)
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
|
@ -25,8 +26,8 @@ func (ts *Triplestore) Snapshot(wisski models.Instance, context component.Stagin
|
|||
var errTSBackupWrongStatusCode = errors.New("Triplestore.Backup: Wrong status code")
|
||||
|
||||
// SnapshotDB snapshots the provided repository into dst
|
||||
func (ts Triplestore) SnapshotDB(dst io.Writer, repo string) (int64, error) {
|
||||
res, err := ts.OpenRaw("GET", "/repositories/"+repo+"/statements?infer=false", nil, "", "application/n-quads")
|
||||
func (ts Triplestore) SnapshotDB(ctx context.Context, dst io.Writer, repo string) (int64, error) {
|
||||
res, err := ts.OpenRaw(ctx, "GET", "/repositories/"+repo+"/statements?infer=false", nil, "", "application/n-quads")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package triplestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
|
@ -15,8 +14,7 @@ type Triplestore struct {
|
|||
|
||||
BaseURL string // upstream server url
|
||||
|
||||
PollContext context.Context // context to abort polling with
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
PollInterval time.Duration // duration to wait for during wait
|
||||
}
|
||||
|
||||
func (ts *Triplestore) Path() string {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package triplestore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
|
|
@ -11,15 +12,15 @@ import (
|
|||
|
||||
var errTriplestoreFailedSecurity = errors.New("failed to enable triplestore security: request did not succeed with HTTP 200 OK")
|
||||
|
||||
func (ts Triplestore) Update(io stream.IOStream) error {
|
||||
func (ts Triplestore) Update(ctx context.Context, io stream.IOStream) error {
|
||||
logging.LogMessage(io, "Waiting for Triplestore")
|
||||
if err := ts.Wait(); err != nil {
|
||||
if err := ts.Wait(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logging.LogMessage(io, "Resetting admin user password")
|
||||
{
|
||||
res, err := ts.OpenRaw("PUT", "/rest/security/users/"+ts.Config.TriplestoreAdminUser, TriplestoreUserPayload{
|
||||
res, err := ts.OpenRaw(ctx, "PUT", "/rest/security/users/"+ts.Config.TriplestoreAdminUser, TriplestoreUserPayload{
|
||||
Password: ts.Config.TriplestoreAdminPassword,
|
||||
AppSettings: TriplestoreUserAppSettings{
|
||||
DefaultInference: true,
|
||||
|
|
@ -51,7 +52,7 @@ func (ts Triplestore) Update(io stream.IOStream) error {
|
|||
|
||||
logging.LogMessage(io, "Enabling Triplestore security")
|
||||
{
|
||||
res, err := ts.OpenRaw("POST", "/rest/security", true, "", "")
|
||||
res, err := ts.OpenRaw(ctx, "POST", "/rest/security", true, "", "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to enable triplestore security: %s", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
package dis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
|
@ -32,8 +32,8 @@ type Distillery struct {
|
|||
// core holds the core of the distillery
|
||||
component.Still
|
||||
|
||||
// internal context for the distillery
|
||||
context context.Context
|
||||
// Where interactive progress is displayed
|
||||
Progress io.Writer
|
||||
|
||||
// Upstream holds information to connect to the various running
|
||||
// distillery components.
|
||||
|
|
@ -54,11 +54,6 @@ type Upstream struct {
|
|||
Solr string
|
||||
}
|
||||
|
||||
// Context returns a new Context belonging to this distillery
|
||||
func (dis *Distillery) Context() context.Context {
|
||||
return dis.context
|
||||
}
|
||||
|
||||
//
|
||||
// PUBLIC COMPONENT GETTERS
|
||||
//
|
||||
|
|
@ -110,17 +105,14 @@ func (dis *Distillery) allComponents() []initFunc {
|
|||
|
||||
manual(func(ts *triplestore.Triplestore) {
|
||||
ts.BaseURL = "http://" + dis.Upstream.Triplestore
|
||||
ts.PollContext = dis.Context()
|
||||
ts.PollInterval = time.Second
|
||||
}),
|
||||
manual(func(sql *sql.SQL) {
|
||||
sql.ServerURL = dis.Upstream.SQL
|
||||
sql.PollContext = dis.Context()
|
||||
sql.PollInterval = time.Second
|
||||
}),
|
||||
manual(func(s *solr.Solr) {
|
||||
s.BaseURL = dis.Upstream.Solr
|
||||
s.PollContext = dis.Context()
|
||||
s.PollInterval = time.Second
|
||||
}),
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ var errOpenConfig = exit.Error{
|
|||
// NewDistillery creates a new distillery from the provided flags
|
||||
func NewDistillery(params cli.Params, flags cli.Flags, req cli.Requirements) (dis *Distillery, err error) {
|
||||
dis = &Distillery{
|
||||
context: params.Context,
|
||||
Still: component.Still{
|
||||
Environment: new(environment.Native),
|
||||
},
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue