pkg/{timex,wait}: Unify code
This commit is contained in:
parent
59fff07b59
commit
8701fab93b
10 changed files with 77 additions and 74 deletions
1
go.mod
1
go.mod
|
|
@ -12,7 +12,6 @@ require (
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/tkw1536/goprogram v0.1.1
|
github.com/tkw1536/goprogram v0.1.1
|
||||||
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741
|
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741
|
||||||
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458
|
|
||||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
|
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
|
||||||
gorm.io/driver/mysql v1.3.6
|
gorm.io/driver/mysql v1.3.6
|
||||||
gorm.io/gorm v1.23.10
|
gorm.io/gorm v1.23.10
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -27,8 +27,6 @@ github.com/tkw1536/goprogram v0.1.1 h1:gamK9OuRqoX2yQlA/nkgfVHHZWd/u2uUj6vJMYrYa
|
||||||
github.com/tkw1536/goprogram v0.1.1/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE=
|
github.com/tkw1536/goprogram v0.1.1/go.mod h1:Jqs0sTMzhrAGCX3JQrlEwQ0WRWQACCvuQQkaBDp65pE=
|
||||||
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 h1:fGZugkZk2UgYBxtpKmvub51Yno1LJDeEsRp2xGD+0gY=
|
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741 h1:fGZugkZk2UgYBxtpKmvub51Yno1LJDeEsRp2xGD+0gY=
|
||||||
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
|
golang.org/x/exp v0.0.0-20221004215720-b9f4876ce741/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
|
||||||
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458 h1:MgJ6t2zo8v0tbmLCueaCbF1RM+TtB0rs3Lv8DGtOIpY=
|
|
||||||
golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
|
|
||||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
|
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
|
||||||
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
|
|
||||||
|
|
@ -15,12 +15,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) {
|
func (home *Home) updateInstances(ctx context.Context, io stream.IOStream) {
|
||||||
timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) {
|
go func() {
|
||||||
|
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||||
io.Printf("[%s]: reloading instance list\n", t.Format(time.Stamp))
|
io.Printf("[%s]: reloading instance list\n", t.Format(time.Stamp))
|
||||||
|
|
||||||
names, _ := home.instanceMap()
|
names, _ := home.instanceMap()
|
||||||
home.instanceNames.Set(names)
|
home.instanceNames.Set(names)
|
||||||
})
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (home *Home) instanceMap() (map[string]struct{}, error) {
|
func (home *Home) instanceMap() (map[string]struct{}, error) {
|
||||||
|
|
@ -37,12 +39,14 @@ func (home *Home) instanceMap() (map[string]struct{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
|
func (home *Home) updateRender(ctx context.Context, io stream.IOStream) {
|
||||||
timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) {
|
go func() {
|
||||||
|
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||||
io.Printf("[%s]: reloading home render\n", t.Format(time.Stamp))
|
io.Printf("[%s]: reloading home render\n", t.Format(time.Stamp))
|
||||||
|
|
||||||
bytes, _ := home.homeRender()
|
bytes, _ := home.homeRender()
|
||||||
home.homeBytes.Set(bytes)
|
home.homeBytes.Set(bytes)
|
||||||
})
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
//go:embed "home.html"
|
//go:embed "home.html"
|
||||||
|
|
|
||||||
|
|
@ -12,11 +12,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) {
|
func (home *Home) updateRedirect(ctx context.Context, io stream.IOStream) {
|
||||||
timex.SetInterval(ctx, home.RefreshInterval, func(t time.Time) {
|
go func() {
|
||||||
|
for t := range timex.TickContext(ctx, home.RefreshInterval) {
|
||||||
io.Printf("[%s]: reloading overrides\n", t.Format(time.Stamp))
|
io.Printf("[%s]: reloading overrides\n", t.Format(time.Stamp))
|
||||||
|
|
||||||
redirect, _ := home.loadRedirect()
|
redirect, _ := home.loadRedirect()
|
||||||
home.redirect.Set(&redirect)
|
home.redirect.Set(&redirect)
|
||||||
})
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (home *Home) loadRedirect() (redirect Redirect, err error) {
|
func (home *Home) loadRedirect() (redirect Redirect, err error) {
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,13 @@ import (
|
||||||
|
|
||||||
// updatePrefixes starts updating prefixes
|
// updatePrefixes starts updating prefixes
|
||||||
func (resolver *Resolver) updatePrefixes(io stream.IOStream, ctx context.Context) {
|
func (resolver *Resolver) updatePrefixes(io stream.IOStream, ctx context.Context) {
|
||||||
timex.SetInterval(ctx, resolver.RefreshInterval, func(t time.Time) {
|
go func() {
|
||||||
|
for t := range timex.TickContext(ctx, resolver.RefreshInterval) {
|
||||||
io.Printf("[%s]: reloading prefixes\n", t.Format(time.Stamp))
|
io.Printf("[%s]: reloading prefixes\n", t.Format(time.Stamp))
|
||||||
prefixes, _ := resolver.AllPrefixes()
|
prefixes, _ := resolver.AllPrefixes()
|
||||||
resolver.prefixes.Set(prefixes)
|
resolver.prefixes.Set(prefixes)
|
||||||
})
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AllPrefixes returns a list of all prefixes from the server.
|
// AllPrefixes returns a list of all prefixes from the server.
|
||||||
|
|
|
||||||
|
|
@ -6,16 +6,16 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
mysqldriver "github.com/go-sql-driver/mysql"
|
mysqldriver "github.com/go-sql-driver/mysql"
|
||||||
"github.com/tkw1536/goprogram/stream"
|
|
||||||
|
|
||||||
"gorm.io/driver/mysql"
|
"gorm.io/driver/mysql"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"gorm.io/gorm/logger"
|
"gorm.io/gorm/logger"
|
||||||
|
|
||||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/wait"
|
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||||
)
|
)
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
@ -42,10 +42,10 @@ func (sql *SQL) Exec(query string, args ...interface{}) error {
|
||||||
|
|
||||||
// WaitExec waits for the query interface to be able to connect to the database
|
// WaitExec waits for the query interface to be able to connect to the database
|
||||||
func (sql *SQL) WaitExec() error {
|
func (sql *SQL) WaitExec() error {
|
||||||
return wait.Wait(func() bool {
|
return timex.TickUntilFunc(func(time.Time) bool {
|
||||||
err := sql.Exec("select 1;")
|
err := sql.Exec("select 1;")
|
||||||
return err == nil
|
return err == nil
|
||||||
}, sql.PollInterval, sql.PollContext)
|
}, sql.PollContext, sql.PollInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
@ -91,12 +91,10 @@ func (sql *SQL) QueryTable(silent bool, table string) (*gorm.DB, error) {
|
||||||
// WaitQueryTable waits for a connection to succeed via QueryTable
|
// WaitQueryTable waits for a connection to succeed via QueryTable
|
||||||
func (sql *SQL) WaitQueryTable() error {
|
func (sql *SQL) WaitQueryTable() error {
|
||||||
// TODO: Establish a convention on when to wait for this!
|
// TODO: Establish a convention on when to wait for this!
|
||||||
n := stream.FromNil()
|
return timex.TickUntilFunc(func(time.Time) bool {
|
||||||
return wait.Wait(func() bool {
|
|
||||||
_, err := sql.QueryTable(true, models.InstanceTable)
|
_, err := sql.QueryTable(true, models.InstanceTable)
|
||||||
n.EPrintf("[SQL.WaitQueryTable]: %s\n", err)
|
|
||||||
return err == nil
|
return err == nil
|
||||||
}, sql.PollInterval, sql.PollContext)
|
}, sql.PollContext, sql.PollInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,12 @@ package sql
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
"github.com/FAU-CDI/wisski-distillery/internal/models"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
"github.com/FAU-CDI/wisski-distillery/pkg/logging"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/sqle"
|
"github.com/FAU-CDI/wisski-distillery/pkg/sqle"
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/wait"
|
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||||
"github.com/tkw1536/goprogram/exit"
|
"github.com/tkw1536/goprogram/exit"
|
||||||
"github.com/tkw1536/goprogram/stream"
|
"github.com/tkw1536/goprogram/stream"
|
||||||
)
|
)
|
||||||
|
|
@ -22,11 +23,10 @@ func (sql *SQL) Shell(io stream.IOStream, argv ...string) (int, error) {
|
||||||
// unsafeWaitShell waits for a connection via the database shell to succeed
|
// unsafeWaitShell waits for a connection via the database shell to succeed
|
||||||
func (sql *SQL) unsafeWaitShell() error {
|
func (sql *SQL) unsafeWaitShell() error {
|
||||||
n := stream.FromNil()
|
n := stream.FromNil()
|
||||||
return wait.Wait(func() bool {
|
return timex.TickUntilFunc(func(time.Time) bool {
|
||||||
code, err := sql.Shell(n, "-e", "select 1;")
|
code, err := sql.Shell(n, "-e", "select 1;")
|
||||||
n.EPrintf("[SQL.unsafeWaitShell]: %d %s\n", code, err)
|
|
||||||
return err == nil && code == 0
|
return err == nil && code == 0
|
||||||
}, sql.PollInterval, sql.PollContext)
|
}, sql.PollContext, sql.PollInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
// unsafeQuery shell executes a raw database query.
|
// unsafeQuery shell executes a raw database query.
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,9 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/FAU-CDI/wisski-distillery/pkg/wait"
|
"github.com/FAU-CDI/wisski-distillery/pkg/timex"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/tkw1536/goprogram/stream"
|
"github.com/tkw1536/goprogram/stream"
|
||||||
)
|
)
|
||||||
|
|
@ -87,7 +88,7 @@ func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName str
|
||||||
// This is achieved using a polling strategy.
|
// This is achieved using a polling strategy.
|
||||||
func (ts Triplestore) Wait() error {
|
func (ts Triplestore) Wait() error {
|
||||||
n := stream.FromNil()
|
n := stream.FromNil()
|
||||||
return wait.Wait(func() bool {
|
return timex.TickUntilFunc(func(time.Time) bool {
|
||||||
res, err := ts.OpenRaw("GET", "/rest/repositories", nil, "", "")
|
res, err := ts.OpenRaw("GET", "/rest/repositories", nil, "", "")
|
||||||
n.EPrintf("[Triplestore.Wait]: %s\n", err)
|
n.EPrintf("[Triplestore.Wait]: %s\n", err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -95,7 +96,7 @@ func (ts Triplestore) Wait() error {
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
return true
|
return true
|
||||||
}, ts.PollInterval, ts.PollContext)
|
}, ts.PollContext, ts.PollInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TriplestorePurgeUser deletes the specified user from the triplestore
|
// TriplestorePurgeUser deletes the specified user from the triplestore
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
// Package timex provides Interval and Wait
|
||||||
package timex
|
package timex
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
|
@ -5,21 +6,49 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetInterval invokes f with the current time and then spawns a new goroutine that runs f every d, until context is closed.
|
// TickContext is like [time.Tick], but closes the returned channel once the context closes.
|
||||||
func SetInterval(ctx context.Context, d time.Duration, f func(t time.Time)) {
|
// As such it can be recovered by the garbage collector; see [time.TickContext].
|
||||||
f(time.Now())
|
//
|
||||||
|
// Unlike [time.Tick], immediatly send the current time on the given channel.
|
||||||
|
func TickContext(c context.Context, d time.Duration) <-chan time.Time {
|
||||||
|
if d < 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
timer := make(chan time.Time, 1)
|
||||||
|
timer <- time.Now()
|
||||||
go func() {
|
go func() {
|
||||||
t := time.NewTicker(d)
|
t := time.NewTicker(d)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
defer close(timer)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case tick := <-t.C:
|
case tick := <-t.C:
|
||||||
f(tick)
|
timer <- tick
|
||||||
case <-ctx.Done():
|
case <-c.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}()
|
}()
|
||||||
|
return timer
|
||||||
|
}
|
||||||
|
|
||||||
|
// TickUntilFunc invokes f every d until either context is closed, or f returns true.
|
||||||
|
// f is invoked once immediatly when the timer starts.
|
||||||
|
//
|
||||||
|
// TickUntilFunc blocks until f is no longer invoked.
|
||||||
|
//
|
||||||
|
// Returns the error of the context (if any).
|
||||||
|
func TickUntilFunc(f func(t time.Time) bool, c context.Context, d time.Duration) error {
|
||||||
|
context, cancel := context.WithCancel(c)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for t := range TickContext(context, d) {
|
||||||
|
if f(t) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return c.Err()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
package wait
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Wait repeatedly invokes f, until it returns true or the context is closed.
|
|
||||||
// The invocation interval is determined by interval.
|
|
||||||
func Wait(f func() bool, interval time.Duration, context context.Context) error {
|
|
||||||
// create a new timer
|
|
||||||
timer := time.NewTimer(interval)
|
|
||||||
if !timer.Stop() {
|
|
||||||
<-timer.C
|
|
||||||
}
|
|
||||||
defer timer.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if f() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset the timer, and wait for it again!
|
|
||||||
timer.Reset(interval)
|
|
||||||
select {
|
|
||||||
case <-timer.C:
|
|
||||||
case <-context.Done():
|
|
||||||
return context.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue