components/resolver: Use database instead of file approach
This commit is contained in:
parent
f6b38f055d
commit
32107265d4
10 changed files with 165 additions and 103 deletions
|
|
@ -1,19 +1,22 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"io"
|
||||
"fmt"
|
||||
|
||||
wisski_distillery "github.com/FAU-CDI/wisski-distillery"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/core"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/environment"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/smartp"
|
||||
"github.com/tkw1536/goprogram/exit"
|
||||
"github.com/tkw1536/goprogram/stream"
|
||||
)
|
||||
|
||||
// Cron is the 'cron' command
|
||||
var UpdatePrefixConfig wisski_distillery.Command = updateprefixconfig{}
|
||||
|
||||
type updateprefixconfig struct{}
|
||||
type updateprefixconfig struct {
|
||||
Parallel int `short:"p" long:"parallel" description:"run on (at most) this many instances in parallel. 0 for no limit." default:"1"`
|
||||
}
|
||||
|
||||
func (updateprefixconfig) Description() wisski_distillery.Description {
|
||||
return wisski_distillery.Description{
|
||||
|
|
@ -26,55 +29,26 @@ func (updateprefixconfig) Description() wisski_distillery.Description {
|
|||
}
|
||||
|
||||
var errPrefixUpdateFailed = exit.Error{
|
||||
Message: "Failed to update the prefix configuration: %s",
|
||||
Message: "Failed to update the prefix configuration",
|
||||
ExitCode: exit.ExitGeneric,
|
||||
}
|
||||
|
||||
func (upc updateprefixconfig) Run(context wisski_distillery.Context) error {
|
||||
dis := context.Environment
|
||||
|
||||
instances, err := dis.Instances().All()
|
||||
wissKIs, err := dis.Instances().All()
|
||||
if err != nil {
|
||||
return errPrefixUpdateFailed.WithMessageF(err)
|
||||
return errPrefixUpdateFailed.Wrap(err)
|
||||
}
|
||||
|
||||
ddis := dis.Control()
|
||||
target := dis.Resolver().ConfigPath()
|
||||
|
||||
// print the configuration
|
||||
config, err := dis.Core.Environment.Create(target, environment.DefaultFilePerm)
|
||||
return smartp.Run(context.IOStream, upc.Parallel, func(instance instances.WissKI, io stream.IOStream) error {
|
||||
io.Println("reading prefixes")
|
||||
err := instance.UpdatePrefixes()
|
||||
if err != nil {
|
||||
return errPrefixUpdateFailed.WithMessageF(err)
|
||||
return errPrefixUpdateFailed.Wrap(err)
|
||||
}
|
||||
|
||||
// iterate over the instances and store the last value of error
|
||||
for _, instance := range instances {
|
||||
if err := logging.LogOperation(func() error {
|
||||
// read the prefix config
|
||||
data, err := instance.PrefixConfig()
|
||||
if err != nil {
|
||||
data = "# error, skipped\n"
|
||||
context.EPrintln(err)
|
||||
err = nil
|
||||
}
|
||||
context.IOStream.Printf("%s", data)
|
||||
|
||||
// and write it out!
|
||||
if _, err := io.WriteString(config, data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}, context.IOStream, "reading prefix config %s", instance.Slug); err != nil {
|
||||
return errPrefixUpdateFailed.WithMessageF(err)
|
||||
}
|
||||
}
|
||||
|
||||
// and restart the resolver to apply the config!
|
||||
logging.LogMessage(context.IOStream, "restarting resolver stack")
|
||||
if err := ddis.Stack(ddis.Environment).Restart(context.IOStream); err != nil {
|
||||
return errPrefixUpdateFailed.WithMessageF(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}, wissKIs, smartp.SmartMessage(func(item instances.WissKI) string {
|
||||
return fmt.Sprintf("update_prefix %q", item.Slug)
|
||||
}))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,8 +14,6 @@ type Control struct {
|
|||
component.ComponentBase
|
||||
|
||||
Servables []component.Servable
|
||||
|
||||
ResolverFile string // TODO: this shouldn't be needed!
|
||||
}
|
||||
|
||||
func (control Control) Name() string {
|
||||
|
|
@ -48,7 +46,6 @@ func (control *Control) Stack(env environment.Environment) component.StackWithRe
|
|||
"SELF_RESOLVER_BLOCK_FILE": control.Config.SelfResolverBlockFile,
|
||||
},
|
||||
|
||||
TouchFiles: []string{control.ResolverFile},
|
||||
CopyContextFiles: []string{core.Executable},
|
||||
})
|
||||
return stt
|
||||
|
|
|
|||
|
|
@ -35,9 +35,9 @@ type MetaStorage interface {
|
|||
// Any other metadata with the same key is deleted.
|
||||
Set(key MetaKey, value any) error
|
||||
|
||||
// Add serializes values and stores each as associated with the provided key.
|
||||
// Already existing metadata is left intact.
|
||||
Add(key MetaKey, values ...any) error
|
||||
// Set serializes values and stores them with the provided key.
|
||||
// Any other metadata with the same key is deleted.
|
||||
SetAll(key MetaKey, values ...any) error
|
||||
|
||||
// Purge removes all metadata, regardless of key.
|
||||
Purge() error
|
||||
|
|
@ -163,13 +163,19 @@ func (s *storage) Set(key MetaKey, value any) error {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *storage) Add(key MetaKey, values ...any) error {
|
||||
func (s *storage) SetAll(key MetaKey, values ...any) error {
|
||||
table, err := s.SQL.QueryTable(true, models.MetadataTable)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return table.Transaction(func(tx *gorm.DB) error {
|
||||
// delete the old values
|
||||
status := tx.Where(&models.Metadatum{Slug: s.Slug, Key: string(key)}).Delete(&models.Metadatum{})
|
||||
if err := status.Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
bytes, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -82,6 +82,29 @@ func hasAnyPrefix(candidate string, prefixes []string) bool {
|
|||
)
|
||||
}
|
||||
|
||||
var PrefixConfigKey MetaKey = "prefix"
|
||||
|
||||
// Prefixes returns the prefixes for the instance
|
||||
func (wisski *WissKI) PrefixesCached() (results []string, err error) {
|
||||
err = wisski.Metadata().GetAll(PrefixConfigKey, func(index, total int) any {
|
||||
if results == nil {
|
||||
results = make([]string, total)
|
||||
}
|
||||
return &results[index]
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// UpdatePrefixes updates the cached prefixes of this instance
|
||||
func (wisski *WissKI) UpdatePrefixes() error {
|
||||
prefixes, err := wisski.Prefixes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return wisski.Metadata().SetAll(PrefixConfigKey, slicesx.AsAny(prefixes)...)
|
||||
}
|
||||
|
||||
// PrefixConfig returns the prefix config belonging to this instance.
|
||||
func (wisski *WissKI) PrefixConfig() (config string, err error) {
|
||||
// if the user requested to skip the prefix, then don't do anything with it!
|
||||
|
|
|
|||
|
|
@ -8,10 +8,6 @@ import (
|
|||
"github.com/tkw1536/goprogram/lib/reflectx"
|
||||
)
|
||||
|
||||
//
|
||||
// META
|
||||
//
|
||||
|
||||
var metaCache sync.Map
|
||||
|
||||
// getMeta gets the component belonging to a component type
|
||||
|
|
@ -118,7 +114,7 @@ func filterSubtype(sliceS reflect.Value, iface reflect.Type) reflect.Value {
|
|||
result := reflect.MakeSlice(reflect.SliceOf(iface), 0, len)
|
||||
for i := 0; i < len; i++ {
|
||||
element := sliceS.Index(i)
|
||||
if element.Type().Implements(iface) {
|
||||
if element.Elem().Type().Implements(iface) {
|
||||
result = reflect.Append(result, element.Elem().Convert(iface))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,15 +2,14 @@ package resolver
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/FAU-CDI/wdresolve"
|
||||
"github.com/FAU-CDI/wdresolve/resolvers"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component/control"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component/instances"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/lazy"
|
||||
"github.com/tkw1536/goprogram/stream"
|
||||
)
|
||||
|
|
@ -18,13 +17,13 @@ import (
|
|||
type Resolver struct {
|
||||
component.ComponentBase
|
||||
|
||||
Control *control.Control
|
||||
ResolverFile string
|
||||
Instances *instances.Instances
|
||||
|
||||
handler lazy.Lazy[wdresolve.ResolveHandler]
|
||||
prefixes lazy.Lazy[map[string]string] // cached prefixes (from the server)
|
||||
handler lazy.Lazy[wdresolve.ResolveHandler] // handler
|
||||
}
|
||||
|
||||
func (Resolver) Name() string { return "resolver" }
|
||||
func (*Resolver) Name() string { return "resolver" }
|
||||
|
||||
func (resolver *Resolver) Routes() []string { return []string{"/go/", "/wisski/get/"} }
|
||||
|
||||
|
|
@ -50,35 +49,48 @@ func (resolver *Resolver) Handler(route string, io stream.IOStream) (http.Handle
|
|||
io.Printf("registering legacy domain %s\n", domain)
|
||||
}
|
||||
|
||||
configPath := resolver.ConfigPath()
|
||||
{
|
||||
// load the prefix path!
|
||||
var fs fs.File
|
||||
fs, err = resolver.Environment.Open(configPath)
|
||||
io.Println("loading prefixes from ", configPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer fs.Close()
|
||||
|
||||
// read the file
|
||||
var prefixes resolvers.Prefix
|
||||
prefixes, err = resolvers.ReadPrefixes(fs)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// and use that as the resolver!
|
||||
// resolve the prefixes
|
||||
p.Resolver = resolvers.InOrder{
|
||||
prefixes,
|
||||
resolver,
|
||||
fallback,
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
}), err
|
||||
}
|
||||
|
||||
func (resolver *Resolver) ConfigPath() string {
|
||||
return filepath.Join(resolver.Control.Path(), resolver.ResolverFile)
|
||||
func (resolver *Resolver) Target(uri string) string {
|
||||
return wdresolve.PrefixTarget(resolver, uri)
|
||||
}
|
||||
|
||||
// allow reloading prefixes from the server every minute
|
||||
const prefixesRefresh = time.Minute
|
||||
|
||||
func (resolver *Resolver) Prefixes() (prefixes map[string]string) {
|
||||
// reset the prefixes after a specific time, but only if requested
|
||||
resolver.prefixes.ResetAfter(prefixesRefresh)
|
||||
return resolver.prefixes.Get(resolver.freshPrefixes)
|
||||
}
|
||||
|
||||
func (resolver *Resolver) freshPrefixes() map[string]string {
|
||||
instances, err := resolver.Instances.All()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
gPrefixes := make(map[string]string)
|
||||
for _, instance := range instances {
|
||||
url := instance.URL().String()
|
||||
|
||||
// failed to fetch prefixes for this particular instance
|
||||
// => skip it!
|
||||
prefixes, err := instance.PrefixesCached()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, p := range prefixes {
|
||||
gPrefixes[url] = p
|
||||
}
|
||||
}
|
||||
return gPrefixes
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ const ConfigFile = ".env"
|
|||
const OverridesJSON = "overrides.json"
|
||||
|
||||
// DefaultOverridesJSON contains a template for a new 'overrides.json' file
|
||||
//
|
||||
//go:embed bootstrap/overrides.json
|
||||
var DefaultOverridesJSON []byte
|
||||
|
||||
|
|
@ -28,6 +29,7 @@ var DefaultOverridesJSON []byte
|
|||
const ResolverBlockedTXT = "resolver-blocked.txt"
|
||||
|
||||
// ResolverBlockTXT contains a template for 'resolver-blocked' file
|
||||
//
|
||||
//go:embed bootstrap/resolver-blocked.txt
|
||||
var DefaultResolverBlockedTXT []byte
|
||||
|
||||
|
|
@ -35,9 +37,6 @@ var DefaultResolverBlockedTXT []byte
|
|||
const AuthorizedKeys = "authorized_keys"
|
||||
|
||||
// DefaultAuthorizedKeys contains a template for a new 'global_authorized_keys' file
|
||||
//
|
||||
//go:embed bootstrap/global_authorized_keys
|
||||
var DefaultAuthorizedKeys []byte
|
||||
|
||||
// PrefixConfig is the name for the global resolver prefix configuration.
|
||||
// It should be found within the prefix component directory.
|
||||
const PrefixConfig = "prefix.cfg"
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/FAU-CDI/wisski-distillery/internal/component/ssh"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component/triplestore"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/component/web"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/core"
|
||||
)
|
||||
|
||||
// register returns all components of the distillery
|
||||
|
|
@ -43,13 +42,9 @@ func (dis *Distillery) register(context *component.PoolContext) []component.Comp
|
|||
ra[*snapshots.Pathbuilders](dis, context),
|
||||
|
||||
// Control server
|
||||
r(dis, context, func(control *control.Control) {
|
||||
control.ResolverFile = core.PrefixConfig
|
||||
}),
|
||||
ra[*control.Control](dis, context),
|
||||
ra[*control.SelfHandler](dis, context),
|
||||
r(dis, context, func(resolver *resolver.Resolver) {
|
||||
resolver.ResolverFile = core.PrefixConfig
|
||||
}),
|
||||
ra[*resolver.Resolver](dis, context),
|
||||
ra[*control.Info](dis, context),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
package lazy
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Lazy is an object that a lazily-initialized value of type T.
|
||||
//
|
||||
|
|
@ -8,6 +11,9 @@ import "sync"
|
|||
type Lazy[T any] struct {
|
||||
once sync.Once
|
||||
value T
|
||||
|
||||
m sync.RWMutex // m protects resetting this lazy
|
||||
lastReset time.Time // last time this mutex was reset
|
||||
}
|
||||
|
||||
// Get returns the value associated with this Lazy.
|
||||
|
|
@ -20,8 +26,53 @@ type Lazy[T any] struct {
|
|||
//
|
||||
// Get may safely be called concurrently.
|
||||
func (lazy *Lazy[T]) Get(init func() T) T {
|
||||
lazy.m.RLock()
|
||||
defer lazy.m.RUnlock()
|
||||
|
||||
lazy.once.Do(func() {
|
||||
lazy.value = init()
|
||||
})
|
||||
return lazy.value
|
||||
}
|
||||
|
||||
// Reset resets this Lazy, deleting any previously associated value.
|
||||
//
|
||||
// May be called concurrently with [Get].
|
||||
// Future calls to [Get] will invoke init.
|
||||
func (lazy *Lazy[T]) Reset() {
|
||||
lazy.m.Lock()
|
||||
defer lazy.m.Unlock()
|
||||
|
||||
lazy.reset()
|
||||
}
|
||||
|
||||
// ResetAfter resets this lazy if more than d time has passed since the last reset.
|
||||
// If ResetAfter cannot lock, then it does not reset.
|
||||
//
|
||||
// May be called concurrently with other functions on this lazy.
|
||||
func (lazy *Lazy[T]) ResetAfter(d time.Duration) {
|
||||
if !lazy.m.TryLock() {
|
||||
return
|
||||
}
|
||||
defer lazy.m.Unlock()
|
||||
|
||||
if time.Since(lazy.lastReset) < d {
|
||||
return
|
||||
}
|
||||
|
||||
lazy.reset()
|
||||
}
|
||||
|
||||
// reset implements resetting this lazy.
|
||||
// m myst be held for writing.
|
||||
func (lazy *Lazy[T]) reset() {
|
||||
// reset the once
|
||||
lazy.once = sync.Once{}
|
||||
|
||||
// reset the value
|
||||
var t T
|
||||
lazy.value = t
|
||||
|
||||
// time of the last reset
|
||||
lazy.lastReset = time.Now()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,15 @@ func Filter[T any](values []T, filter func(T) bool) []T {
|
|||
return results
|
||||
}
|
||||
|
||||
// AsAny returns the same slice as values, but as any
|
||||
func AsAny[T any](values []T) []any {
|
||||
results := make([]any, len(values))
|
||||
for i, v := range values {
|
||||
results[i] = v
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
// Partition partitions values in T by the given functions.
|
||||
func Partition[T any, P comparable](values []T, partition func(value T) P) map[P][]T {
|
||||
result := make(map[P][]T)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue