snapshot: Separate into whitebox and blackbox backup
This commit is contained in:
parent
ac43221932
commit
f674e50e10
2 changed files with 126 additions and 85 deletions
161
internal/env/snapshot.go
vendored
161
internal/env/snapshot.go
vendored
|
|
@ -8,13 +8,13 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/bookkeeping"
|
"github.com/FAU-CDI/wisski-distillery/pkg/bookkeeping"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/countwriter"
|
"github.com/FAU-CDI/wisski-distillery/pkg/countwriter"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/fsx"
|
"github.com/FAU-CDI/wisski-distillery/pkg/fsx"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||||
|
"github.com/FAU-CDI/wisski-distillery/pkg/opgroup"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/password"
|
"github.com/FAU-CDI/wisski-distillery/pkg/password"
|
||||||
"github.com/tkw1536/goprogram/stream"
|
"github.com/tkw1536/goprogram/stream"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
|
@ -174,133 +174,127 @@ func (instance Instance) Snapshot(io stream.IOStream, desc SnapshotDescription)
|
||||||
// do the create keeping track of time!
|
// do the create keeping track of time!
|
||||||
logging.LogOperation(func() error {
|
logging.LogOperation(func() error {
|
||||||
snapshot.StartTime = time.Now()
|
snapshot.StartTime = time.Now()
|
||||||
snapshot.makeBlackbox(io, instance)
|
|
||||||
snapshot.EndTime = time.Now()
|
|
||||||
|
|
||||||
|
snapshot.makeBlackbox(io, instance)
|
||||||
|
snapshot.makeWhitebox(io, instance)
|
||||||
|
|
||||||
|
snapshot.EndTime = time.Now()
|
||||||
return nil
|
return nil
|
||||||
}, io, "Writing snapshot files")
|
}, io, "Writing snapshot files")
|
||||||
|
|
||||||
|
slices.Sort(snapshot.Manifest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// mainBlackbox runs the blackbox backup of the system.
|
// makeBlackbox runs the blackbox backup of the system.
|
||||||
// It pauses the Instance, if a consistent state is required.
|
// It pauses the Instance, if a consistent state is required.
|
||||||
func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, instance Instance) {
|
func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, instance Instance) {
|
||||||
stack := instance.Stack()
|
stack := instance.Stack()
|
||||||
|
|
||||||
|
og := opgroup.NewOpGroup[string](4)
|
||||||
|
|
||||||
// stop the instance (unless it was explicitly asked to not do so!)
|
// stop the instance (unless it was explicitly asked to not do so!)
|
||||||
if !snapshot.Description.Keepalive {
|
if !snapshot.Description.Keepalive {
|
||||||
logging.LogMessage(io, "Stopping instance")
|
logging.LogMessage(io, "Stopping instance")
|
||||||
snapshot.ErrStop = stack.Down(io)
|
snapshot.ErrStop = stack.Down(io)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
logging.LogMessage(io, "Starting instance")
|
logging.LogMessage(io, "Starting instance")
|
||||||
snapshot.ErrStart = stack.Up(io)
|
snapshot.ErrStart = stack.Up(io)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a wait group, and message channel
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
files := make(chan string, 4)
|
|
||||||
|
|
||||||
// write bookkeeping information
|
// write bookkeeping information
|
||||||
wg.Add(1)
|
og.GoErr(func(files chan<- string) error {
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
bkPath := filepath.Join(snapshot.Description.Dest, "bookkeeping.txt")
|
bkPath := filepath.Join(snapshot.Description.Dest, "bookkeeping.txt")
|
||||||
files <- bkPath
|
files <- bkPath
|
||||||
|
|
||||||
info, err := os.Create(bkPath)
|
info, err := os.Create(bkPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
snapshot.ErrBookkeep = err
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer info.Close()
|
defer info.Close()
|
||||||
|
|
||||||
// print whatever is in the database
|
// print whatever is in the database
|
||||||
// TODO: This should be sql code, maybe gorm can do that?
|
// TODO: This should be sql code, maybe gorm can do that?
|
||||||
_, snapshot.ErrBookkeep = fmt.Fprintf(info, "%#v\n", instance.Instance)
|
_, err = fmt.Fprintf(info, "%#v\n", instance.Instance)
|
||||||
}()
|
return err
|
||||||
|
}, &snapshot.ErrBookkeep)
|
||||||
|
|
||||||
|
// backup the filesystem
|
||||||
|
og.GoErr(func(files chan<- string) error {
|
||||||
|
fsPath := filepath.Join(snapshot.Description.Dest, filepath.Base(instance.FilesystemBase))
|
||||||
|
|
||||||
|
// copy over whatever is in the base directory
|
||||||
|
return fsx.CopyDirectory(fsPath, instance.FilesystemBase, func(dst, src string) {
|
||||||
|
files <- dst
|
||||||
|
})
|
||||||
|
}, &snapshot.ErrFilesystem)
|
||||||
|
|
||||||
|
// backup the graph db repository
|
||||||
|
og.GoErr(func(files chan<- string) error {
|
||||||
|
tsPath := filepath.Join(snapshot.Description.Dest, instance.GraphDBRepository+".nq")
|
||||||
|
files <- tsPath
|
||||||
|
|
||||||
|
nquads, err := os.Create(tsPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer nquads.Close()
|
||||||
|
|
||||||
|
// directly store the result
|
||||||
|
_, err = instance.dis.Triplestore().Backup(nquads, instance.GraphDBRepository)
|
||||||
|
return err
|
||||||
|
}, &snapshot.ErrTriplestore)
|
||||||
|
|
||||||
|
// backup the sql database
|
||||||
|
og.GoErr(func(files chan<- string) error {
|
||||||
|
sqlPath := filepath.Join(snapshot.Description.Dest, snapshot.Instance.SqlDatabase+".sql")
|
||||||
|
files <- sqlPath
|
||||||
|
|
||||||
|
sql, err := os.Create(sqlPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer sql.Close()
|
||||||
|
|
||||||
|
// directly store the result
|
||||||
|
return instance.dis.SQL().Backup(io, sql, instance.SqlDatabase)
|
||||||
|
}, &snapshot.ErrSQL)
|
||||||
|
|
||||||
|
// wait for the group!
|
||||||
|
snapshot.waitGroup(io, og)
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeWhitebox runs the whitebox backup of the system.
|
||||||
|
// The instance should be running during this step.
|
||||||
|
func (snapshot *Snapshot) makeWhitebox(io stream.IOStream, instance Instance) {
|
||||||
|
og := opgroup.NewOpGroup[string](1)
|
||||||
|
|
||||||
// write pathbuilders
|
// write pathbuilders
|
||||||
// TODO: Move this outside of the up/down stuff!
|
// TODO: Move this outside of the up/down stuff!
|
||||||
wg.Add(1)
|
og.GoErr(func(files chan<- string) error {
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
pbPath := filepath.Join(snapshot.Description.Dest, "pathbuilders")
|
pbPath := filepath.Join(snapshot.Description.Dest, "pathbuilders")
|
||||||
files <- pbPath
|
files <- pbPath
|
||||||
|
|
||||||
// create the directory!
|
// create the directory!
|
||||||
if err := os.Mkdir(pbPath, fs.ModeDir); err != nil {
|
if err := os.Mkdir(pbPath, fs.ModeDir); err != nil {
|
||||||
snapshot.ErrPathbuilder = err
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// put in all the pathbuilders
|
// put in all the pathbuilders
|
||||||
snapshot.ErrPathbuilder = instance.ExportPathbuilders(pbPath)
|
return instance.ExportPathbuilders(pbPath)
|
||||||
}()
|
}, &snapshot.ErrPathbuilder)
|
||||||
|
|
||||||
// backup the filesystem
|
// wait for the group!
|
||||||
wg.Add(1)
|
snapshot.waitGroup(io, og)
|
||||||
go func() {
|
}
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
fsPath := filepath.Join(snapshot.Description.Dest, filepath.Base(instance.FilesystemBase))
|
// waitGroup waits for the
|
||||||
|
func (snapshot *Snapshot) waitGroup(io stream.IOStream, og *opgroup.OpGroup[string]) {
|
||||||
// copy over whatever is in the base directory
|
// wait for the messages to return
|
||||||
snapshot.ErrFilesystem = fsx.CopyDirectory(fsPath, instance.FilesystemBase, func(dst, src string) {
|
for file := range og.Wait() {
|
||||||
files <- dst
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
// backup the graph db repository
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
tsPath := filepath.Join(snapshot.Description.Dest, instance.GraphDBRepository+".nq")
|
|
||||||
files <- tsPath
|
|
||||||
|
|
||||||
nquads, err := os.Create(tsPath)
|
|
||||||
if err != nil {
|
|
||||||
snapshot.ErrTriplestore = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer nquads.Close()
|
|
||||||
|
|
||||||
// directly store the result
|
|
||||||
_, snapshot.ErrTriplestore = instance.dis.Triplestore().Backup(nquads, instance.GraphDBRepository)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// backup the sql database
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
sqlPath := filepath.Join(snapshot.Description.Dest, snapshot.Instance.SqlDatabase+".sql")
|
|
||||||
files <- sqlPath
|
|
||||||
|
|
||||||
sql, err := os.Create(sqlPath)
|
|
||||||
if err != nil {
|
|
||||||
snapshot.ErrSQL = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer sql.Close()
|
|
||||||
|
|
||||||
// directly store the result
|
|
||||||
snapshot.ErrSQL = instance.dis.SQL().Backup(io, sql, instance.SqlDatabase)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// TODO: Backup the docker image
|
|
||||||
|
|
||||||
// wait for the group, then close the message channel.
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
close(files)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for file := range files {
|
|
||||||
// get the relative path to the root of the manifest.
|
// get the relative path to the root of the manifest.
|
||||||
// nothing *should* go wrong, but in case it does, use the original path.
|
// nothing *should* go wrong, but in case it does, use the original path.
|
||||||
path, err := filepath.Rel(snapshot.Description.Dest, file)
|
path, err := filepath.Rel(snapshot.Description.Dest, file)
|
||||||
|
|
@ -314,9 +308,6 @@ func (snapshot *Snapshot) makeBlackbox(io stream.IOStream, instance Instance) {
|
||||||
snapshot.Manifest = append(snapshot.Manifest, path)
|
snapshot.Manifest = append(snapshot.Manifest, path)
|
||||||
}
|
}
|
||||||
io.Println("")
|
io.Println("")
|
||||||
|
|
||||||
// make sure the manifest is sorted!
|
|
||||||
slices.Sort(snapshot.Manifest)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteReport writes out the report belonging to this snapshot.
|
// WriteReport writes out the report belonging to this snapshot.
|
||||||
|
|
|
||||||
50
pkg/opgroup/opgroup.go
Normal file
50
pkg/opgroup/opgroup.go
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
// Package opgroup provides OpGroup
|
||||||
|
package opgroup
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// OpGroup represents an operation group that can send messages to the waiting goroutine.
|
||||||
|
//The zero value is not ready for use, use [NewOpGroup] instead.
|
||||||
|
type OpGroup[M any] struct {
|
||||||
|
wg sync.WaitGroup
|
||||||
|
c chan M
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewOpGroup creates a new OpGroup.
|
||||||
|
//
|
||||||
|
// The internal buffer size for messages will be expectedSize.
|
||||||
|
// If unsure about buffer size, 0 is a valid choice.
|
||||||
|
func NewOpGroup[M any](expectedSize int) *OpGroup[M] {
|
||||||
|
return &OpGroup[M]{
|
||||||
|
c: make(chan M, expectedSize),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go schedules a new operation (implemented by worker) to run in a separate goroutine.
|
||||||
|
// worker is passed a send-only reference to the message channel which it can uszxe to send messages to.
|
||||||
|
func (op *OpGroup[M]) Go(worker func(c chan<- M)) {
|
||||||
|
op.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer op.wg.Done()
|
||||||
|
worker(op.c)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GoErr is like Go, except that once the operation is finished, it writes the returned error into dest.
|
||||||
|
func (op *OpGroup[M]) GoErr(worker func(c chan<- M) error, dest *error) {
|
||||||
|
op.Go(func(c chan<- M) {
|
||||||
|
*dest = worker(c)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait returns a receive-only reference to the message channel.//
|
||||||
|
// The message channel will be closed once all operations on this group have completed.
|
||||||
|
//
|
||||||
|
// The Wait function may only be called once.
|
||||||
|
func (op *OpGroup[M]) Wait() <-chan M {
|
||||||
|
go func() {
|
||||||
|
op.wg.Wait()
|
||||||
|
close(op.c)
|
||||||
|
}()
|
||||||
|
return op.c
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue