Use environment DialContext and Listen everywhere

This commit is contained in:
Tom Wiesing 2022-09-19 12:42:33 +02:00
parent f19619ef9f
commit b0d3c686ba
No known key found for this signature in database
11 changed files with 64 additions and 30 deletions

View file

@ -5,7 +5,6 @@
- Why a factory? - Why a factory?
- First steps after provisioning - First steps after provisioning
- Use `environment.Dial()` and `environment.Listen()`
- Move `provision_entrypoint.sh` into go - Move `provision_entrypoint.sh` into go
- Enhance Snapshots - Enhance Snapshots
- Export the Docker Images - Export the Docker Images

View file

@ -32,13 +32,22 @@ var errServerListen = exit.Error{
} }
func (s server) Run(context wisski_distillery.Context) error { func (s server) Run(context wisski_distillery.Context) error {
handler, err := context.Environment.Dis().Server(context.IOStream) dis := context.Environment
handler, err := dis.Control().Server(context.IOStream)
if err != nil { if err != nil {
return err return err
} }
context.Printf("Listening on %s\n", s.Bind) context.Printf("Listening on %s\n", s.Bind)
err = http.ListenAndServe(s.Bind, http.StripPrefix(s.Prefix, handler))
// make a new listener
listener, err := dis.Core.Environment.Listen("tcp", s.Bind)
if err != nil {
return errServerListen.Wrap(err)
}
// and serve that listener
err = http.Serve(listener, http.StripPrefix(s.Prefix, handler))
if err == nil { if err == nil {
return nil return nil
} }

View file

@ -38,7 +38,7 @@ func (upc updateprefixconfig) Run(context wisski_distillery.Context) error {
return errPrefixUpdateFailed.WithMessageF(err) return errPrefixUpdateFailed.WithMessageF(err)
} }
ddis := dis.Dis() ddis := dis.Control()
target := ddis.ResolverConfigPath() target := ddis.ResolverConfigPath()
// print the configuration // print the configuration

2
go.mod
View file

@ -7,6 +7,7 @@ require (
github.com/Showmax/go-fqdn v1.0.0 github.com/Showmax/go-fqdn v1.0.0
github.com/alessio/shellescape v1.4.1 github.com/alessio/shellescape v1.4.1
github.com/feiin/sqlstring v0.3.0 github.com/feiin/sqlstring v0.3.0
github.com/go-sql-driver/mysql v1.6.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/tkw1536/goprogram v0.0.12 github.com/tkw1536/goprogram v0.0.12
golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561
@ -16,7 +17,6 @@ require (
) )
require ( require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect

View file

@ -9,7 +9,7 @@ import (
"github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/environment"
) )
// Control represents the control server // Control represents the running control server.
type Control struct { type Control struct {
component.ComponentBase component.ComponentBase

View file

@ -1,9 +1,14 @@
package sql package sql
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"sync/atomic"
mysqldriver "github.com/go-sql-driver/mysql"
"github.com/FAU-CDI/wisski-distillery/internal/bookkeeping" "github.com/FAU-CDI/wisski-distillery/internal/bookkeeping"
"github.com/FAU-CDI/wisski-distillery/pkg/logging" "github.com/FAU-CDI/wisski-distillery/pkg/logging"
@ -15,13 +20,29 @@ import (
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
) )
var proxyNameCounter uint64
// registerDialingProxy registers a new custom network protocol with the underlying sql driver.
// The new protocol will call dialer with the provided network argument.
// The name of the new protocol is returned.
func registerDialingProxy(network string, dialer func(context context.Context, network, address string) (net.Conn, error)) (name string) {
name = fmt.Sprintf("sql-proxy-%d", atomic.AddUint64(&proxyNameCounter, 1))
mysqldriver.RegisterDialContext(name, func(ctx context.Context, addr string) (net.Conn, error) {
return dialer(ctx, network, addr)
})
return
}
// sqlOpen opens a new sql connection to the provided database using the administrative credentials // sqlOpen opens a new sql connection to the provided database using the administrative credentials
func (sql SQL) openDatabase(database string, config *gorm.Config) (*gorm.DB, error) { func (sql *SQL) openDatabase(database string, config *gorm.Config) (*gorm.DB, error) {
network := sql.sqlNetwork.Get(func() string {
return registerDialingProxy("tcp", sql.Core.Environment.DialContext)
})
cfg := mysql.Config{ cfg := mysql.Config{
DSN: fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True&loc=Local", sql.Config.MysqlAdminUser, sql.Config.MysqlAdminPassword, sql.ServerURL, database), DriverName: "mysql",
DSN: fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8&parseTime=True&loc=Local", sql.Config.MysqlAdminUser, sql.Config.MysqlAdminPassword, network, sql.ServerURL, database),
DefaultStringSize: 256, DefaultStringSize: 256,
} }
// TODO: Use sql.Core.Environment.Dial
db, err := gorm.Open(mysql.New(cfg), config) db, err := gorm.Open(mysql.New(cfg), config)
if err != nil { if err != nil {
@ -38,7 +59,7 @@ func (sql SQL) openDatabase(database string, config *gorm.Config) (*gorm.DB, err
} }
// OpenBookkeeping opens a connection to the bookkeeping database // OpenBookkeeping opens a connection to the bookkeeping database
func (sql SQL) OpenBookkeeping(silent bool) (*gorm.DB, error) { func (sql *SQL) OpenBookkeeping(silent bool) (*gorm.DB, error) {
config := &gorm.Config{} config := &gorm.Config{}
if silent { if silent {
@ -75,12 +96,12 @@ func (sql SQL) Snapshot(io stream.IOStream, dest io.Writer, database string) err
} }
// OpenShell executes a mysql shell command // OpenShell executes a mysql shell command
func (sql SQL) OpenShell(io stream.IOStream, argv ...string) (int, error) { func (sql *SQL) OpenShell(io stream.IOStream, argv ...string) (int, error) {
return sql.Stack(sql.Environment).Exec(io, "sql", "mysql", argv...) return sql.Stack(sql.Environment).Exec(io, "sql", "mysql", argv...)
} }
// WaitShell waits for the sql database to be reachable via a docker-compose shell // WaitShell waits for the sql database to be reachable via a docker-compose shell
func (sql SQL) WaitShell() error { func (sql *SQL) WaitShell() error {
n := stream.FromNil() n := stream.FromNil()
return wait.Wait(func() bool { return wait.Wait(func() bool {
code, err := sql.OpenShell(n, "-e", "show databases;") code, err := sql.OpenShell(n, "-e", "show databases;")
@ -89,7 +110,7 @@ func (sql SQL) WaitShell() error {
} }
// Wait waits for a connection to the bookkeeping table to suceed // Wait waits for a connection to the bookkeeping table to suceed
func (sql SQL) Wait() error { func (sql *SQL) Wait() error {
return wait.Wait(func() bool { return wait.Wait(func() bool {
_, err := sql.OpenBookkeeping(true) _, err := sql.OpenBookkeeping(true)
return err == nil return err == nil
@ -98,14 +119,14 @@ func (sql SQL) Wait() error {
var errInvalidDatabaseName = errors.New("SQLProvision: Invalid database name") var errInvalidDatabaseName = errors.New("SQLProvision: Invalid database name")
func (sql SQL) Query(query string, args ...interface{}) bool { func (sql *SQL) Query(query string, args ...interface{}) bool {
raw := sqle.Format(query, args...) raw := sqle.Format(query, args...)
code, err := sql.OpenShell(stream.FromNil(), "-e", raw) code, err := sql.OpenShell(stream.FromNil(), "-e", raw)
return err == nil && code == 0 return err == nil && code == 0
} }
// SQLProvision provisions a new sql database and user // SQLProvision provisions a new sql database and user
func (sql SQL) Provision(name, user, password string) error { func (sql *SQL) Provision(name, user, password string) error {
// wait for the database // wait for the database
if err := sql.WaitShell(); err != nil { if err := sql.WaitShell(); err != nil {
return err return err
@ -128,7 +149,7 @@ func (sql SQL) Provision(name, user, password string) error {
var errSQLPurgeUser = errors.New("unable to delete user") var errSQLPurgeUser = errors.New("unable to delete user")
// SQLPurgeUser deletes the specified user from the database // SQLPurgeUser deletes the specified user from the database
func (sql SQL) PurgeUser(user string) error { func (sql *SQL) PurgeUser(user string) error {
if !sql.Query("DROP USER IF EXISTS ?@`%`; FLUSH PRIVILEGES; ", user) { if !sql.Query("DROP USER IF EXISTS ?@`%`; FLUSH PRIVILEGES; ", user) {
return errSQLPurgeUser return errSQLPurgeUser
} }
@ -139,7 +160,7 @@ func (sql SQL) PurgeUser(user string) error {
var errSQLPurgeDB = errors.New("unable to drop database") var errSQLPurgeDB = errors.New("unable to drop database")
// SQLPurgeDatabase deletes the specified db from the database // SQLPurgeDatabase deletes the specified db from the database
func (sql SQL) PurgeDatabase(db string) error { func (sql *SQL) PurgeDatabase(db string) error {
if !sqle.IsSafeDatabaseName(db) { if !sqle.IsSafeDatabaseName(db) {
return errSQLPurgeDB return errSQLPurgeDB
} }
@ -154,7 +175,7 @@ var errSQLUnsafeDatabaseName = errors.New("bookkeeping database has an unsafe na
var errSQLUnableToCreate = errors.New("unable to create bookkeeping database") var errSQLUnableToCreate = errors.New("unable to create bookkeeping database")
// Bootstrap bootstraps the SQL database, and makes sure that the bookkeeping table is up-to-date // Bootstrap bootstraps the SQL database, and makes sure that the bookkeeping table is up-to-date
func (sql SQL) Bootstrap(io stream.IOStream) error { func (sql *SQL) Bootstrap(io stream.IOStream) error {
if err := sql.WaitShell(); err != nil { if err := sql.WaitShell(); err != nil {
return err return err
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/FAU-CDI/wisski-distillery/internal/component" "github.com/FAU-CDI/wisski-distillery/internal/component"
"github.com/FAU-CDI/wisski-distillery/pkg/environment" "github.com/FAU-CDI/wisski-distillery/pkg/environment"
"github.com/FAU-CDI/wisski-distillery/pkg/lazy"
) )
type SQL struct { type SQL struct {
@ -16,6 +17,8 @@ type SQL struct {
PollContext context.Context // context to abort polling with PollContext context.Context // context to abort polling with
PollInterval time.Duration // duration to wait for during wait PollInterval time.Duration // duration to wait for during wait
sqlNetwork lazy.Lazy[string]
} }
func (SQL) Name() string { func (SQL) Name() string {
@ -25,7 +28,7 @@ func (SQL) Name() string {
//go:embed all:sql //go:embed all:sql
var resources embed.FS var resources embed.FS
func (ssh SQL) Stack(env environment.Environment) component.StackWithResources { func (ssh *SQL) Stack(env environment.Environment) component.StackWithResources {
return ssh.ComponentBase.MakeStack(env, component.StackWithResources{ return ssh.ComponentBase.MakeStack(env, component.StackWithResources{
Resources: resources, Resources: resources,
ContextPath: "sql", ContextPath: "sql",

View file

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"mime/multipart" "mime/multipart"
"net"
"net/http" "net/http"
"github.com/FAU-CDI/wisski-distillery/pkg/logging" "github.com/FAU-CDI/wisski-distillery/pkg/logging"
@ -64,10 +63,8 @@ func (ts Triplestore) OpenRaw(method, url string, body interface{}, bodyName str
// create the request object // create the request object
client := &http.Client{ client := &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
Dial: ts.Environment.Dial, DialContext: ts.Environment.DialContext,
DialTLS: func(network, addr string) (net.Conn, error) { DisableKeepAlives: true,
return nil, errors.New("not implemented")
},
}, },
} }
req, err := http.NewRequest(method, ts.BaseURL+url, reader) req, err := http.NewRequest(method, ts.BaseURL+url, reader)

View file

@ -70,7 +70,7 @@ func makeComponent[C component.Component](dis *Distillery, field *lazy.Lazy[C],
func (dis *Distillery) ComponentsX() []component.Component { func (dis *Distillery) ComponentsX() []component.Component {
return []component.Component{ return []component.Component{
dis.Web(), dis.Web(),
dis.Dis(), dis.Control(),
dis.SSH(), dis.SSH(),
dis.Triplestore(), dis.Triplestore(),
dis.SQL(), dis.SQL(),
@ -107,7 +107,7 @@ func (dis *Distillery) Web() *web.Web {
return makeComponent(dis, &dis.components.web, nil) return makeComponent(dis, &dis.components.web, nil)
} }
func (d *Distillery) Dis() *control.Control { func (d *Distillery) Control() *control.Control {
return makeComponent(d, &d.components.control, func(ddis *control.Control) { return makeComponent(d, &d.components.control, func(ddis *control.Control) {
ddis.ResolverFile = core.PrefixConfig ddis.ResolverFile = core.PrefixConfig
ddis.Instances = d.Instances() ddis.Instances = d.Instances()

View file

@ -1,6 +1,7 @@
package environment package environment
import ( import (
"context"
"io" "io"
"io/fs" "io/fs"
"net" "net"
@ -41,7 +42,7 @@ type Environment interface {
Abs(path string) (string, error) Abs(path string) (string, error)
Listen(network, address string) (net.Listener, error) Listen(network, address string) (net.Listener, error)
Dial(network, address string) (net.Conn, error) DialContext(context context.Context, network, address string) (net.Conn, error)
Executable() (string, error) Executable() (string, error)
Exec(io stream.IOStream, workdir string, exe string, argv ...string) int Exec(io stream.IOStream, workdir string, exe string, argv ...string) int

View file

@ -1,11 +1,15 @@
package environment package environment
import "net" import (
"context"
"net"
)
func (Native) Listen(network, address string) (net.Listener, error) { func (Native) Listen(network, address string) (net.Listener, error) {
return net.Listen(network, address) return net.Listen(network, address)
} }
func (Native) Dial(network, address string) (net.Conn, error) { func (Native) DialContext(context context.Context, network, address string) (net.Conn, error) {
return net.Dial(network, address) var d net.Dialer
return d.DialContext(context, network, address)
} }