From f674e50e107e7d251f372f1113d0b84b169a5213 Mon Sep 17 00:00:00 2001 From: Tom Wiesing Date: Tue, 13 Sep 2022 16:34:18 +0200 Subject: [PATCH] snapshot: Separate into whitebox and blackbox backup --- internal/env/snapshot.go | 161 ++++++++++++++++++--------------------- pkg/opgroup/opgroup.go | 50 ++++++++++++ 2 files changed, 126 insertions(+), 85 deletions(-) create mode 100644 pkg/opgroup/opgroup.go diff --git a/internal/env/snapshot.go b/internal/env/snapshot.go index b160964..1328e72 100644 --- a/internal/env/snapshot.go +++ b/internal/env/snapshot.go @@ -8,13 +8,13 @@ import ( "os" "path/filepath" "strings" - "sync" "time" "github.com/FAU-CDI/wisski-distillery/pkg/bookkeeping" "github.com/FAU-CDI/wisski-distillery/pkg/countwriter" "github.com/FAU-CDI/wisski-distillery/pkg/fsx" "github.com/FAU-CDI/wisski-distillery/pkg/logging" + "github.com/FAU-CDI/wisski-distillery/pkg/opgroup" "github.com/FAU-CDI/wisski-distillery/pkg/password" "github.com/tkw1536/goprogram/stream" "golang.org/x/exp/slices" @@ -174,133 +174,127 @@ func (instance Instance) Snapshot(io stream.IOStream, desc SnapshotDescription) // do the create keeping track of time! logging.LogOperation(func() error { snapshot.StartTime = time.Now() - snapshot.makeBlackbox(io, instance) - snapshot.EndTime = time.Now() + snapshot.makeBlackbox(io, instance) + snapshot.makeWhitebox(io, instance) + + snapshot.EndTime = time.Now() return nil }, io, "Writing snapshot files") + slices.Sort(snapshot.Manifest) return } -// mainBlackbox runs the blackbox backup of the system. +// makeBlackbox runs the blackbox backup of the system. // It pauses the Instance, if a consistent state is required. func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, instance Instance) { stack := instance.Stack() + og := opgroup.NewOpGroup[string](4) + // stop the instance (unless it was explicitly asked to not do so!) if !snapshot.Description.Keepalive { logging.LogMessage(io, "Stopping instance") snapshot.ErrStop = stack.Down(io) + defer func() { logging.LogMessage(io, "Starting instance") snapshot.ErrStart = stack.Up(io) }() } - // create a wait group, and message channel - wg := &sync.WaitGroup{} - files := make(chan string, 4) - // write bookkeeping information - wg.Add(1) - go func() { - defer wg.Done() - + og.GoErr(func(files chan<- string) error { bkPath := filepath.Join(snapshot.Description.Dest, "bookkeeping.txt") files <- bkPath info, err := os.Create(bkPath) if err != nil { - snapshot.ErrBookkeep = err - return + return err } defer info.Close() // print whatever is in the database // TODO: This should be sql code, maybe gorm can do that? - _, snapshot.ErrBookkeep = fmt.Fprintf(info, "%#v\n", instance.Instance) - }() + _, err = fmt.Fprintf(info, "%#v\n", instance.Instance) + return err + }, &snapshot.ErrBookkeep) + + // backup the filesystem + og.GoErr(func(files chan<- string) error { + fsPath := filepath.Join(snapshot.Description.Dest, filepath.Base(instance.FilesystemBase)) + + // copy over whatever is in the base directory + return fsx.CopyDirectory(fsPath, instance.FilesystemBase, func(dst, src string) { + files <- dst + }) + }, &snapshot.ErrFilesystem) + + // backup the graph db repository + og.GoErr(func(files chan<- string) error { + tsPath := filepath.Join(snapshot.Description.Dest, instance.GraphDBRepository+".nq") + files <- tsPath + + nquads, err := os.Create(tsPath) + if err != nil { + return err + } + defer nquads.Close() + + // directly store the result + _, err = instance.dis.Triplestore().Backup(nquads, instance.GraphDBRepository) + return err + }, &snapshot.ErrTriplestore) + + // backup the sql database + og.GoErr(func(files chan<- string) error { + sqlPath := filepath.Join(snapshot.Description.Dest, snapshot.Instance.SqlDatabase+".sql") + files <- sqlPath + + sql, err := os.Create(sqlPath) + if err != nil { + return err + } + defer sql.Close() + + // directly store the result + return instance.dis.SQL().Backup(io, sql, instance.SqlDatabase) + }, &snapshot.ErrSQL) + + // wait for the group! + snapshot.waitGroup(io, og) +} + +// makeWhitebox runs the whitebox backup of the system. +// The instance should be running during this step. +func (snapshot *Snapshot) makeWhitebox(io stream.IOStream, instance Instance) { + og := opgroup.NewOpGroup[string](1) // write pathbuilders // TODO: Move this outside of the up/down stuff! - wg.Add(1) - go func() { - defer wg.Done() + og.GoErr(func(files chan<- string) error { pbPath := filepath.Join(snapshot.Description.Dest, "pathbuilders") files <- pbPath // create the directory! if err := os.Mkdir(pbPath, fs.ModeDir); err != nil { - snapshot.ErrPathbuilder = err - return + return err } // put in all the pathbuilders - snapshot.ErrPathbuilder = instance.ExportPathbuilders(pbPath) - }() + return instance.ExportPathbuilders(pbPath) + }, &snapshot.ErrPathbuilder) - // backup the filesystem - wg.Add(1) - go func() { - defer wg.Done() + // wait for the group! + snapshot.waitGroup(io, og) +} - fsPath := filepath.Join(snapshot.Description.Dest, filepath.Base(instance.FilesystemBase)) - - // copy over whatever is in the base directory - snapshot.ErrFilesystem = fsx.CopyDirectory(fsPath, instance.FilesystemBase, func(dst, src string) { - files <- dst - }) - }() - - // backup the graph db repository - wg.Add(1) - go func() { - defer wg.Done() - - tsPath := filepath.Join(snapshot.Description.Dest, instance.GraphDBRepository+".nq") - files <- tsPath - - nquads, err := os.Create(tsPath) - if err != nil { - snapshot.ErrTriplestore = err - return - } - defer nquads.Close() - - // directly store the result - _, snapshot.ErrTriplestore = instance.dis.Triplestore().Backup(nquads, instance.GraphDBRepository) - }() - - // backup the sql database - wg.Add(1) - go func() { - defer wg.Done() - - sqlPath := filepath.Join(snapshot.Description.Dest, snapshot.Instance.SqlDatabase+".sql") - files <- sqlPath - - sql, err := os.Create(sqlPath) - if err != nil { - snapshot.ErrSQL = err - return - } - defer sql.Close() - - // directly store the result - snapshot.ErrSQL = instance.dis.SQL().Backup(io, sql, instance.SqlDatabase) - }() - - // TODO: Backup the docker image - - // wait for the group, then close the message channel. - go func() { - wg.Wait() - close(files) - }() - - for file := range files { +// waitGroup waits for the +func (snapshot *Snapshot) waitGroup(io stream.IOStream, og *opgroup.OpGroup[string]) { + // wait for the messages to return + for file := range og.Wait() { // get the relative path to the root of the manifest. // nothing *should* go wrong, but in case it does, use the original path. path, err := filepath.Rel(snapshot.Description.Dest, file) @@ -314,9 +308,6 @@ func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, instance Instance) { snapshot.Manifest = append(snapshot.Manifest, path) } io.Println("") - - // make sure the manifest is sorted! - slices.Sort(snapshot.Manifest) } // WriteReport writes out the report belonging to this snapshot. diff --git a/pkg/opgroup/opgroup.go b/pkg/opgroup/opgroup.go new file mode 100644 index 0000000..a4b0306 --- /dev/null +++ b/pkg/opgroup/opgroup.go @@ -0,0 +1,50 @@ +// Package opgroup provides OpGroup +package opgroup + +import "sync" + +// OpGroup represents an operation group that can send messages to the waiting goroutine. +//The zero value is not ready for use, use [NewOpGroup] instead. +type OpGroup[M any] struct { + wg sync.WaitGroup + c chan M +} + +// NewOpGroup creates a new OpGroup. +// +// The internal buffer size for messages will be expectedSize. +// If unsure about buffer size, 0 is a valid choice. +func NewOpGroup[M any](expectedSize int) *OpGroup[M] { + return &OpGroup[M]{ + c: make(chan M, expectedSize), + } +} + +// Go schedules a new operation (implemented by worker) to run in a separate goroutine. +// worker is passed a send-only reference to the message channel which it can uszxe to send messages to. +func (op *OpGroup[M]) Go(worker func(c chan<- M)) { + op.wg.Add(1) + go func() { + defer op.wg.Done() + worker(op.c) + }() +} + +// GoErr is like Go, except that once the operation is finished, it writes the returned error into dest. +func (op *OpGroup[M]) GoErr(worker func(c chan<- M) error, dest *error) { + op.Go(func(c chan<- M) { + *dest = worker(c) + }) +} + +// Wait returns a receive-only reference to the message channel.// +// The message channel will be closed once all operations on this group have completed. +// +// The Wait function may only be called once. +func (op *OpGroup[M]) Wait() <-chan M { + go func() { + op.wg.Wait() + close(op.c) + }() + return op.c +}