From 2639cda69bb30443c0f5c7180bf66d5a39e5c735 Mon Sep 17 00:00:00 2001 From: Tom Wiesing Date: Mon, 3 Oct 2022 14:15:39 +0200 Subject: [PATCH] cmd/blind_update: Refactor parallel code --- cmd/blind_update.go | 48 ++++++---------------------- pkg/smartp/smartp.go | 76 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 39 deletions(-) create mode 100644 pkg/smartp/smartp.go diff --git a/cmd/blind_update.go b/cmd/blind_update.go index a6e7434..a91c216 100644 --- a/cmd/blind_update.go +++ b/cmd/blind_update.go @@ -2,15 +2,14 @@ package cmd import ( "fmt" - "io" wisski_distillery "github.com/FAU-CDI/wisski-distillery" "github.com/FAU-CDI/wisski-distillery/internal/component/instances" "github.com/FAU-CDI/wisski-distillery/internal/core" "github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/slicesx" + "github.com/FAU-CDI/wisski-distillery/pkg/smartp" "github.com/tkw1536/goprogram/exit" - "github.com/tkw1536/goprogram/status" "github.com/tkw1536/goprogram/stream" ) @@ -41,6 +40,7 @@ var errBlindUpdateFailed = exit.Error{ } func (bu blindUpdate) Run(context wisski_distillery.Context) error { + // find all the instances! wissKIs, err := context.Environment.Instances().Load(bu.Positionals.Slug...) if err != nil { return err @@ -51,47 +51,17 @@ func (bu blindUpdate) Run(context wisski_distillery.Context) error { }) } - if bu.Parallel == 1 { - return bu.runSequential(wissKIs, context) - } - - return bu.runParallel(wissKIs, context) -} - -func (bu blindUpdate) runParallel(wissKIs []instances.WissKI, context wisski_distillery.Context) error { - return status.RunErrorGroup(context.Stdout, status.Group[instances.WissKI, error]{ - PrefixString: func(item instances.WissKI, index int) string { - return fmt.Sprintf("[instance %q]: ", item.Slug) - }, - PrefixAlign: true, - - Handler: func(instance instances.WissKI, index int, writer io.Writer) error { - io := stream.NewIOStream(writer, writer, nil, 0) - - code, err := instance.Shell(io, "/runtime/blind_update.sh") - if err != nil { - return errBlindUpdateFailed.WithMessageF(instance.Slug, environment.ExecCommandError) - } - if code != 0 { - return errBlindUpdateFailed.WithMessageF(instance.Slug, code) - } - return nil - }, - HandlerLimit: bu.Parallel, - }, wissKIs) -} - -func (bu blindUpdate) runSequential(wissKIs []instances.WissKI, context wisski_distillery.Context) error { - for _, instance := range wissKIs { - context.EPrintf("Updating instance %s\n", instance.Slug) - - code, err := instance.Shell(context.IOStream, "/runtime/blind_update.sh") + // and do the actual blind_update! + return smartp.Run(context.IOStream, bu.Parallel, func(instance instances.WissKI, io stream.IOStream) error { + code, err := instance.Shell(io, "/runtime/blind_update.sh") if err != nil { return errBlindUpdateFailed.WithMessageF(instance.Slug, environment.ExecCommandError) } if code != 0 { return errBlindUpdateFailed.WithMessageF(instance.Slug, code) } - } - return nil + return nil + }, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string { + return fmt.Sprintf("blind_update %q", item.Slug) + })) } diff --git a/pkg/smartp/smartp.go b/pkg/smartp/smartp.go new file mode 100644 index 0000000..c5a9128 --- /dev/null +++ b/pkg/smartp/smartp.go @@ -0,0 +1,76 @@ +package smartp + +import ( + "io" + + "github.com/FAU-CDI/wisski-distillery/pkg/logging" + "github.com/tkw1536/goprogram/status" + "github.com/tkw1536/goprogram/stream" +) + +// Run runs f over all items with the given paralllelism. +// When parallel is 1, runs items sequentially with a full input / output stream. +func Run[T any](ios stream.IOStream, parallel int, f func(value T, stream stream.IOStream) error, items []T, opts ...Option[T]) error { + + // create a group + var group status.Group[T, error] + group.HandlerLimit = parallel + + // apply all the options + isParallel := parallel != 1 + for _, opt := range opts { + group = opt(isParallel, group) + } + + // setup the default prefix string + if group.PrefixString == nil { + group.PrefixString = status.DefaultPrefixString[T] + } + + // if we are running sequentially + // then just iterate over the items + if !isParallel { + for index, item := range items { + err := logging.LogOperation(func() error { + return f(item, ios) + }, ios, "%v", group.PrefixString(item, index)) + if err != nil { + return err + } + } + + return nil + } + + // if we are running in parallel, setup a handler + group.Handler = func(item T, index int, writer io.Writer) error { + ios := stream.NewIOStream(writer, writer, nil, 0) + return f(item, ios) + } + + // create a new status display + st := status.NewWithCompat(ios.Stdout, 0) + st.Start() + defer st.Stop() + + // and use it! + return status.UseErrorGroup(st, group, items) +} + +// Option represents an option of a StatusGroup +type Option[T any] func(bool, status.Group[T, error]) status.Group[T, error] + +// SmartMessage returns an option that sets the display of the provided item to the given handler +func SmartMessage[T any](handler func(value T) string) Option[T] { + return func(p bool, s status.Group[T, error]) status.Group[T, error] { + s.PrefixString = func(item T, index int) string { + message := handler(item) + if p { + return "[" + message + "]: " + } + return message + } + s.PrefixAlign = true + return s + } +}