Update to goprogram 0.1.0

This commit is contained in:
Tom Wiesing 2022-10-06 13:38:29 +02:00
parent d2d681a4f2
commit 7cda92b342
No known key found for this signature in database
31 changed files with 141 additions and 244 deletions

View file

@ -7,9 +7,9 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
"github.com/FAU-CDI/wisski-distillery/internal/core"
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/FAU-CDI/wisski-distillery/pkg/smartp"
"github.com/tkw1536/goprogram/exit"
"github.com/tkw1536/goprogram/lib/collection"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
@ -46,13 +46,13 @@ func (bu blindUpdate) Run(context wisski_distillery.Context) error {
return err
}
if !bu.Force {
wissKIs = slicesx.Filter(wissKIs, func(instance instances.WissKI) bool {
wissKIs = collection.Filter(wissKIs, func(instance instances.WissKI) bool {
return bool(instance.AutoBlindUpdateEnabled)
})
}
// and do the actual blind_update!
return smartp.Run(context.IOStream, bu.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
return status.StreamGroup(context.IOStream, bu.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
code, err := instance.Shell(io, "/runtime/blind_update.sh")
if err != nil {
return errBlindUpdateFailed.WithMessageF(instance.Slug, environment.ExecCommandError)
@ -61,7 +61,7 @@ func (bu blindUpdate) Run(context wisski_distillery.Context) error {
return errBlindUpdateFailed.WithMessageF(instance.Slug, code)
}
return nil
}, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string {
}, wissKIs, status.SmartMessage(func(item instances.WissKI) string {
return fmt.Sprintf("blind_update %q", item.Slug)
}))
}

View file

@ -6,8 +6,8 @@ import (
wisski_distillery "github.com/FAU-CDI/wisski-distillery"
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
"github.com/FAU-CDI/wisski-distillery/internal/core"
"github.com/FAU-CDI/wisski-distillery/pkg/smartp"
"github.com/tkw1536/goprogram/exit"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
@ -45,7 +45,7 @@ func (cr cron) Run(context wisski_distillery.Context) error {
}
// and do the actual blind_update!
return smartp.Run(context.IOStream, cr.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
return status.StreamGroup(context.IOStream, cr.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
code, err := instance.Shell(io, "/runtime/cron.sh")
if err != nil {
io.EPrintln(err)
@ -57,7 +57,7 @@ func (cr cron) Run(context wisski_distillery.Context) error {
}
return nil
}, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string {
}, wissKIs, status.SmartMessage(func(item instances.WissKI) string {
return fmt.Sprintf("cron %q", item.Slug)
}))
}

View file

@ -5,7 +5,7 @@ import (
wisski_distillery "github.com/FAU-CDI/wisski-distillery"
"github.com/FAU-CDI/wisski-distillery/internal/core"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/tkw1536/goprogram/lib/collection"
)
// Info is then 'info' command
@ -72,7 +72,7 @@ func (i info) Run(context wisski_distillery.Context) error {
}
context.Printf("Pathbuilders: (count %d)\n", len(info.Pathbuilders))
slicesx.ForSorted(info.Pathbuilders, func(name, data string) {
collection.IterateSorted(info.Pathbuilders, func(name, data string) {
context.Printf("- %s (%d bytes)\n", name, len(data))
})

View file

@ -6,8 +6,8 @@ import (
wisski_distillery "github.com/FAU-CDI/wisski-distillery"
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
"github.com/FAU-CDI/wisski-distillery/internal/core"
"github.com/FAU-CDI/wisski-distillery/pkg/smartp"
"github.com/tkw1536/goprogram/exit"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
@ -46,9 +46,9 @@ func (rb rebuild) Run(context wisski_distillery.Context) error {
}
// and do the actual rebuild
return smartp.Run(context.IOStream, rb.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
return status.StreamGroup(context.IOStream, rb.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
return instance.Build(io, true)
}, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string {
}, wissKIs, status.SmartMessage(func(item instances.WissKI) string {
return fmt.Sprintf("rebuild %q", item.Slug)
}))
}

View file

@ -33,7 +33,7 @@ var errServerListen = exit.Error{
func (s server) Run(context wisski_distillery.Context) error {
dis := context.Environment
handler, err := dis.Control().Server(context.IOStream)
handler, err := dis.Control().Server(dis.Context(), context.IOStream)
if err != nil {
return err
}
@ -46,6 +46,11 @@ func (s server) Run(context wisski_distillery.Context) error {
return errServerListen.Wrap(err)
}
go func() {
<-dis.Context().Done()
listener.Close()
}()
// and serve that listener
err = http.Serve(listener, http.StripPrefix(s.Prefix, handler))
if err == nil {

View file

@ -6,8 +6,9 @@ import (
wisski_distillery "github.com/FAU-CDI/wisski-distillery"
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
"github.com/FAU-CDI/wisski-distillery/internal/core"
"github.com/FAU-CDI/wisski-distillery/pkg/smartp"
"github.com/tkw1536/goprogram/exit"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
@ -41,14 +42,14 @@ func (upc updateprefixconfig) Run(context wisski_distillery.Context) error {
return errPrefixUpdateFailed.Wrap(err)
}
return smartp.Run(context.IOStream, upc.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
return status.StreamGroup(context.IOStream, upc.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
io.Println("reading prefixes")
err := instance.UpdatePrefixes()
if err != nil {
return errPrefixUpdateFailed.Wrap(err)
}
return nil
}, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string {
}, wissKIs, status.SmartMessage(func(item instances.WissKI) string {
return fmt.Sprintf("update_prefix %q", item.Slug)
}))
}

4
go.mod
View file

@ -9,8 +9,8 @@ require (
github.com/feiin/sqlstring v0.3.0
github.com/go-sql-driver/mysql v1.6.0
github.com/pkg/errors v0.9.1
github.com/tkw1536/goprogram v0.0.17
golang.org/x/exp v0.0.0-20220929160808-de9c53c655b9
github.com/tkw1536/goprogram v0.1.0
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
gorm.io/driver/mysql v1.3.6
gorm.io/gorm v1.23.10

4
go.sum
View file

@ -23,8 +23,12 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/tkw1536/goprogram v0.0.17 h1:SAD/rHtxm7CTEdUV1a37LUyE6G0MdcKhLzO8fZ8cFKI=
github.com/tkw1536/goprogram v0.0.17/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE=
github.com/tkw1536/goprogram v0.1.0 h1:cP+Z7VKgRF93JApsau1azKxGrH/nTwcGLnSYfZP9XjE=
github.com/tkw1536/goprogram v0.1.0/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE=
golang.org/x/exp v0.0.0-20220929160808-de9c53c655b9 h1:lNtcVz/3bOstm7Vebox+5m3nLh/BYWnhmc3AhXOW6oI=
golang.org/x/exp v0.0.0-20220929160808-de9c53c655b9/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 h1:fGZugkZk2UgYBxtpKmvub51Yno1LJDeEsRp2xGD+0gY=
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View file

@ -1,6 +1,7 @@
package control
import (
"context"
"embed"
"html/template"
"io/fs"
@ -27,7 +28,7 @@ func (Info) Name() string { return "control-info" }
func (*Info) Routes() []string { return []string{"/dis/"} }
func (info *Info) Handler(route string, io stream.IOStream) (http.Handler, error) {
func (info *Info) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
mux := http.NewServeMux()
// handle everything under /dis/!

View file

@ -1,6 +1,7 @@
package control
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -22,7 +23,7 @@ func (SelfHandler) Name() string { return "control-self" }
func (*SelfHandler) Routes() []string { return []string{"/"} }
func (sh *SelfHandler) Handler(route string, io stream.IOStream) (http.Handler, error) {
func (sh *SelfHandler) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
// create a redirect
var redirect Redirect
var err error

View file

@ -1,14 +1,17 @@
package control
import (
"context"
"net/http"
"github.com/tkw1536/goprogram/stream"
)
// Server returns an http.Mux that implements the main server instance
// Server returns an http.Mux that implements the main server instance.
// The server may spawn background tasks, but these should be terminated once context closes.
//
// Logging messages are directed to io.
func (control *Control) Server(io stream.IOStream) (*http.ServeMux, error) {
func (control *Control) Server(context context.Context, io stream.IOStream) (*http.ServeMux, error) {
// create a new mux
mux := http.NewServeMux()
@ -16,7 +19,7 @@ func (control *Control) Server(io stream.IOStream) (*http.ServeMux, error) {
for _, s := range control.Servables {
for _, route := range s.Routes() {
io.Printf("mounting %s\n", route)
handler, err := s.Handler(route, io)
handler, err := s.Handler(route, context, io)
if err != nil {
return nil, err
}

View file

@ -3,7 +3,7 @@ package instances
import (
"github.com/FAU-CDI/wisski-distillery/internal/models"
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/tkw1536/goprogram/lib/collection"
)
// SnapshotLogFor retrieves (and prunes) the SnapshotLog for the provided slug.
@ -14,7 +14,7 @@ func (instances *Instances) SnapshotLogFor(slug string) (snapshots []models.Snap
return nil, err
}
return slicesx.Filter(snapshots, func(s models.Snapshot) bool {
return collection.Filter(snapshots, func(s models.Snapshot) bool {
return s.Slug == slug
}), nil
}
@ -35,7 +35,7 @@ func (instances *Instances) SnapshotLog() ([]models.Snapshot, error) {
}
// partition out the snapshots that have been deleted!
parts := slicesx.Partition(snapshots, func(s models.Snapshot) bool {
parts := collection.Partition(snapshots, func(s models.Snapshot) bool {
_, err := instances.Core.Environment.Stat(s.Path)
return !environment.IsNotExist(err)
})

View file

@ -12,7 +12,7 @@ var exportPathbuilderPHP string
// Pathbuilders returns the ids of all pathbuilders in consistent order.
func (wisski *WissKI) Pathbuilders() (ids []string, err error) {
err = wisski.ExecPHPScript(stream.FromDebug(), &ids, exportPathbuilderPHP, "all_list")
err = wisski.ExecPHPScript(stream.FromNil(), &ids, exportPathbuilderPHP, "all_list")
slices.Sort(ids)
return
}
@ -20,12 +20,12 @@ func (wisski *WissKI) Pathbuilders() (ids []string, err error) {
// Pathbuilder returns a single pathbuilder as xml.
// If it does not exist, it returns the empty string and nil error.
func (wisski *WissKI) Pathbuilder(id string) (xml string, err error) {
err = wisski.ExecPHPScript(stream.FromDebug(), &xml, exportPathbuilderPHP, "one_xml", id)
err = wisski.ExecPHPScript(stream.FromNil(), &xml, exportPathbuilderPHP, "one_xml", id)
return
}
// AllPathbuilders returns all pathbuilders serialized as xml
func (wisski *WissKI) AllPathbuilders() (pathbuilders map[string]string, err error) {
err = wisski.ExecPHPScript(stream.FromDebug(), &pathbuilders, exportPathbuilderPHP, "all_xml")
err = wisski.ExecPHPScript(stream.FromNil(), &pathbuilders, exportPathbuilderPHP, "all_xml")
return
}

View file

@ -132,10 +132,10 @@ func marshalPHP(data any) (string, error) {
var settingsPHP string
func (wisski *WissKI) GetSettingsPHP(key string) (value any, err error) {
err = wisski.ExecPHPScript(stream.FromDebug(), &value, settingsPHP, "get_setting", key)
err = wisski.ExecPHPScript(stream.FromNil(), &value, settingsPHP, "get_setting", key)
return
}
func (wisski *WissKI) SetSettingsPHP(key string, value any) error {
return wisski.ExecPHPScript(stream.FromDebug(), nil, settingsPHP, "set_setting", key, value)
return wisski.ExecPHPScript(stream.FromNil(), nil, settingsPHP, "set_setting", key, value)
}

View file

@ -6,7 +6,7 @@ import (
"strings"
"github.com/FAU-CDI/wisski-distillery/pkg/fsx"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/tkw1536/goprogram/lib/collection"
"github.com/tkw1536/goprogram/stream"
_ "embed"
@ -38,13 +38,13 @@ func (wisski *WissKI) Prefixes() ([]string, error) {
func (wisski *WissKI) dbPrefixes() (prefixes []string, err error) {
// get all the ugly prefixes
err = wisski.ExecPHPScript(stream.FromDebug(), &prefixes, listURIPrefixesPHP, "list_prefixes")
err = wisski.ExecPHPScript(stream.FromNil(), &prefixes, listURIPrefixesPHP, "list_prefixes")
if err != nil {
return nil, err
}
// filter out sequential prefixes
prefixes = slicesx.NonSequential(prefixes, func(prev, now string) bool {
prefixes = collection.NonSequential(prefixes, func(prev, now string) bool {
return strings.HasPrefix(now, prev)
})
@ -55,7 +55,7 @@ func (wisski *WissKI) dbPrefixes() (prefixes []string, err error) {
}
// filter out blocked prefixes
return slicesx.Filter(prefixes, func(uri string) bool { return !hasAnyPrefix(uri, blocks) }), nil
return collection.Filter(prefixes, func(uri string) bool { return !hasAnyPrefix(uri, blocks) }), nil
}
func (instances *Instances) blockedPrefixes() ([]string, error) {
@ -87,7 +87,7 @@ func (instances *Instances) blockedPrefixes() ([]string, error) {
}
func hasAnyPrefix(candidate string, prefixes []string) bool {
return slicesx.Any(
return collection.Any(
prefixes,
func(prefix string) bool {
return strings.HasPrefix(candidate, prefix)
@ -144,5 +144,5 @@ func (wisski *WissKI) UpdatePrefixes() error {
return err
}
return wisski.Metadata().SetAll(PrefixConfigKey, slicesx.AsAny(prefixes)...)
return wisski.Metadata().SetAll(PrefixConfigKey, collection.AsAny(prefixes)...)
}

View file

@ -4,7 +4,7 @@ import (
"reflect"
"sync"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/tkw1536/goprogram/lib/collection"
"github.com/tkw1536/goprogram/lib/reflectx"
)
@ -88,7 +88,7 @@ func (m meta) InitComponent(instance reflect.Value, all []Component) {
// assign the component fields
for field, eType := range m.CFields {
c := slicesx.First(all, func(c Component) bool {
c := collection.First(all, func(c Component) bool {
return reflect.TypeOf(c).AssignableTo(eType)
})

View file

@ -1,6 +1,7 @@
package resolver
import (
"context"
"fmt"
"net/http"
"regexp"
@ -20,6 +21,8 @@ type Resolver struct {
Instances *instances.Instances
prefixes lazy.Lazy[map[string]string] // cached prefixes (from the server)
RefreshInterval time.Duration
handler lazy.Lazy[wdresolve.ResolveHandler] // handler
}
@ -27,7 +30,7 @@ func (*Resolver) Name() string { return "resolver" }
func (resolver *Resolver) Routes() []string { return []string{"/go/", "/wisski/get/"} }
func (resolver *Resolver) Handler(route string, io stream.IOStream) (http.Handler, error) {
func (resolver *Resolver) Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error) {
var err error
return resolver.handler.Get(func() (p wdresolve.ResolveHandler) {
p.TrustXForwardedProto = true
@ -49,6 +52,8 @@ func (resolver *Resolver) Handler(route string, io stream.IOStream) (http.Handle
io.Printf("registering legacy domain %s\n", domain)
}
go resolver.updatePrefixes(io, context)
// resolve the prefixes
p.Resolver = resolvers.InOrder{
resolver,
@ -58,26 +63,44 @@ func (resolver *Resolver) Handler(route string, io stream.IOStream) (http.Handle
}), err
}
func (resolver *Resolver) updatePrefixes(io stream.IOStream, ctx context.Context) {
t := time.NewTicker(resolver.RefreshInterval)
defer t.Stop()
for {
select {
case <-t.C:
io.Println("resolver: Reloading prefixes from database")
prefixes, _ := resolver.AllPrefixes()
resolver.prefixes.Set(prefixes)
case <-ctx.Done():
return
}
}
}
func (resolver *Resolver) Target(uri string) string {
return wdresolve.PrefixTarget(resolver, uri)
}
// allow reloading prefixes from the server every minute
const prefixesRefresh = time.Minute
// Prefixes returns a cached list of prefixes
func (resolver *Resolver) Prefixes() (prefixes map[string]string) {
// reset the prefixes after a specific time, but only if requested
resolver.prefixes.ResetAfter(prefixesRefresh)
return resolver.prefixes.Get(resolver.freshPrefixes)
return resolver.prefixes.Get(func() map[string]string {
prefixes, _ := resolver.AllPrefixes()
return prefixes
})
}
func (resolver *Resolver) freshPrefixes() map[string]string {
// AllPrefixes returns a list of all prefixes from the server.
// Prefixes may be cached on the server
func (resolver *Resolver) AllPrefixes() (map[string]string, error) {
instances, err := resolver.Instances.All()
if err != nil {
return nil
return nil, err
}
gPrefixes := make(map[string]string)
var lastErr error
for _, instance := range instances {
if instance.NoPrefix() {
continue
@ -88,6 +111,7 @@ func (resolver *Resolver) freshPrefixes() map[string]string {
// => skip it!
prefixes, err := instance.PrefixesCached()
if err != nil {
lastErr = err
continue
}
@ -95,5 +119,6 @@ func (resolver *Resolver) freshPrefixes() map[string]string {
gPrefixes[p] = url
}
}
return gPrefixes
return gPrefixes, lastErr
}

View file

@ -1,6 +1,7 @@
package component
import (
"context"
"net/http"
"github.com/tkw1536/goprogram/stream"
@ -14,5 +15,5 @@ type Servable interface {
Routes() []string
// Handler returns the handler for the requested route
Handler(route string, io stream.IOStream) (http.Handler, error)
Handler(route string, context context.Context, io stream.IOStream) (http.Handler, error)
}

View file

@ -10,7 +10,7 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
"github.com/FAU-CDI/wisski-distillery/internal/models"
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/FAU-CDI/wisski-distillery/pkg/slicesx"
"github.com/tkw1536/goprogram/lib/collection"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
"golang.org/x/exp/slices"
@ -91,7 +91,7 @@ func (snapshot *Snapshot) makeParts(ios stream.IOStream, snapshots *Manager, ins
defer st.Stop()
// get all the components
comps := slicesx.FilterClone(snapshots.Snapshotable, func(sc component.Snapshotable) bool {
comps := collection.FilterClone(snapshots.Snapshotable, func(sc component.Snapshotable) bool {
return sc.SnapshotNeedsRunning() == needsRunning
})

View file

@ -91,7 +91,7 @@ func (sql *SQL) QueryTable(silent bool, table string) (*gorm.DB, error) {
// WaitQueryTable waits for a connection to succeed via QueryTable
func (sql *SQL) WaitQueryTable() error {
// TODO: Establish a convention on when to wait for this!
n := stream.FromDebug()
n := stream.FromNil()
return wait.Wait(func() bool {
_, err := sql.QueryTable(true, models.InstanceTable)
n.EPrintf("[SQL.WaitQueryTable]: %s\n", err)

View file

@ -8,9 +8,9 @@ import (
"github.com/tkw1536/goprogram/stream"
)
func (SQL) SnapshotNeedsRunning() bool { return false }
func (*SQL) SnapshotNeedsRunning() bool { return false }
func (SQL) SnapshotName() string { return "sql" }
func (*SQL) SnapshotName() string { return "sql" }
func (sql *SQL) Snapshot(wisski models.Instance, context component.StagingContext) error {
return context.AddDirectory(".", func() error {

View file

@ -22,15 +22,15 @@ type SQL struct {
lazyNetwork lazy.Lazy[string]
}
func (SQL) Name() string {
func (*SQL) Name() string {
return "sql"
}
func (sql SQL) Path() string {
func (sql *SQL) Path() string {
return filepath.Join(sql.Core.Config.DeployRoot, "core", sql.Name())
}
func (SQL) Context(parent component.InstallationContext) component.InstallationContext {
func (*SQL) Context(parent component.InstallationContext) component.InstallationContext {
return parent
}

View file

@ -21,7 +21,7 @@ func (sql *SQL) Shell(io stream.IOStream, argv ...string) (int, error) {
// unsafeWaitShell waits for a connection via the database shell to succeed
func (sql *SQL) unsafeWaitShell() error {
n := stream.FromDebug()
n := stream.FromNil()
return wait.Wait(func() bool {
code, err := sql.Shell(n, "-e", "select 1;")
n.EPrintf("[SQL.unsafeWaitShell]: %d %s\n", code, err)

View file

@ -86,7 +86,7 @@ func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName str
// Wait waits for the connection to the Triplestore to succeed.
// This is achieved using a polling strategy.
func (ts Triplestore) Wait() error {
n := stream.FromDebug()
n := stream.FromNil()
return wait.Wait(func() bool {
res, err := ts.OpenRaw("GET", "/rest/repositories", nil, "", "")
n.EPrintf("[Triplestore.Wait]: %s\n", err)

View file

@ -1,6 +1,9 @@
package core
import (
"context"
"os"
"os/signal"
"path/filepath"
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
@ -9,6 +12,7 @@ import (
// Params are used to initialize the excutable.
type Params struct {
ConfigPath string // ConfigPath is the path to the configuration file for the distillery
Context context.Context // Context for the distillery
}
// ParamsFromEnv creates a new set of parameters from the environment.
@ -28,5 +32,19 @@ func ParamsFromEnv() (params Params, err error) {
// and add the configuration file name to it!
params.ConfigPath = filepath.Join(params.ConfigPath, ConfigFile)
// generate a new context
ctx, cancel := context.WithCancel(context.Background())
params.Context = ctx
// cancel the context on an interrupt
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
cancel()
}()
// and return the params!
return params, nil
}

View file

@ -44,7 +44,9 @@ func (dis *Distillery) register(context *component.PoolContext) []component.Comp
// Control server
ra[*control.Control](dis, context),
ra[*control.SelfHandler](dis, context),
ra[*resolver.Resolver](dis, context),
r(dis, context, func(resolver *resolver.Resolver) {
resolver.RefreshInterval = time.Minute
}),
ra[*control.Info](dis, context),
}
}

View file

@ -21,6 +21,9 @@ type Distillery struct {
// core holds the core of the distillery
component.Core
// internal context for the distillery
context context.Context
// Upstream holds information to connect to the various running
// distillery components.
//
@ -40,7 +43,7 @@ type Upstream struct {
// Context returns a new Context belonging to this distillery
func (dis *Distillery) Context() context.Context {
return context.Background()
return dis.context
}
//

View file

@ -21,6 +21,7 @@ var errOpenConfig = exit.Error{
// NewDistillery creates a new distillery from the provided flags
func NewDistillery(params core.Params, flags core.Flags, req core.Requirements) (dis *Distillery, err error) {
dis = &Distillery{
context: params.Context,
Core: component.Core{
Environment: environment.Native{},
},

View file

@ -35,6 +35,16 @@ func (lazy *Lazy[T]) Get(init func() T) T {
return lazy.value
}
// Set atomically sets the value of this lazy, preventing future calls to get from invoking init.
// It may be called concurrently with calls to [Get] and [Reset].
func (lazy *Lazy[T]) Set(value T) {
lazy.m.Lock()
defer lazy.m.Unlock()
lazy.value = value
lazy.once.Do(func() {})
}
// Reset resets this Lazy, deleting any previously associated value.
//
// May be called concurrently with [Get].

View file

@ -1,102 +0,0 @@
package slicesx
import (
"golang.org/x/exp/constraints"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
// ForSorted iterates over the map in an ordered fashion
func ForSorted[K constraints.Ordered, V any](mp map[K]V, callback func(k K, v V)) {
keys := maps.Keys(mp)
slices.Sort(keys)
for _, key := range keys {
callback(key, mp[key])
}
}
// First returns the first value of type V
// When no such value exists, returns the zero value
func First[V any](values []V, test func(v V) bool) V {
for _, v := range values {
if test(v) {
return v
}
}
var v V
return v
}
// Any returns true if test returns true for any of values.
func Any[T any](values []T, test func(T) bool) bool {
for _, v := range values {
if test(v) {
return true
}
}
return false
}
// Filter filters values in place
func Filter[T any](values []T, filter func(T) bool) []T {
results := values[:0]
for _, value := range values {
if filter(value) {
results = append(results, value)
}
}
return results
}
// AsAny returns the same slice as values, but as any
func AsAny[T any](values []T) []any {
results := make([]any, len(values))
for i, v := range values {
results[i] = v
}
return results
}
// Partition partitions values in T by the given functions.
func Partition[T any, P comparable](values []T, partition func(value T) P) map[P][]T {
result := make(map[P][]T)
for _, v := range values {
part := partition(v)
result[part] = append(result[part], v)
}
return result
}
// FilterClone is like [Filter], but creates a new slice
func FilterClone[T any](values []T, filter func(T) bool) (results []T) {
for _, value := range values {
if filter(value) {
results = append(results, value)
}
}
return
}
// NonSequential sorts values, and then removes elements for which test() returns true.
// NonSequential does not re-allocate, but uses the existing slice.
func NonSequential[T constraints.Ordered](values []T, test func(prev, current T) bool) []T {
if len(values) < 2 {
return values
}
// sort the values and make a results array
slices.Sort(values)
results := values[:1]
// do the filter loop
prev := results[0]
for _, current := range values[1:] {
if !test(prev, current) {
results = append(results, current)
}
prev = current
}
return results
}

View file

@ -1,76 +0,0 @@
package smartp
import (
"io"
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
"github.com/tkw1536/goprogram/status"
"github.com/tkw1536/goprogram/stream"
)
// Run runs f over all items with the given paralllelism.
// When parallel is 1, runs items sequentially with a full input / output stream.
func Run[T any](ios stream.IOStream, parallel int, f func(value T, stream stream.IOStream) error, items []T, opts ...Option[T]) error {
// create a group
var group status.Group[T, error]
group.HandlerLimit = parallel
// apply all the options
isParallel := parallel != 1
for _, opt := range opts {
group = opt(isParallel, group)
}
// setup the default prefix string
if group.PrefixString == nil {
group.PrefixString = status.DefaultPrefixString[T]
}
// if we are running sequentially
// then just iterate over the items
if !isParallel {
for index, item := range items {
err := logging.LogOperation(func() error {
return f(item, ios)
}, ios, "%v", group.PrefixString(item, index))
if err != nil {
return err
}
}
return nil
}
// if we are running in parallel, setup a handler
group.Handler = func(item T, index int, writer io.Writer) error {
ios := stream.NewIOStream(writer, writer, nil, 0)
return f(item, ios)
}
// create a new status display
st := status.NewWithCompat(ios.Stdout, 0)
st.Start()
defer st.Stop()
// and use it!
return status.UseErrorGroup(st, group, items)
}
// Option represents an option of a StatusGroup
type Option[T any] func(bool, status.Group[T, error]) status.Group[T, error]
// SmartMessage returns an option that sets the display of the provided item to the given handler
func SmartMessage[T any](handler func(value T) string) Option[T] {
return func(p bool, s status.Group[T, error]) status.Group[T, error] {
s.PrefixString = func(item T, index int) string {
message := handler(item)
if p {
return "[" + message + "]: "
}
return message
}
s.PrefixAlign = true
return s
}
}