Add cron tasks to distillery
This commit is contained in:
parent
790460f9de
commit
f52fe6abf3
19 changed files with 353 additions and 141 deletions
|
|
@ -1,8 +1,11 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/bootstrap"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
|
|
@ -14,6 +17,7 @@ type Control struct {
|
|||
component.Base
|
||||
|
||||
Servables []component.Servable
|
||||
Cronables []component.Cronable
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
@ -28,7 +32,7 @@ func (control Control) Path() string {
|
|||
var resources embed.FS
|
||||
|
||||
func (control *Control) Stack(env environment.Environment) component.StackWithResources {
|
||||
stt := component.MakeStack(control, env, component.StackWithResources{
|
||||
return component.MakeStack(control, env, component.StackWithResources{
|
||||
Resources: resources,
|
||||
ContextPath: "control",
|
||||
EnvPath: "control.env",
|
||||
|
|
@ -48,7 +52,11 @@ func (control *Control) Stack(env environment.Environment) component.StackWithRe
|
|||
|
||||
CopyContextFiles: []string{bootstrap.Executable},
|
||||
})
|
||||
return stt
|
||||
}
|
||||
|
||||
// Trigger triggers the active cron run to immediatly invoke cron.
|
||||
func (control *Control) Trigger(ctx context.Context, env environment.Environment) error {
|
||||
return control.Stack(env).Kill(ctx, io.Discard, "control", syscall.SIGHUP)
|
||||
}
|
||||
|
||||
func (control Control) Context(parent component.InstallationContext) component.InstallationContext {
|
||||
|
|
|
|||
138
internal/dis/component/control/cron/cron.go
Normal file
138
internal/dis/component/control/cron/cron.go
Normal file
|
|
@ -0,0 +1,138 @@
|
|||
package cron
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type Cron struct {
|
||||
component.Base
|
||||
|
||||
Tasks []component.Cronable
|
||||
}
|
||||
|
||||
// Listen returns a channel that listens for triggers in the current process.
|
||||
// It is intended to be passed to Start.
|
||||
func (control *Cron) Listen(ctx context.Context) (<-chan struct{}, func()) {
|
||||
var (
|
||||
signals = make(chan os.Signal, 1)
|
||||
notify = make(chan struct{}, 1)
|
||||
)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-signals:
|
||||
notify <- struct{}{}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
signal.Notify(signals, syscall.SIGHUP)
|
||||
return notify, func() {
|
||||
signal.Ignore(syscall.SIGHUP)
|
||||
}
|
||||
}
|
||||
|
||||
// Once immediatly runs all cron jobs in the current thread.
|
||||
// Once returns once all cron jobs have returned.
|
||||
//
|
||||
// Once should not be called concurrently with Cron.
|
||||
func (control *Cron) Once(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(control.Tasks))
|
||||
|
||||
zerolog.Ctx(ctx).Info().Time("time", time.Now()).Msg("Starting Cron")
|
||||
|
||||
for _, task := range control.Tasks {
|
||||
go func(task component.Cronable) {
|
||||
defer wg.Done()
|
||||
|
||||
name := task.TaskName()
|
||||
|
||||
start := time.Now()
|
||||
zerolog.Ctx(ctx).Info().Str("task", name).Time("time", start).Msg("Calling Cron()")
|
||||
|
||||
panicked, panik, err := func() (panicked bool, panik any, err error) {
|
||||
defer func() {
|
||||
panik = recover()
|
||||
}()
|
||||
|
||||
panicked = true
|
||||
err = task.Cron(ctx)
|
||||
panicked = false
|
||||
|
||||
return
|
||||
}()
|
||||
|
||||
took := time.Since(start)
|
||||
|
||||
switch {
|
||||
case !panicked:
|
||||
zerolog.Ctx(ctx).Err(err).Str("task", name).Dur("took", took).Msg("Finished Cron()")
|
||||
case panicked:
|
||||
zerolog.Ctx(ctx).Error().Str("task", name).Dur("took", took).Str("panic", fmt.Sprint(panik)).Msg("Finished Cron()")
|
||||
}
|
||||
}(task)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
zerolog.Ctx(ctx).Info().Time("time", time.Now()).Msg("Finished Cron")
|
||||
}
|
||||
|
||||
// Start invokes all cron jobs regularly, waiting interval between invocations.
|
||||
//
|
||||
// The first run is invoked immediatly.
|
||||
// The call to Start returns after the first invocation of all cron tasks.
|
||||
//
|
||||
// The returned channel is closed once no more cron tasks are active.
|
||||
func (control *Cron) Start(ctx context.Context, interval time.Duration, signal <-chan struct{}) <-chan struct{} {
|
||||
// run runs cron tasks with the configured timeout
|
||||
run := func() {
|
||||
ctx, done := context.WithTimeout(ctx, interval)
|
||||
defer done()
|
||||
|
||||
control.Once(ctx)
|
||||
}
|
||||
|
||||
cleanup := make(chan struct{}) // closed once we have finished running everything
|
||||
|
||||
run() // run tasks immediatly
|
||||
|
||||
// start a new xgoroutine to run cron tasks
|
||||
go func() {
|
||||
defer close(cleanup)
|
||||
|
||||
timer := timex.NewTimer()
|
||||
for {
|
||||
timex.StopTimer(timer)
|
||||
timer.Reset(interval)
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
zerolog.Ctx(ctx).Debug().Msg("Cron() timer fired")
|
||||
case <-signal:
|
||||
zerolog.Ctx(ctx).Debug().Msg("Cron() received signal")
|
||||
case <-ctx.Done():
|
||||
timex.StopTimer(timer)
|
||||
return
|
||||
}
|
||||
|
||||
run()
|
||||
}
|
||||
}()
|
||||
|
||||
// and return the cleanup channel
|
||||
return cleanup
|
||||
}
|
||||
76
internal/dis/component/control/home/cron.go
Normal file
76
internal/dis/component/control/home/cron.go
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
package home
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
)
|
||||
|
||||
type UpdateInstanceList struct {
|
||||
component.Base
|
||||
Home *Home
|
||||
}
|
||||
|
||||
var (
|
||||
_ component.Cronable = (*UpdateInstanceList)(nil)
|
||||
)
|
||||
|
||||
func (*UpdateInstanceList) TaskName() string {
|
||||
return "instance list"
|
||||
}
|
||||
|
||||
func (ul *UpdateInstanceList) Cron(ctx context.Context) error {
|
||||
names, err := ul.Home.instanceMap(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ul.Home.instanceNames.Set(names)
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateRedirect struct {
|
||||
component.Base
|
||||
Home *Home
|
||||
}
|
||||
|
||||
var (
|
||||
_ component.Cronable = (*UpdateRedirect)(nil)
|
||||
)
|
||||
|
||||
func (ur *UpdateRedirect) TaskName() string {
|
||||
return "redirect"
|
||||
}
|
||||
|
||||
func (ur *UpdateRedirect) Cron(ctx context.Context) error {
|
||||
redirect, err := ur.Home.loadRedirect(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ur.Home.redirect.Set(&redirect)
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateHome struct {
|
||||
component.Base
|
||||
Home *Home
|
||||
}
|
||||
|
||||
var (
|
||||
_ component.Cronable = (*UpdateHome)(nil)
|
||||
)
|
||||
|
||||
func (ur *UpdateHome) TaskName() string {
|
||||
return "home render"
|
||||
}
|
||||
|
||||
func (ur *UpdateHome) Cron(ctx context.Context) error {
|
||||
bytes, err := ur.Home.homeRender(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ur.Home.homeBytes.Set(bytes)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -3,9 +3,7 @@ package home
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/instances"
|
||||
|
|
@ -17,8 +15,6 @@ type Home struct {
|
|||
|
||||
Instances *instances.Instances
|
||||
|
||||
RefreshInterval time.Duration
|
||||
|
||||
redirect lazy.Lazy[*Redirect]
|
||||
instanceNames lazy.Lazy[map[string]struct{}]
|
||||
homeBytes lazy.Lazy[[]byte]
|
||||
|
|
@ -30,10 +26,7 @@ var (
|
|||
|
||||
func (*Home) Routes() []string { return []string{"/"} }
|
||||
|
||||
func (home *Home) Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error) {
|
||||
home.updateRedirect(ctx, progress)
|
||||
home.updateInstances(ctx, progress)
|
||||
home.updateRender(ctx, progress)
|
||||
func (home *Home) Handler(ctx context.Context, route string) (http.Handler, error) {
|
||||
return home, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,42 +3,15 @@ package home
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
_ "embed"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component/control/static"
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/status"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func (home *Home) updateInstances(ctx context.Context, progress io.Writer) {
|
||||
go func() {
|
||||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
logging.ProgressF(progress, ctx, "[%s]: reloading instance list\n", t.Format(time.Stamp))
|
||||
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
names, err := home.instanceMap(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.instanceNames.Set(names)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
logging.ProgressF(progress, ctx, "error reloading instances: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (home *Home) instanceMap(ctx context.Context) (map[string]struct{}, error) {
|
||||
wissKIs, err := home.Instances.All(ctx)
|
||||
if err != nil {
|
||||
|
|
@ -52,30 +25,6 @@ func (home *Home) instanceMap(ctx context.Context) (map[string]struct{}, error)
|
|||
return names, nil
|
||||
}
|
||||
|
||||
func (home *Home) updateRender(ctx context.Context, progress io.Writer) {
|
||||
go func() {
|
||||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
logging.ProgressF(progress, ctx, "[%s]: reloading home render list\n", t.Format(time.Stamp))
|
||||
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
bytes, err := home.homeRender(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.homeBytes.Set(bytes)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
logging.ProgressF(progress, ctx, "error reloading instances: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
//go:embed "home.html"
|
||||
var homeHTMLStr string
|
||||
var homeTemplate = static.AssetsHomeHome.MustParseShared("home.html", homeHTMLStr)
|
||||
|
|
|
|||
|
|
@ -3,40 +3,10 @@ package home
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||
)
|
||||
|
||||
func (home *Home) updateRedirect(ctx context.Context, progress io.Writer) {
|
||||
go func() {
|
||||
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||
logging.ProgressF(progress, ctx, "[%s]: reloading overrides\n", t.Format(time.Stamp))
|
||||
|
||||
err := (func() error {
|
||||
ctx, cancel := context.WithTimeout(ctx, home.RefreshInterval)
|
||||
defer cancel()
|
||||
|
||||
redirect, err := home.loadRedirect(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
home.redirect.Set(&redirect)
|
||||
return nil
|
||||
})()
|
||||
if err != nil {
|
||||
logging.ProgressF(progress, ctx, "error reloading overrides: %s", err.Error())
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (home *Home) loadRedirect(ctx context.Context) (redirect Redirect, err error) {
|
||||
if redirect.Overrides == nil {
|
||||
redirect.Overrides = make(map[string]string)
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package info
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/internal/dis/component"
|
||||
|
|
@ -33,7 +32,7 @@ var (
|
|||
|
||||
func (*Info) Routes() []string { return []string{"/dis/"} }
|
||||
|
||||
func (info *Info) Handler(ctx context.Context, route string, progress io.Writer) (handler http.Handler, err error) {
|
||||
func (info *Info) Handler(ctx context.Context, route string) (handler http.Handler, err error) {
|
||||
router := mux.NewRouter()
|
||||
{
|
||||
socket := &httpx.WebSocket{
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Server returns an http.Mux that implements the main server instance.
|
||||
|
|
@ -19,8 +19,8 @@ func (control *Control) Server(ctx context.Context, progress io.Writer) (*http.S
|
|||
// add all the servable routes!
|
||||
for _, s := range control.Servables {
|
||||
for _, route := range s.Routes() {
|
||||
logging.ProgressF(progress, ctx, "mounting %s\n", route)
|
||||
handler, err := s.Handler(ctx, route, progress)
|
||||
zerolog.Ctx(ctx).Info().Str("component", s.Name()).Str("route", route).Msg("mounting route")
|
||||
handler, err := s.Handler(ctx, route)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ package static
|
|||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
|
||||
|
|
@ -24,7 +23,7 @@ func (*Static) Routes() []string { return []string{"/static/"} }
|
|||
//go:embed dist
|
||||
var staticFS embed.FS
|
||||
|
||||
func (static *Static) Handler(ctx context.Context, route string, progress io.Writer) (http.Handler, error) {
|
||||
func (static *Static) Handler(ctx context.Context, route string) (http.Handler, error) {
|
||||
// take the filesystem
|
||||
fs, err := fs.Sub(staticFS, "dist")
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue