From 3b112f1b8e76b09ff2c961e600d6ce047b1e01df Mon Sep 17 00:00:00 2001 From: Tom Wiesing Date: Sun, 2 Oct 2022 18:17:47 +0200 Subject: [PATCH] snapshots: Handle as separate components --- cmd/backup.go | 6 +- cmd/system_update.go | 4 +- internal/backup/backup.go | 161 ---------- internal/backup/context.go | 98 ------ internal/component/backup.go | 113 +++++++ internal/component/component.go | 38 --- internal/component/extras/bookkeeping.go | 29 ++ internal/component/extras/config.go | 2 +- internal/component/extras/filesystem.go | 24 ++ internal/component/extras/pathbuilders.go | 39 +++ internal/component/instances/instances.go | 10 + .../instances/wisski_pathbuilders.go | 28 -- internal/component/pool.go | 131 ++++++++ internal/component/snapshots/backup.go | 170 ++++++++++ .../snapshots/{snapshots.go => manager.go} | 9 +- internal/component/snapshots/manifest.go | 33 ++ .../{backup => component/snapshots}/report.go | 89 ++++-- internal/component/snapshots/snapshot.go | 125 ++++++++ internal/component/snapshots/snapshot_make.go | 301 ------------------ internal/component/sql/snapshot.go | 16 +- internal/component/triplestore/database.go | 15 - internal/component/triplestore/snapshot.go | 38 +++ internal/dis/component.go | 130 ++++---- internal/dis/distillery.go | 36 +++ pkg/opgroup/opgroup.go | 50 --- pkg/rlock/rlock.go | 44 +++ pkg/slicesx/slicesx.go | 10 + 27 files changed, 960 insertions(+), 789 deletions(-) delete mode 100644 internal/backup/backup.go delete mode 100644 internal/backup/context.go create mode 100644 internal/component/extras/bookkeeping.go create mode 100644 internal/component/extras/filesystem.go create mode 100644 internal/component/extras/pathbuilders.go create mode 100644 internal/component/pool.go create mode 100644 internal/component/snapshots/backup.go rename internal/component/snapshots/{snapshots.go => manager.go} (91%) create mode 100644 internal/component/snapshots/manifest.go rename internal/{backup => component/snapshots}/report.go (51%) create mode 100644 internal/component/snapshots/snapshot.go delete mode 100644 internal/component/snapshots/snapshot_make.go create mode 100644 internal/component/triplestore/snapshot.go delete mode 100644 pkg/opgroup/opgroup.go create mode 100644 pkg/rlock/rlock.go diff --git a/cmd/backup.go b/cmd/backup.go index 3250baf..361c403 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -2,7 +2,7 @@ package cmd import ( wisski_distillery "github.com/FAU-CDI/wisski-distillery" - "github.com/FAU-CDI/wisski-distillery/internal/backup" + "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" "github.com/FAU-CDI/wisski-distillery/internal/core" "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/logging" @@ -52,7 +52,7 @@ func (bk backupC) Run(context wisski_distillery.Context) error { var sPath string if !bk.StagingOnly { // regular mode: create a temporary staging directory - logging.LogMessage(context.IOStream, "Creating new snapshot staging directory") + logging.LogMessage(context.IOStream, "Creating new backup staging directory") sPath, err = dis.SnapshotManager().NewStagingDir("") if err != nil { return errSnapshotFailed.Wrap(err) @@ -82,7 +82,7 @@ func (bk backupC) Run(context wisski_distillery.Context) error { context.Println(sPath) logging.LogOperation(func() error { - backup := backup.New(context.IOStream, dis, backup.Description{ + backup := dis.SnapshotManager().NewBackup(context.IOStream, snapshots.BackupDescription{ Dest: sPath, Auto: bk.Positionals.Dest == "", ConcurrentSnapshots: bk.ConcurrentSnapshots, diff --git a/cmd/system_update.go b/cmd/system_update.go index 057df73..db79e9d 100644 --- a/cmd/system_update.go +++ b/cmd/system_update.go @@ -131,13 +131,13 @@ func (si systemupdate) Run(context wisski_distillery.Context) error { return nil }, - }, dis.Installables()) + }, dis.Installable()) }, context.IOStream, "Performing Stack Updates"); err != nil { return err } if err := logging.LogOperation(func() error { - for _, component := range dis.Updateable() { + for _, component := range dis.Updatable() { name := component.Name() if err := logging.LogOperation(func() error { return component.Update(context.IOStream) diff --git a/internal/backup/backup.go b/internal/backup/backup.go deleted file mode 100644 index 6542ddd..0000000 --- a/internal/backup/backup.go +++ /dev/null @@ -1,161 +0,0 @@ -// Package backup implements Distillery backups. -package backup - -import ( - "fmt" - "io" - "path/filepath" - "time" - - "github.com/FAU-CDI/wisski-distillery/internal/component" - "github.com/FAU-CDI/wisski-distillery/internal/component/instances" - "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" - "github.com/FAU-CDI/wisski-distillery/internal/dis" - "github.com/FAU-CDI/wisski-distillery/pkg/environment" - "github.com/FAU-CDI/wisski-distillery/pkg/logging" - "github.com/tkw1536/goprogram/status" - "github.com/tkw1536/goprogram/stream" - "golang.org/x/exp/slices" -) - -// New create a new Backup -func New(io stream.IOStream, dis *dis.Distillery, description Description) (backup Backup) { - backup.Description = description - - // catch anything critical that happened during the snapshot - defer func() { - backup.ErrPanic = recover() - }() - - // do the create keeping track of time! - logging.LogOperation(func() error { - backup.StartTime = time.Now().UTC() - backup.run(io, dis) - backup.EndTime = time.Now().UTC() - - return nil - }, io, "Writing backup files") - - return -} - -func (backup *Backup) run(ios stream.IOStream, dis *dis.Distillery) { - // - // MANIFEST - // - - manifest := make(chan string) // receive all the entries in the manifest - manifestDone := make(chan struct{}) // to signal that everything is finished - - go func() { - defer close(manifestDone) - - for file := range manifest { - // get the relative path to the root of the manifest - // or fallback to the absolute path! - path, err := filepath.Rel(backup.Description.Dest, file) - if err != nil { - path = file - } - - // add the file to the manifest array - backup.Manifest = append(backup.Manifest, path) - } - - // sort the manifest - slices.Sort(backup.Manifest) - }() - - // - // BACKUP COMPONENTS - // - - // create a new status display - backups := dis.Backupable() - backup.ComponentErrors = make(map[string]error, len(backups)) - - // Component backup tasks - logging.LogOperation(func() error { - st := status.NewWithCompat(ios.Stdout, 0) - st.Start() - defer st.Stop() - - return status.UseErrorGroup(st, status.Group[component.Backupable, error]{ - PrefixString: func(item component.Backupable, index int) string { - return fmt.Sprintf("[backup %q]: ", item.Name()) - }, - PrefixAlign: true, - - Handler: func(bc component.Backupable, index int, writer io.Writer) error { - // create a new context for the backup! - context := &context{ - env: dis.Core.Environment, - io: stream.NewIOStream(writer, writer, nil, 0), - dst: filepath.Join(backup.Description.Dest, bc.BackupName()), - files: manifest, - } - - backup.ComponentErrors[bc.Name()] = bc.Backup(context) - return nil - }, - }, backups) - }, ios, "Backing up core components") - - // backup instances - logging.LogOperation(func() error { - st := status.NewWithCompat(ios.Stdout, 0) - st.Start() - defer st.Stop() - - instancesBackupDir := filepath.Join(backup.Description.Dest, "instances") - if err := dis.Core.Environment.Mkdir(instancesBackupDir, environment.DefaultDirPerm); err != nil { - backup.InstanceListErr = err - return nil - } - - // list all instances - wissKIs, err := dis.Instances().All() - if err != nil { - backup.InstanceListErr = err - return nil - } - - // re-use the backup of the snapshots - backup.InstanceSnapshots = status.Group[instances.WissKI, snapshots.Snapshot]{ - PrefixString: func(item instances.WissKI, index int) string { - return fmt.Sprintf("[snapshot %s]: ", item.Slug) - }, - PrefixAlign: true, - - Handler: func(instance instances.WissKI, index int, writer io.Writer) snapshots.Snapshot { - dir := filepath.Join(instancesBackupDir, instance.Slug) - if err := dis.Core.Environment.Mkdir(dir, environment.DefaultDirPerm); err != nil { - return snapshots.Snapshot{ - ErrPanic: err, - } - } - - manifest <- dir - - return dis.SnapshotManager().NewSnapshot(instance, stream.NewIOStream(writer, writer, nil, 0), snapshots.SnapshotDescription{ - Dest: dir, - }) - }, - ResultString: func(res snapshots.Snapshot, item instances.WissKI, index int) string { - return "done" - }, - WaitString: status.DefaultWaitString[instances.WissKI], - HandlerLimit: backup.Description.ConcurrentSnapshots, - }.Use(st, wissKIs) - return nil - }, ios, "creating instance snapshots") - - // close the manifest - close(manifest) - <-manifestDone - - // sort the instances manifest - slices.SortFunc(backup.InstanceSnapshots, func(a, b snapshots.Snapshot) bool { - return a.Instance.Slug < b.Instance.Slug - }) -} diff --git a/internal/backup/context.go b/internal/backup/context.go deleted file mode 100644 index ace658d..0000000 --- a/internal/backup/context.go +++ /dev/null @@ -1,98 +0,0 @@ -package backup - -import ( - "errors" - "io" - "path/filepath" - - "github.com/FAU-CDI/wisski-distillery/pkg/environment" - "github.com/FAU-CDI/wisski-distillery/pkg/fsx" - "github.com/tkw1536/goprogram/stream" -) - -// context implements [components.BackupContext] -type context struct { - env environment.Environment - io stream.IOStream - dst string // destination directory - files chan string // files channel -} - -func (bc *context) sendPath(path string) { - - // resolve the path, or bail out! - // TODO: Use the relative path here! - dst, err := bc.resolve(path) - if err != nil { - return - } - - bc.io.Println(dst) - bc.files <- dst -} - -func (bc *context) IO() stream.IOStream { - return bc.io -} - -var errResolveAbsolute = errors.New("resolve: path must be relative") - -func (bc *context) resolve(path string) (dest string, err error) { - if path == "" { - return bc.dst, nil - } - if filepath.IsAbs(path) { - return "", errResolveAbsolute - } - return filepath.Join(bc.dst, path), nil -} - -func (bc *context) AddDirectory(path string, op func() error) error { - // resolve the path! - dst, err := bc.resolve(path) - if err != nil { - return err - } - - // run the make directory - if err := bc.env.Mkdir(dst, environment.DefaultDirPerm); err != nil { - return err - } - - // tell the files that we are creating it! - bc.sendPath(path) - - // and run the files! - // TODO: Add to manifest of some sort - return op() -} - -func (bc *context) CopyFile(dst, src string) error { - dstPath, err := bc.resolve(dst) - if err != nil { - return err - } - bc.sendPath(dst) - return fsx.CopyFile(bc.env, dstPath, src) -} - -func (bc *context) AddFile(path string, op func(file io.Writer) error) error { - // resolve the path! - dst, err := bc.resolve(path) - if err != nil { - return err - } - - // create the file - file, err := bc.env.Create(dst, environment.DefaultFilePerm) - if err != nil { - return err - } - defer file.Close() - - // tell them that we are creating it! - bc.sendPath(path) - - // and do whatever they wanted to do - return op(file) -} diff --git a/internal/component/backup.go b/internal/component/backup.go index 3a1c969..afb3e0d 100644 --- a/internal/component/backup.go +++ b/internal/component/backup.go @@ -2,8 +2,12 @@ package component import ( "io" + "path/filepath" "github.com/FAU-CDI/wisski-distillery/internal/models" + "github.com/FAU-CDI/wisski-distillery/pkg/environment" + "github.com/FAU-CDI/wisski-distillery/pkg/fsx" + "github.com/pkg/errors" "github.com/tkw1536/goprogram/stream" ) @@ -32,6 +36,9 @@ type StagingContext interface { // CopyFile copies a file from src to dst. CopyFile(dst, src string) error + // CopyDirectory copies a directory from src to dst. + CopyDirectory(dst, src string) error + // AddFile creates a new file at the provided path inside the destination. // Passing the empty path creates the destination as a file. // @@ -56,3 +63,109 @@ type Snapshotable interface { // Snapshot snapshots a part of the instance Snapshot(wisski models.Instance, context StagingContext) error } + +// NewStagingContext returns a new [StagingContext] +func NewStagingContext(env environment.Environment, io stream.IOStream, path string, manifest chan<- string) StagingContext { + return &stagingContext{ + env: env, + io: io, + path: path, + manifest: manifest, + } +} + +// stagingContext implements [components.StagingContext] +type stagingContext struct { + env environment.Environment // environment + io stream.IOStream // context the files are sent to + path string // path to send files to + manifest chan<- string // channel the manifest is sent to +} + +func (bc *stagingContext) sendPath(path string) { + // resolve the path, or bail out! + // TODO: Use the relative path here! + dst, err := bc.resolve(path) + if err != nil { + return + } + + bc.io.Println(dst) + bc.manifest <- dst +} + +func (bc *stagingContext) IO() stream.IOStream { + return bc.io +} + +var errResolveAbsolute = errors.New("resolve: path must be relative") + +func (bc *stagingContext) resolve(path string) (dest string, err error) { + if path == "" { + return bc.path, nil + } + if filepath.IsAbs(path) { + return "", errResolveAbsolute + } + return filepath.Join(bc.path, path), nil +} + +func (sc *stagingContext) AddDirectory(path string, op func() error) error { + // resolve the path! + dst, err := sc.resolve(path) + if err != nil { + return err + } + + // run the make directory + if err := sc.env.Mkdir(dst, environment.DefaultDirPerm); err != nil { + return err + } + + // tell the files that we are creating it! + sc.sendPath(path) + + // and run the files! + return op() +} + +func (sc *stagingContext) CopyFile(dst, src string) error { + dstPath, err := sc.resolve(dst) + if err != nil { + return err + } + sc.sendPath(dst) + return fsx.CopyFile(sc.env, dstPath, src) +} + +func (sc *stagingContext) CopyDirectory(dst, src string) error { + dstPath, err := sc.resolve(dst) + if err != nil { + return err + } + + return fsx.CopyDirectory(sc.env, dstPath, src, func(dst, src string) { + sc.sendPath(dst) + }) +} + +func (sc *stagingContext) AddFile(path string, op func(file io.Writer) error) error { + // resolve the path! + dst, err := sc.resolve(path) + if err != nil { + return err + } + + // create the file + file, err := sc.env.Create(dst, environment.DefaultFilePerm) + if err != nil { + return err + } + defer file.Close() + + // tell them that we are creating it! + sc.sendPath(path) + + // and do whatever they wanted to do + return op(file) +} diff --git a/internal/component/component.go b/internal/component/component.go index 1a27d03..bc1aa86 100644 --- a/internal/component/component.go +++ b/internal/component/component.go @@ -1,12 +1,6 @@ // Package component holds the main abstraction for components. package component -import ( - "reflect" - - "github.com/FAU-CDI/wisski-distillery/pkg/lazy" -) - // Component represents a logical subsystem of the distillery. // Every component must embed [ComponentBase] and should be initialized using [Initialize]. // @@ -36,35 +30,3 @@ type ComponentBase struct { func (cb *ComponentBase) getBase() *ComponentBase { return cb } - -// Initialize makes or returns a component based on a lazy. -// -// C is the type of component to initialize. It must be backed by a pointer, or Initialize will panic. -// -// dis is the distillery to initialize components for -// field is a pointer to the appropriate struct field within the distillery components -// init is called with a new non-nil component to initialize it. -// It may be nil, to indicate no additional initialization is required. -// -// makeComponent returns the new or existing component instance -func Initialize[C Component](core Core, field *lazy.Lazy[C], init func(C)) C { - - // get the typeof C and make sure that it is a pointer type! - typC := reflect.TypeOf((*C)(nil)).Elem() - if typC.Kind() != reflect.Pointer { - panic("Initialize: C must be backed by a pointer") - } - - // return the field - return field.Get(func() (c C) { - c = reflect.New(typC.Elem()).Interface().(C) - if init != nil { - init(c) - } - - base := c.getBase() - base.Core = core - - return - }) -} diff --git a/internal/component/extras/bookkeeping.go b/internal/component/extras/bookkeeping.go new file mode 100644 index 0000000..31d0417 --- /dev/null +++ b/internal/component/extras/bookkeeping.go @@ -0,0 +1,29 @@ +package extras + +import ( + "fmt" + "io" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/models" +) + +type Bookkeeping struct { + component.ComponentBase +} + +func (Bookkeeping) Name() string { return "bookkeeping" } + +// SnapshotNeedsRunning returns if this Snapshotable requires a running instance. +func (Bookkeeping) SnapshotNeedsRunning() bool { return false } + +// SnapshotName returns a new name to be used as an argument for path. +func (Bookkeeping) SnapshotName() string { return "bookkeeping.txt" } + +// Snapshot creates a snapshot of this instance +func (*Bookkeeping) Snapshot(wisski models.Instance, context component.StagingContext) error { + return context.AddFile(".", func(file io.Writer) error { + _, err := fmt.Fprintf(file, "%#v\n", wisski) + return err + }) +} diff --git a/internal/component/extras/config.go b/internal/component/extras/config.go index 71844a6..0a8a7d6 100644 --- a/internal/component/extras/config.go +++ b/internal/component/extras/config.go @@ -11,7 +11,7 @@ type Config struct { component.ComponentBase } -func (Config) Name() string { return "extra-config" } +func (Config) Name() string { return "config" } func (*Config) BackupName() string { return "config" diff --git a/internal/component/extras/filesystem.go b/internal/component/extras/filesystem.go new file mode 100644 index 0000000..dbda83e --- /dev/null +++ b/internal/component/extras/filesystem.go @@ -0,0 +1,24 @@ +package extras + +import ( + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/models" +) + +// Filesystem implements snapshotting an instnace filesystem +type Filesystem struct { + component.ComponentBase +} + +func (Filesystem) Name() string { return "filesystem" } + +// SnapshotNeedsRunning returns if this Snapshotable requires a running instance. +func (Filesystem) SnapshotNeedsRunning() bool { return false } + +// SnapshotName returns a new name to be used as an argument for path. +func (Filesystem) SnapshotName() string { return "data" } + +// Snapshot creates a snapshot of this instance +func (*Filesystem) Snapshot(wisski models.Instance, context component.StagingContext) error { + return context.CopyDirectory(".", wisski.FilesystemBase) +} diff --git a/internal/component/extras/pathbuilders.go b/internal/component/extras/pathbuilders.go new file mode 100644 index 0000000..b868dff --- /dev/null +++ b/internal/component/extras/pathbuilders.go @@ -0,0 +1,39 @@ +package extras + +import ( + "io" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/models" +) + +type Pathbuilders struct { + component.ComponentBase + Instances *instances.Instances +} + +func (Pathbuilders) Name() string { return "pathbuilders" } + +func (Pathbuilders) SnapshotNeedsRunning() bool { return true } + +func (Pathbuilders) SnapshotName() string { return "pathbuilders" } + +func (pbs *Pathbuilders) Snapshot(wisski models.Instance, context component.StagingContext) error { + return context.AddDirectory(".", func() error { + builders, err := pbs.Instances.Instance(wisski).AllPathbuilders() + if err != nil { + return err + } + + for name, bytes := range builders { + if err := context.AddFile(name+".xml", func(file io.Writer) error { + _, err := file.Write([]byte(bytes)) + return err + }); err != nil { + return err + } + } + return nil + }) +} diff --git a/internal/component/instances/instances.go b/internal/component/instances/instances.go index 0f67659..52c508d 100644 --- a/internal/component/instances/instances.go +++ b/internal/component/instances/instances.go @@ -37,6 +37,16 @@ var errSQL = exit.Error{ ExitCode: exit.ExitGeneric, } +// Instance is a convenience function to return an instance based on a model slug. +// When the instance does not exist, returns nil. +func (instances *Instances) Instance(instance models.Instance) *WissKI { + i, err := instances.WissKI(instance.Slug) + if err != nil { + return nil + } + return &i +} + // WissKI returns the WissKI with the provided slug, if it exists. // It the WissKI does not exist, returns ErrWissKINotFound. func (instances *Instances) WissKI(slug string) (i WissKI, err error) { diff --git a/internal/component/instances/wisski_pathbuilders.go b/internal/component/instances/wisski_pathbuilders.go index 67502e9..0463289 100644 --- a/internal/component/instances/wisski_pathbuilders.go +++ b/internal/component/instances/wisski_pathbuilders.go @@ -1,14 +1,9 @@ package instances import ( - "fmt" - "path/filepath" - _ "embed" - "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/tkw1536/goprogram/stream" - "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) @@ -34,26 +29,3 @@ func (wisski *WissKI) AllPathbuilders() (pathbuilders map[string]string, err err err = wisski.ExecPHPScript(stream.FromDebug(), &pathbuilders, exportPathbuilderPHP, "all_xml") return } - -// ExportPathbuilders writes pathbuilders into the directory dest -func (wisski *WissKI) ExportPathbuilders(dest string) error { - pathbuilders, err := wisski.AllPathbuilders() - if err != nil { - return err - } - - // sort the names of the pathbuilders - names := maps.Keys(pathbuilders) - slices.Sort(names) - - // write each into a file! - for _, name := range names { - pbxml := []byte(pathbuilders[name]) - name := filepath.Join(dest, fmt.Sprintf("%s.xml", name)) - if err := environment.WriteFile(wisski.instances.Core.Environment, name, pbxml, environment.DefaultFilePerm); err != nil { - return err - } - } - - return nil -} diff --git a/internal/component/pool.go b/internal/component/pool.go new file mode 100644 index 0000000..74cea2a --- /dev/null +++ b/internal/component/pool.go @@ -0,0 +1,131 @@ +package component + +import ( + "reflect" + "sync" + "sync/atomic" + + "github.com/FAU-CDI/wisski-distillery/pkg/rlock" + "github.com/tkw1536/goprogram/lib/reflectx" +) + +// Pool represents a pool of components +type Pool struct { + rLock rlock.RLock + + // the actual queue of initi functions! + nested uint64 // is the q active? + queue []func(thread int32) + + // global initalization! + initOnce sync.Once + + // components and lock! + cLock sync.Mutex + components map[string]Component +} + +func (p *Pool) init() { + p.initOnce.Do(func() { + p.components = make(map[string]Component) + }) +} + +// ComponentDescription describes a component +type ComponentDescription struct { + Type reflect.Type + Elem reflect.Type + Name string +} + +// New creates a new ComponentDescription +func (cd ComponentDescription) New() any { + return reflect.New(cd.Elem).Interface() +} + +// GetDescription gets the description of a component type +func GetDescription[C Component]() (desc ComponentDescription) { + desc.Type = reflectx.TypeOf[C]() + if desc.Type.Kind() != reflect.Pointer { + panic("GetDescription: C must be backed by a pointer") + } + desc.Elem = desc.Type.Elem() + desc.Name = desc.Elem.PkgPath() + "." + desc.Elem.Name() + return +} + +func Find[C Component](components []Component) C { + for _, c := range components { + if cc, ok := c.(C); ok { + return cc + } + } + panic("FindComponent: Invalid arguments") +} + +// Put initializes a single component in the pool. +// +// Init may initialize components, but not call functions on them! +func PutComponent[C Component](p *Pool, thread int32, core Core, init func(component C, thread int32)) C { + p.init() + + p.rLock.Lock(int(thread)) + defer p.rLock.Unlock() + + // get a description of the type + cd := GetDescription[C]() + + // find a field to put the component into + instance, created := func() (C, bool) { + p.cLock.Lock() + defer p.cLock.Unlock() + + // create the component + field, ok := p.components[cd.Name] + if ok { + return field.(C), false + } + + // create a new component + p.components[cd.Name] = cd.New().(Component) + return p.components[cd.Name].(C), true + }() + + // if we already created the instance, then there is nothing to do + // as someone else will init it! + if !created { + return instance + } + + // setup the core initialization now! + instance.getBase().Core = core + + if init == nil { + return instance + } + + // if we are in nested mode, then delay the init! + if !atomic.CompareAndSwapUint64(&p.nested, 0, 1) { + func() { + p.queue = append(p.queue, func(thread int32) { + init(instance, thread) + }) + }() + return instance + } + defer atomic.StoreUint64(&p.nested, 0) + + // init ourselves first (everything below will be nested) + init(instance, thread) + + // do all the delayed initializations + index := 0 + for len(p.queue) > index { + p.queue[index](thread) + index++ + } + p.queue = nil + + // and return the instance + return instance +} diff --git a/internal/component/snapshots/backup.go b/internal/component/snapshots/backup.go new file mode 100644 index 0000000..a56c51b --- /dev/null +++ b/internal/component/snapshots/backup.go @@ -0,0 +1,170 @@ +package snapshots + +import ( + "fmt" + "io" + "path/filepath" + "time" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + + "github.com/FAU-CDI/wisski-distillery/pkg/environment" + "github.com/FAU-CDI/wisski-distillery/pkg/logging" + "github.com/tkw1536/goprogram/status" + "github.com/tkw1536/goprogram/stream" + "golang.org/x/exp/slices" +) + +// Backup describes a backup +type Backup struct { + Description BackupDescription + + // Start and End Time of the backup + StartTime time.Time + EndTime time.Time + + // various error states, which are ignored when creating the snapshot + ErrPanic interface{} + + // errors for the various components + ComponentErrors map[string]error + + // TODO: Make this proper + ConfigFileErr error + + // Snapshots containing instances + InstanceListErr error + InstanceSnapshots []Snapshot + + // List of files included + WithManifest +} + +// BackupDescription provides a description for a backup +type BackupDescription struct { + Dest string // Destination path + Auto bool // Was the path created automatically? + + ConcurrentSnapshots int // maximum number of concurrent snapshots +} + +// New create a new Backup +func (manager *Manager) NewBackup(io stream.IOStream, description BackupDescription) (backup Backup) { + backup.Description = description + + // catch anything critical that happened during the snapshot + defer func() { + backup.ErrPanic = recover() + }() + + // do the create keeping track of time! + logging.LogOperation(func() error { + backup.StartTime = time.Now().UTC() + backup.run(io, manager) + backup.EndTime = time.Now().UTC() + + return nil + }, io, "Writing backup files") + + return +} + +func (backup *Backup) run(ios stream.IOStream, manager *Manager) { + // create a manifest + manifest, done := backup.handleManifest(backup.Description.Dest) + defer done() + + // create a new status display + backups := manager.Backupable + backup.ComponentErrors = make(map[string]error, len(backups)) + + // Component backup tasks + logging.LogOperation(func() error { + st := status.NewWithCompat(ios.Stdout, 0) + st.Start() + defer st.Stop() + + errors := status.Group[component.Backupable, error]{ + PrefixString: func(item component.Backupable, index int) string { + return fmt.Sprintf("[backup %q]: ", item.Name()) + }, + PrefixAlign: true, + + Handler: func(bc component.Backupable, index int, writer io.Writer) error { + return bc.Backup( + component.NewStagingContext( + manager.Environment, + stream.NewIOStream(writer, writer, nil, 0), + filepath.Join(backup.Description.Dest, bc.BackupName()), + manifest, + ), + ) + }, + + ResultString: status.DefaultErrorString[component.Backupable], + }.Use(st, backups) + + for i, bc := range backups { + backup.ComponentErrors[bc.Name()] = errors[i] + } + + return nil + }, ios, "Backing up core components") + + // backup instances + logging.LogOperation(func() error { + st := status.NewWithCompat(ios.Stdout, 0) + st.Start() + defer st.Stop() + + instancesBackupDir := filepath.Join(backup.Description.Dest, "instances") + if err := manager.Environment.Mkdir(instancesBackupDir, environment.DefaultDirPerm); err != nil { + backup.InstanceListErr = err + return nil + } + + // list all instances + wissKIs, err := manager.Instances.All() + if err != nil { + backup.InstanceListErr = err + return nil + } + + // make a backup of the snapshots + backup.InstanceSnapshots = status.Group[instances.WissKI, Snapshot]{ + PrefixString: func(item instances.WissKI, index int) string { + return fmt.Sprintf("[snapshot %q]: ", item.Slug) + }, + PrefixAlign: true, + + Handler: func(instance instances.WissKI, index int, writer io.Writer) Snapshot { + dir := filepath.Join(instancesBackupDir, instance.Slug) + if err := manager.Environment.Mkdir(dir, environment.DefaultDirPerm); err != nil { + return Snapshot{ + ErrPanic: err, + } + } + + manifest <- dir + + return manager.NewSnapshot(instance, stream.NewIOStream(writer, writer, nil, 0), SnapshotDescription{ + Dest: dir, + }) + }, + ResultString: func(res Snapshot, item instances.WissKI, index int) string { + return "done" + }, + WaitString: status.DefaultWaitString[instances.WissKI], + HandlerLimit: backup.Description.ConcurrentSnapshots, + }.Use(st, wissKIs) + + // sort the instances + slices.SortFunc(backup.InstanceSnapshots, func(a, b Snapshot) bool { + return a.Instance.Slug < b.Instance.Slug + }) + + return nil + }, ios, "Creating instance snapshots") + +} diff --git a/internal/component/snapshots/snapshots.go b/internal/component/snapshots/manager.go similarity index 91% rename from internal/component/snapshots/snapshots.go rename to internal/component/snapshots/manager.go index bc7e767..90f7b32 100644 --- a/internal/component/snapshots/snapshots.go +++ b/internal/component/snapshots/manager.go @@ -6,8 +6,7 @@ import ( "time" "github.com/FAU-CDI/wisski-distillery/internal/component" - "github.com/FAU-CDI/wisski-distillery/internal/component/sql" - "github.com/FAU-CDI/wisski-distillery/internal/component/triplestore" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/fsx" "github.com/FAU-CDI/wisski-distillery/pkg/password" @@ -16,9 +15,10 @@ import ( // Manager manages snapshots and backups type Manager struct { component.ComponentBase + Instances *instances.Instances - TS *triplestore.Triplestore - SQL *sql.SQL + Snapshotable []component.Snapshotable + Backupable []component.Backupable } func (Manager) Name() string { return "snapshots" } @@ -68,6 +68,7 @@ func (*Manager) newSnapshotName(prefix string) string { func (dis *Manager) NewStagingDir(prefix string) (path string, err error) { for path == "" || environment.IsExist(err) { path = filepath.Join(dis.StagingPath(), dis.newSnapshotName(prefix)) + fmt.Println("path =>", prefix, "err => ", err) err = dis.Core.Environment.Mkdir(path, environment.DefaultFilePerm) } if err != nil { diff --git a/internal/component/snapshots/manifest.go b/internal/component/snapshots/manifest.go new file mode 100644 index 0000000..13f0b4c --- /dev/null +++ b/internal/component/snapshots/manifest.go @@ -0,0 +1,33 @@ +package snapshots + +import ( + "path/filepath" +) + +type WithManifest struct { + Manifest []string +} + +func (wm *WithManifest) handleManifest(dest string) (chan<- string, func()) { + manifest := make(chan string) + done := make(chan struct{}) + go func() { + defer close(done) + + for file := range manifest { + // get the relative path to the root of the manifest. + // nothing *should* go wrong, but in case it does, use the original path. + path, err := filepath.Rel(dest, file) + if err != nil { + path = file + } + + // add the manifest + wm.Manifest = append(wm.Manifest, path) + } + }() + return manifest, func() { + close(manifest) + <-done + } +} diff --git a/internal/backup/report.go b/internal/component/snapshots/report.go similarity index 51% rename from internal/backup/report.go rename to internal/component/snapshots/report.go index a19ab91..a72c34e 100644 --- a/internal/backup/report.go +++ b/internal/component/snapshots/report.go @@ -1,4 +1,4 @@ -package backup +package snapshots import ( "encoding/json" @@ -6,46 +6,83 @@ import ( "io" "path/filepath" "strings" - "time" - "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" "github.com/FAU-CDI/wisski-distillery/pkg/countwriter" "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/logging" "github.com/tkw1536/goprogram/stream" ) -// Description provides a description for a backup -type Description struct { - Dest string // Destination path - Auto bool // Was the path created automatically? - - ConcurrentSnapshots int // maximum number of concurrent snapshots +func (snapshot Snapshot) String() string { + var builder strings.Builder + snapshot.Report(&builder) + return builder.String() } -// Backup describes a backup -type Backup struct { - Description Description +// Report writes a report from snapshot into w +func (snapshot Snapshot) Report(w io.Writer) (int, error) { + ww := countwriter.NewCountWriter(w) - // Start and End Time of the backup - StartTime time.Time - EndTime time.Time + encoder := json.NewEncoder(ww) + encoder.SetIndent("", " ") - // various error states, which are ignored when creating the snapshot - ErrPanic interface{} + io.WriteString(ww, "======= Begin Snapshot Report "+snapshot.Instance.Slug+" =======\n") - // errors for the various components - ComponentErrors map[string]error + fmt.Fprintf(ww, "Slug: %s\n", snapshot.Instance.Slug) + fmt.Fprintf(ww, "Dest: %s\n", snapshot.Description.Dest) - // TODO: Make this proper - ConfigFileErr error + fmt.Fprintf(ww, "Start: %s\n", snapshot.StartTime) + fmt.Fprintf(ww, "End: %s\n", snapshot.EndTime) + io.WriteString(ww, "\n") - // Snapshots containing instances - InstanceListErr error - InstanceSnapshots []snapshots.Snapshot + io.WriteString(ww, "======= Description =======\n") + encoder.Encode(snapshot.Description) + io.WriteString(ww, "\n") - // List of files included - Manifest []string + io.WriteString(ww, "======= Instance =======\n") + encoder.Encode(snapshot.Instance) + io.WriteString(ww, "\n") + + io.WriteString(ww, "======= Errors =======\n") + fmt.Fprintf(ww, "Panic: %v\n", snapshot.ErrPanic) + fmt.Fprintf(ww, "Start: %s\n", snapshot.ErrStart) + fmt.Fprintf(ww, "Stop: %s\n", snapshot.ErrStop) + + fmt.Fprintf(ww, "Whitebox: %s\n", snapshot.ErrWhitebox) + fmt.Fprintf(ww, "Blackbox: %s\n", snapshot.ErrBlackbox) + + io.WriteString(ww, "\n") + + io.WriteString(ww, "======= Manifest =======\n") + for _, file := range snapshot.Manifest { + io.WriteString(ww, file+"\n") + } + + io.WriteString(ww, "\n") + + io.WriteString(ww, "======= End Snapshot Report "+snapshot.Instance.Slug+"=======\n") + + return ww.Sum() +} + +// WriteReport writes out the report belonging to this snapshot. +// It is a separate function, to allow writing it indepenently of the rest. +func (snapshot *Snapshot) WriteReport(env environment.Environment, stream stream.IOStream) error { + return logging.LogOperation(func() error { + reportPath := filepath.Join(snapshot.Description.Dest, "report.txt") + stream.Println(reportPath) + + // create the report file! + report, err := env.Create(reportPath, environment.DefaultFilePerm) + if err != nil { + return err + } + defer report.Close() + + // print the report into it! + _, err = io.WriteString(report, snapshot.String()) + return err + }, stream, "Writing snapshot report") } // Strings turns this backup into a string for the BackupReport. diff --git a/internal/component/snapshots/snapshot.go b/internal/component/snapshots/snapshot.go new file mode 100644 index 0000000..c32ff9b --- /dev/null +++ b/internal/component/snapshots/snapshot.go @@ -0,0 +1,125 @@ +package snapshots + +import ( + "fmt" + "io" + "path/filepath" + "time" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/models" + "github.com/FAU-CDI/wisski-distillery/pkg/logging" + "github.com/FAU-CDI/wisski-distillery/pkg/slicesx" + "github.com/tkw1536/goprogram/status" + "github.com/tkw1536/goprogram/stream" + "golang.org/x/exp/slices" +) + +// SnapshotDescription is a description for a snapshot +type SnapshotDescription struct { + Dest string // destination path + Log bool // should we log the creation of this snapshot? + Keepalive bool // should we keep the instance alive while making the snapshot? +} + +// Snapshot represents the result of generating a snapshot +type Snapshot struct { + Description SnapshotDescription + Instance models.Instance + + // Start and End Time of the snapshot + StartTime time.Time + EndTime time.Time + + // Generic Panic that may have occured + ErrPanic interface{} + ErrStart error + ErrStop error + ErrWhitebox map[string]error + ErrBlackbox map[string]error + + // List of files included + WithManifest +} + +// Snapshot creates a new snapshot of this instance into dest +func (snapshots *Manager) NewSnapshot(instance instances.WissKI, io stream.IOStream, desc SnapshotDescription) (snapshot Snapshot) { + // setup the snapshot + snapshot.Description = desc + snapshot.Instance = instance.Instance + + // capture anything critical, and write the end time + defer func() { + snapshot.ErrPanic = recover() + }() + + // do the create keeping track of time! + logging.LogOperation(func() error { + snapshot.StartTime = time.Now().UTC() + + snapshot.ErrWhitebox = snapshot.makeParts(io, snapshots, instance, false) + snapshot.ErrBlackbox = snapshot.makeParts(io, snapshots, instance, true) + + snapshot.EndTime = time.Now().UTC() + return nil + }, io, "Writing snapshot files") + + slices.Sort(snapshot.Manifest) + return +} + +func (snapshot *Snapshot) makeParts(ios stream.IOStream, snapshots *Manager, instance instances.WissKI, needsRunning bool) map[string]error { + if !needsRunning && !snapshot.Description.Keepalive { + stack := instance.Barrel() + + logging.LogMessage(ios, "Stopping instance") + snapshot.ErrStop = stack.Down(ios) + + defer func() { + logging.LogMessage(ios, "Starting instance") + snapshot.ErrStart = stack.Up(ios) + }() + } + // handle writing the manifest! + manifest, done := snapshot.handleManifest(snapshot.Description.Dest) + defer done() + + // create a new status + st := status.NewWithCompat(ios.Stdout, 0) + st.Start() + defer st.Stop() + + // get all the components + comps := slicesx.FilterClone(snapshots.Snapshotable, func(sc component.Snapshotable) bool { + return sc.SnapshotNeedsRunning() == needsRunning + }) + + results := make(map[string]error, len(comps)) + + errors := status.Group[component.Snapshotable, error]{ + PrefixString: func(item component.Snapshotable, index int) string { + return fmt.Sprintf("[snapshot %q]: ", item.Name()) + }, + PrefixAlign: true, + + Handler: func(sc component.Snapshotable, index int, writer io.Writer) error { + return sc.Snapshot( + instance.Instance, + component.NewStagingContext( + snapshots.Environment, + stream.NewIOStream(writer, writer, nil, 0), + filepath.Join(snapshot.Description.Dest, sc.SnapshotName()), + manifest, + ), + ) + }, + + ResultString: status.DefaultErrorString[component.Snapshotable], + }.Use(st, comps) + + for i, wc := range comps { + results[wc.Name()] = errors[i] + } + return results +} diff --git a/internal/component/snapshots/snapshot_make.go b/internal/component/snapshots/snapshot_make.go deleted file mode 100644 index ff301e7..0000000 --- a/internal/component/snapshots/snapshot_make.go +++ /dev/null @@ -1,301 +0,0 @@ -package snapshots - -import ( - "encoding/json" - "fmt" - "io" - "path/filepath" - "strings" - "time" - - "github.com/FAU-CDI/wisski-distillery/internal/component/instances" - "github.com/FAU-CDI/wisski-distillery/internal/models" - "github.com/FAU-CDI/wisski-distillery/pkg/countwriter" - "github.com/FAU-CDI/wisski-distillery/pkg/environment" - "github.com/FAU-CDI/wisski-distillery/pkg/fsx" - "github.com/FAU-CDI/wisski-distillery/pkg/logging" - "github.com/FAU-CDI/wisski-distillery/pkg/opgroup" - "github.com/tkw1536/goprogram/status" - "github.com/tkw1536/goprogram/stream" - "golang.org/x/exp/slices" -) - -// SnapshotDescription is a description for a snapshot -type SnapshotDescription struct { - Dest string // destination path - Log bool // should we log the creation of this snapshot? - Keepalive bool // should we keep the instance alive while making the snapshot? -} - -// Snapshot represents the result of generating a snapshot -type Snapshot struct { - Description SnapshotDescription - Instance models.Instance - - // Start and End Time of the snapshot - StartTime time.Time - EndTime time.Time - - // Generic Panic that may have occured - ErrPanic interface{} - - // Errors during starting and stopping the system - ErrStart error - ErrStop error - - // List of files included - Manifest []string - - // Errors during other parts - ErrBookkeep error - ErrPathbuilder error - ErrFilesystem error - ErrTriplestore error - ErrSQL error -} - -func (snapshot Snapshot) String() string { - var builder strings.Builder - snapshot.Report(&builder) - return builder.String() -} - -// Report writes a report from snapshot into w -func (snapshot Snapshot) Report(w io.Writer) (int, error) { - ww := countwriter.NewCountWriter(w) - - // TODO: Errors of the writer! - encoder := json.NewEncoder(ww) - encoder.SetIndent("", " ") - - io.WriteString(ww, "======= Begin Snapshot Report "+snapshot.Instance.Slug+" =======\n") - - fmt.Fprintf(ww, "Slug: %s\n", snapshot.Instance.Slug) - fmt.Fprintf(ww, "Dest: %s\n", snapshot.Description.Dest) - - fmt.Fprintf(ww, "Start: %s\n", snapshot.StartTime) - fmt.Fprintf(ww, "End: %s\n", snapshot.EndTime) - io.WriteString(ww, "\n") - - io.WriteString(ww, "======= Description =======\n") - encoder.Encode(snapshot.Description) - io.WriteString(ww, "\n") - - io.WriteString(ww, "======= Instance =======\n") - encoder.Encode(snapshot.Instance) - io.WriteString(ww, "\n") - - io.WriteString(ww, "======= Errors =======\n") - fmt.Fprintf(ww, "Panic: %v\n", snapshot.ErrPanic) - fmt.Fprintf(ww, "Start: %s\n", snapshot.ErrStart) - fmt.Fprintf(ww, "Stop: %s\n", snapshot.ErrStop) - fmt.Fprintf(ww, "Bookkeep: %s\n", snapshot.ErrBookkeep) - fmt.Fprintf(ww, "Pathbuilder: %s\n", snapshot.ErrPathbuilder) - fmt.Fprintf(ww, "Filesystem: %s\n", snapshot.ErrFilesystem) - fmt.Fprintf(ww, "Triplestore: %s\n", snapshot.ErrTriplestore) - fmt.Fprintf(ww, "SQL: %s\n", snapshot.ErrSQL) - io.WriteString(ww, "\n") - - io.WriteString(ww, "======= Manifest =======\n") - for _, file := range snapshot.Manifest { - io.WriteString(ww, file+"\n") - } - - io.WriteString(ww, "\n") - - io.WriteString(ww, "======= End Snapshot Report "+snapshot.Instance.Slug+"=======\n") - - return ww.Sum() -} - -// Snapshot creates a new snapshot of this instance into dest -func (snapshots *Manager) NewSnapshot(instance instances.WissKI, io stream.IOStream, desc SnapshotDescription) (snapshot Snapshot) { - // setup the snapshot - snapshot.Description = desc - snapshot.Instance = instance.Instance - - // capture anything critical, and write the end time - defer func() { - snapshot.ErrPanic = recover() - }() - - // do the create keeping track of time! - logging.LogOperation(func() error { - snapshot.StartTime = time.Now().UTC() - - snapshot.makeBlackbox(io, snapshots, instance) - snapshot.makeWhitebox(io, snapshots, instance) - - snapshot.EndTime = time.Now().UTC() - return nil - }, io, "Writing snapshot files") - - slices.Sort(snapshot.Manifest) - return -} - -// makeBlackbox runs the blackbox backup of the system. -// It pauses the Instance, if a consistent state is required. -func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, snapshots *Manager, instance instances.WissKI) { - stack := instance.Barrel() - - og := opgroup.NewOpGroup[string](4) - - st := status.NewWithCompat(io.Stdout, 0) - st.Start() - defer st.Stop() - - // stop the instance (unless it was explicitly asked to not do so!) - if !snapshot.Description.Keepalive { - logging.LogMessage(io, "Stopping instance") - snapshot.ErrStop = stack.Down(io) - - defer func() { - logging.LogMessage(io, "Starting instance") - snapshot.ErrStart = stack.Up(io) - }() - } - - // write bookkeeping information - og.GoErr(func(files chan<- string) error { - line := st.OpenLine("[snapshot bookkeeping]: ", "") - defer line.Close() - defer fmt.Fprintln(line, "done") - - bkPath := filepath.Join(snapshot.Description.Dest, "bookkeeping.txt") - fmt.Fprintln(line, bkPath) - files <- bkPath - - info, err := snapshots.Core.Environment.Create(bkPath, environment.DefaultFilePerm) - if err != nil { - return err - } - defer info.Close() - - // print whatever is in the database - // TODO: This should be sql code, maybe gorm can do that? - _, err = fmt.Fprintf(info, "%#v\n", instance.Instance) - return err - }, &snapshot.ErrBookkeep) - - // backup the filesystem - og.GoErr(func(files chan<- string) error { - line := st.OpenLine("[snapshot filesystem]: ", "") - defer line.Close() - defer fmt.Fprintln(line, "done") - - fsPath := filepath.Join(snapshot.Description.Dest, filepath.Base(instance.FilesystemBase)) - - // copy over whatever is in the base directory - defer fmt.Fprintln(line, "done") - return fsx.CopyDirectory(snapshots.Core.Environment, fsPath, instance.FilesystemBase, func(dst, src string) { - fmt.Fprintln(line, dst) - files <- dst - }) - - }, &snapshot.ErrFilesystem) - - // backup the graph db repository - og.GoErr(func(files chan<- string) error { - line := st.OpenLine("[snapshot triplestore]: ", "") - defer line.Close() - defer fmt.Fprintln(line, "done") - - tsPath := filepath.Join(snapshot.Description.Dest, instance.GraphDBRepository+".nq") - fmt.Fprintln(line, tsPath) - files <- tsPath - - nquads, err := snapshots.Core.Environment.Create(tsPath, environment.DefaultFilePerm) - if err != nil { - return err - } - defer nquads.Close() - - // directly store the result - _, err = snapshots.TS.SnapshotDB(nquads, instance.GraphDBRepository) - return err - }, &snapshot.ErrTriplestore) - - // backup the sql database - og.GoErr(func(files chan<- string) error { - line := st.OpenLine("[snapshot sql]: ", "") - defer line.Close() - defer fmt.Fprintln(line, "done") - - sqlPath := filepath.Join(snapshot.Description.Dest, snapshot.Instance.SqlDatabase+".sql") - fmt.Fprintln(line, sqlPath) - files <- sqlPath - - sql, err := snapshots.Core.Environment.Create(sqlPath, environment.DefaultFilePerm) - if err != nil { - return err - } - defer sql.Close() - - // directly store the result - return snapshots.SQL.SnapshotDB(io, sql, instance.SqlDatabase) - }, &snapshot.ErrSQL) - - // wait for the group! - snapshot.waitGroup(io, og) -} - -// makeWhitebox runs the whitebox backup of the system. -// The instance should be running during this step. -func (snapshot *Snapshot) makeWhitebox(io stream.IOStream, snapshots *Manager, instance instances.WissKI) { - og := opgroup.NewOpGroup[string](1) - - // write pathbuilders - og.GoErr(func(files chan<- string) error { - - pbPath := filepath.Join(snapshot.Description.Dest, "pathbuilders") - files <- pbPath - - // create the directory! - if err := snapshots.Core.Environment.Mkdir(pbPath, environment.DefaultDirPerm); err != nil { - return err - } - - // put in all the pathbuilders - return instance.ExportPathbuilders(pbPath) - }, &snapshot.ErrPathbuilder) - - // wait for the group! - snapshot.waitGroup(io, og) -} - -// waitGroup waits for the -func (snapshot *Snapshot) waitGroup(io stream.IOStream, og *opgroup.OpGroup[string]) { - // wait for the messages to return - for file := range og.Wait() { - // get the relative path to the root of the manifest. - // nothing *should* go wrong, but in case it does, use the original path. - path, err := filepath.Rel(snapshot.Description.Dest, file) - if err != nil { - path = file - } - - // add the manifest - snapshot.Manifest = append(snapshot.Manifest, path) - } -} - -// WriteReport writes out the report belonging to this snapshot. -// It is a separate function, to allow writing it indepenently of the rest. -func (snapshot *Snapshot) WriteReport(env environment.Environment, stream stream.IOStream) error { - return logging.LogOperation(func() error { - reportPath := filepath.Join(snapshot.Description.Dest, "report.txt") - stream.Println(reportPath) - - // create the report file! - report, err := env.Create(reportPath, environment.DefaultFilePerm) - if err != nil { - return err - } - defer report.Close() - - // print the report into it! - _, err = io.WriteString(report, snapshot.String()) - return err - }, stream, "Writing snapshot report") -} diff --git a/internal/component/sql/snapshot.go b/internal/component/sql/snapshot.go index e70e03c..28b94ed 100644 --- a/internal/component/sql/snapshot.go +++ b/internal/component/sql/snapshot.go @@ -3,11 +3,25 @@ package sql import ( "io" + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/models" "github.com/tkw1536/goprogram/stream" ) +func (SQL) SnapshotNeedsRunning() bool { return false } + +func (SQL) SnapshotName() string { return "sql" } + +func (sql *SQL) Snapshot(wisski models.Instance, context component.StagingContext) error { + return context.AddDirectory(".", func() error { + return context.AddFile(wisski.SqlDatabase+".sql", func(file io.Writer) error { + return sql.SnapshotDB(context.IO(), file, wisski.SqlDatabase) + }) + }) +} + // SnapshotDB makes a backup of the sql database into dest. -func (sql SQL) SnapshotDB(io stream.IOStream, dest io.Writer, database string) error { +func (sql *SQL) SnapshotDB(io stream.IOStream, dest io.Writer, database string) error { io = io.Streams(dest, nil, nil, 0).NonInteractive() code, err := sql.Stack(sql.Environment).Exec(io, "sql", "mysqldump", "--databases", database) diff --git a/internal/component/triplestore/database.go b/internal/component/triplestore/database.go index 8fd4542..3580a05 100644 --- a/internal/component/triplestore/database.go +++ b/internal/component/triplestore/database.go @@ -122,21 +122,6 @@ func (ts Triplestore) PurgeRepo(repo string) error { return nil } -var errTSBackupWrongStatusCode = errors.New("Distillery.Backup: Wrong status code") - -// SnapshotDB snapshots the provided repository into dst -func (ts Triplestore) SnapshotDB(dst io.Writer, repo string) (int64, error) { - res, err := ts.OpenRaw("GET", "/repositories/"+repo+"/statements?infer=false", nil, "", "application/n-quads") - if err != nil { - return 0, err - } - if res.StatusCode != http.StatusOK { - return 0, errTSBackupWrongStatusCode - } - defer res.Body.Close() - return io.Copy(dst, res.Body) -} - type Repository struct { ID string `json:"id"` Title string `json:"title"` diff --git a/internal/component/triplestore/snapshot.go b/internal/component/triplestore/snapshot.go new file mode 100644 index 0000000..8adea00 --- /dev/null +++ b/internal/component/triplestore/snapshot.go @@ -0,0 +1,38 @@ +package triplestore + +import ( + "io" + "net/http" + + "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/models" + "github.com/pkg/errors" +) + +func (Triplestore) SnapshotNeedsRunning() bool { return false } + +func (Triplestore) SnapshotName() string { return "triplestore" } + +func (ts *Triplestore) Snapshot(wisski models.Instance, context component.StagingContext) error { + return context.AddDirectory(".", func() error { + return context.AddFile(wisski.GraphDBRepository+".nq", func(file io.Writer) error { + _, err := ts.SnapshotDB(file, wisski.GraphDBRepository) + return err + }) + }) +} + +var errTSBackupWrongStatusCode = errors.New("Triplestore.Backup: Wrong status code") + +// SnapshotDB snapshots the provided repository into dst +func (ts Triplestore) SnapshotDB(dst io.Writer, repo string) (int64, error) { + res, err := ts.OpenRaw("GET", "/repositories/"+repo+"/statements?infer=false", nil, "", "application/n-quads") + if err != nil { + return 0, err + } + if res.StatusCode != http.StatusOK { + return 0, errTSBackupWrongStatusCode + } + defer res.Body.Close() + return io.Copy(dst, res.Body) +} diff --git a/internal/dis/component.go b/internal/dis/component.go index 8dcb043..b2808f8 100644 --- a/internal/dis/component.go +++ b/internal/dis/component.go @@ -1,6 +1,7 @@ package dis import ( + "sync/atomic" "time" "github.com/FAU-CDI/wisski-distillery/internal/component" @@ -13,7 +14,6 @@ import ( "github.com/FAU-CDI/wisski-distillery/internal/component/triplestore" "github.com/FAU-CDI/wisski-distillery/internal/component/web" "github.com/FAU-CDI/wisski-distillery/internal/core" - "github.com/FAU-CDI/wisski-distillery/pkg/lazy" ) // components holds the various components of the distillery @@ -21,68 +21,61 @@ import ( // // The caller is responsible for syncronizing access across multiple goroutines. type components struct { - - // installable components - web lazy.Lazy[*web.Web] - control lazy.Lazy[*control.Control] - ssh lazy.Lazy[*ssh.SSH] - ts lazy.Lazy[*triplestore.Triplestore] - sql lazy.Lazy[*sql.SQL] - - // other components - instances lazy.Lazy[*instances.Instances] - snapshots lazy.Lazy[*snapshots.Manager] - - // extras components - extrasConfig lazy.Lazy[*extras.Config] + t int32 // t is the previously used thread id! + pool component.Pool } // // Individual Components // -func (dis *Distillery) Web() *web.Web { - return component.Initialize(dis.Core, &dis.components.web, nil) +func (c *components) thread() int32 { + return atomic.AddInt32(&c.t, 1) } -func (d *Distillery) Control() *control.Control { - return component.Initialize(d.Core, &d.components.control, func(control *control.Control) { +func (dis *Distillery) cWeb(thread int32) *web.Web { + return component.PutComponent[*web.Web](&dis.pool, thread, dis.Core, nil) +} + +func (dis *Distillery) cControl(thread int32) *control.Control { + return component.PutComponent(&dis.pool, thread, dis.Core, func(control *control.Control, thread int32) { control.ResolverFile = core.PrefixConfig - control.Instances = d.Instances() + control.Instances = dis.cInstances(thread) }) } -func (dis *Distillery) SSH() *ssh.SSH { - return component.Initialize(dis.Core, &dis.components.ssh, nil) +func (dis *Distillery) cSSH(thread int32) *ssh.SSH { + return component.PutComponent[*ssh.SSH](&dis.pool, thread, dis.Core, nil) } -func (dis *Distillery) SQL() *sql.SQL { - return component.Initialize(dis.Core, &dis.components.sql, func(sql *sql.SQL) { +func (dis *Distillery) cSQL(thread int32) *sql.SQL { + return component.PutComponent(&dis.pool, thread, dis.Core, func(sql *sql.SQL, thread int32) { sql.ServerURL = dis.Upstream.SQL sql.PollContext = dis.Context() sql.PollInterval = time.Second }) } -func (dis *Distillery) Triplestore() *triplestore.Triplestore { - return component.Initialize(dis.Core, &dis.components.ts, func(ts *triplestore.Triplestore) { +func (dis *Distillery) cTriplestore(thread int32) *triplestore.Triplestore { + return component.PutComponent(&dis.pool, thread, dis.Core, func(ts *triplestore.Triplestore, thread int32) { ts.BaseURL = "http://" + dis.Upstream.Triplestore ts.PollContext = dis.Context() ts.PollInterval = time.Second }) } -func (dis *Distillery) Instances() *instances.Instances { - return component.Initialize(dis.Core, &dis.components.instances, func(instances *instances.Instances) { - instances.SQL = dis.SQL() - instances.TS = dis.Triplestore() +func (dis *Distillery) cInstances(thread int32) *instances.Instances { + return component.PutComponent(&dis.pool, thread, dis.Core, func(instances *instances.Instances, thread int32) { + instances.SQL = dis.cSQL(thread) + instances.TS = dis.cTriplestore(thread) }) } -func (dis *Distillery) SnapshotManager() *snapshots.Manager { - return component.Initialize(dis.Core, &dis.components.snapshots, func(snapshots *snapshots.Manager) { - snapshots.SQL = dis.SQL() - snapshots.TS = dis.Triplestore() +func (dis *Distillery) cSnapshotManager(thread int32) *snapshots.Manager { + return component.PutComponent(&dis.pool, thread, dis.Core, func(snapshots *snapshots.Manager, thread int32) { + snapshots.Instances = dis.cInstances(thread) + snapshots.Snapshotable = dis.cSnapshotable(thread) + snapshots.Backupable = dis.cBackupable(thread) }) } @@ -90,26 +83,42 @@ func (dis *Distillery) SnapshotManager() *snapshots.Manager { // EXTRAS COMPONENTS // -func (dis *Distillery) ExtrasConfig() *extras.Config { - return component.Initialize(dis.Core, &dis.components.extrasConfig, nil) +func (dis *Distillery) cExtrasConfig(thread int32) *extras.Config { + return component.PutComponent[*extras.Config](&dis.pool, thread, dis.Core, nil) +} + +func (dis *Distillery) cExtrasBookkeeping(thread int32) *extras.Bookkeeping { + return component.PutComponent[*extras.Bookkeeping](&dis.pool, thread, dis.Core, nil) +} + +func (dis *Distillery) cExtrasFilesystem(thread int32) *extras.Filesystem { + return component.PutComponent[*extras.Filesystem](&dis.pool, thread, dis.Core, nil) +} + +func (dis *Distillery) cExtrasPathbuilders(thread int32) *extras.Pathbuilders { + return component.PutComponent(&dis.pool, thread, dis.Core, func(pbs *extras.Pathbuilders, thread int32) { + pbs.Instances = dis.cInstances(thread) + }) } // // ALL COMPONENTS // -func (dis *Distillery) Components() []component.Component { +func (dis *Distillery) cComponents(thread int32) []component.Component { return []component.Component{ - dis.Web(), - dis.Control(), - dis.SSH(), - dis.Triplestore(), - dis.SQL(), - dis.Instances(), - dis.SnapshotManager(), + dis.cWeb(thread), + dis.cControl(thread), + dis.cSSH(thread), + dis.cTriplestore(thread), + dis.cSQL(thread), + dis.cInstances(thread), + dis.cSnapshotManager(thread), - // extras components - dis.ExtrasConfig(), + dis.cExtrasConfig(thread), + dis.cExtrasBookkeeping(thread), + dis.cExtrasFilesystem(thread), + dis.cExtrasPathbuilders(thread), } } @@ -117,29 +126,28 @@ func (dis *Distillery) Components() []component.Component { // COMPONENT SUBTYPE GETTERS // -// Backupable returns all the components that can be backuped up. -func (dis *Distillery) Backupable() []component.Backupable { - return getComponentSubtype[component.Backupable](dis) +func (dis *Distillery) cInstallables(thread int32) []component.Installable { + return getComponentSubtype[component.Installable](dis, thread) } -// Installables returns all components that can be installed -func (dis *Distillery) Installables() []component.Installable { - return getComponentSubtype[component.Installable](dis) +func (dis *Distillery) cUpdateable(thread int32) []component.Updatable { + return getComponentSubtype[component.Updatable](dis, thread) } -// Installables returns all components that can be installed -func (dis *Distillery) Updateable() []component.Updatable { - return getComponentSubtype[component.Updatable](dis) +func (dis *Distillery) cBackupable(thread int32) []component.Backupable { + return getComponentSubtype[component.Backupable](dis, thread) } -// Provisionable returns all components which can be provisioned -func (dis *Distillery) Provisionable() []component.Provisionable { - return getComponentSubtype[component.Provisionable](dis) +func (dis *Distillery) cProvisionable(thread int32) []component.Provisionable { + return getComponentSubtype[component.Provisionable](dis, thread) } -// getComponentSubtype gets all components of type T -func getComponentSubtype[T component.Component](dis *Distillery) (components []T) { - all := dis.Components() +func (dis *Distillery) cSnapshotable(thread int32) []component.Snapshotable { + return getComponentSubtype[component.Snapshotable](dis, thread) +} + +func getComponentSubtype[T component.Component](dis *Distillery, thread int32) (components []T) { + all := dis.cComponents(thread) components = make([]T, 0, len(all)) for _, c := range all { diff --git a/internal/dis/distillery.go b/internal/dis/distillery.go index a32a594..914c085 100644 --- a/internal/dis/distillery.go +++ b/internal/dis/distillery.go @@ -4,6 +4,12 @@ import ( "context" "github.com/FAU-CDI/wisski-distillery/internal/component" + "github.com/FAU-CDI/wisski-distillery/internal/component/control" + "github.com/FAU-CDI/wisski-distillery/internal/component/instances" + "github.com/FAU-CDI/wisski-distillery/internal/component/snapshots" + "github.com/FAU-CDI/wisski-distillery/internal/component/sql" + "github.com/FAU-CDI/wisski-distillery/internal/component/ssh" + "github.com/FAU-CDI/wisski-distillery/internal/component/triplestore" ) // Distillery represents a WissKI Distillery @@ -34,3 +40,33 @@ type Upstream struct { func (dis *Distillery) Context() context.Context { return context.Background() } + +// +// PUBLIC COMPONENT GETTERS +// + +func (dis *Distillery) Control() *control.Control { + return dis.cControl(dis.thread()) +} +func (dis *Distillery) SSH() *ssh.SSH { + return dis.cSSH(dis.thread()) +} +func (dis *Distillery) SQL() *sql.SQL { + return dis.cSQL(dis.thread()) +} +func (dis *Distillery) Triplestore() *triplestore.Triplestore { + return dis.cTriplestore(dis.thread()) +} +func (dis *Distillery) Instances() *instances.Instances { + return dis.cInstances(dis.thread()) +} +func (dis *Distillery) SnapshotManager() *snapshots.Manager { + return dis.cSnapshotManager(dis.thread()) +} + +func (dis *Distillery) Installable() []component.Installable { return dis.cInstallables(dis.thread()) } +func (dis *Distillery) Updatable() []component.Updatable { return dis.cUpdateable(dis.thread()) } + +func (dis *Distillery) Provisionable() []component.Provisionable { + return dis.cProvisionable(dis.thread()) +} diff --git a/pkg/opgroup/opgroup.go b/pkg/opgroup/opgroup.go deleted file mode 100644 index 5488f06..0000000 --- a/pkg/opgroup/opgroup.go +++ /dev/null @@ -1,50 +0,0 @@ -// Package opgroup provides OpGroup -package opgroup - -import "sync" - -// OpGroup represents an operation group that can send messages to the waiting goroutine. -// The zero value is not ready for use, use [NewOpGroup] instead. -type OpGroup[M any] struct { - wg sync.WaitGroup - c chan M -} - -// NewOpGroup creates a new OpGroup. -// -// The internal buffer size for messages will be expectedSize. -// If unsure about buffer size, 0 is a valid choice. -func NewOpGroup[M any](expectedSize int) *OpGroup[M] { - return &OpGroup[M]{ - c: make(chan M, expectedSize), - } -} - -// Go schedules a new operation (implemented by worker) to run in a separate goroutine. -// worker is passed a send-only reference to the message channel which it can uszxe to send messages to. -func (op *OpGroup[M]) Go(worker func(c chan<- M)) { - op.wg.Add(1) - go func() { - defer op.wg.Done() - worker(op.c) - }() -} - -// GoErr is like Go, except that once the operation is finished, it writes the returned error into dest. -func (op *OpGroup[M]) GoErr(worker func(c chan<- M) error, dest *error) { - op.Go(func(c chan<- M) { - *dest = worker(c) - }) -} - -// Wait returns a receive-only reference to the message channel. -// The message channel will be closed once all operations on this group have completed. -// -// The Wait function may only be called once. -func (op *OpGroup[M]) Wait() <-chan M { - go func() { - op.wg.Wait() - close(op.c) - }() - return op.c -} diff --git a/pkg/rlock/rlock.go b/pkg/rlock/rlock.go new file mode 100644 index 0000000..efb4bdd --- /dev/null +++ b/pkg/rlock/rlock.go @@ -0,0 +1,44 @@ +package rlock + +import ( + "sync" + "time" +) + +type RLock struct { + m sync.Mutex // m is held internally + + held bool + holder int + counter uint64 +} + +func (rm *RLock) Lock(id int) { + for { + rm.m.Lock() + if !rm.held { + rm.held = true + rm.holder = id + break + } else if rm.held && rm.holder == id { + break + } else { + rm.m.Unlock() + time.Sleep(time.Millisecond) + continue + } + } + + rm.counter++ + rm.m.Unlock() +} + +func (rm *RLock) Unlock() { + rm.m.Lock() + rm.counter-- + if rm.counter == 0 { + rm.held = false + rm.holder = 0 + } + rm.m.Unlock() +} diff --git a/pkg/slicesx/slicesx.go b/pkg/slicesx/slicesx.go index 37d0ef6..5538e1a 100644 --- a/pkg/slicesx/slicesx.go +++ b/pkg/slicesx/slicesx.go @@ -26,6 +26,16 @@ func Filter[T any](values []T, filter func(T) bool) []T { return results } +// FilterClone is like [Filter], but creates a new slice +func FilterClone[T any](values []T, filter func(T) bool) (results []T) { + for _, value := range values { + if filter(value) { + results = append(results, value) + } + } + return +} + // NonSequential sorts values, and then removes elements for which test() returns true. // NonSequential does not re-allocate, but uses the existing slice. func NonSequential[T constraints.Ordered](values []T, test func(prev, current T) bool) []T {