diff --git a/pkg/spawner/spawner.go b/pkg/spawner/spawner.go new file mode 100644 index 0000000..9c39ccf --- /dev/null +++ b/pkg/spawner/spawner.go @@ -0,0 +1,108 @@ +package spawner + +import ( + "io" + "log" + "sync" +) + +type Spawner[T io.Closer] struct { + Spawn func() T + Alive func(t T) bool + + Limit int + + initOnce sync.Once + wg sync.WaitGroup + tasks chan task[T] + + instances []T + alive []bool +} + +type task[T any] struct { + f func(t T) + done chan<- struct{} +} + +func (tt task[T]) Do(t T) { + defer close(tt.done) + tt.f(t) +} + +func (spawner *Spawner[T]) worker(i int) { + spawner.wg.Add(1) + go func() { + defer spawner.wg.Done() + + var zero T + for task := range spawner.tasks { + // spawn a new instance once finished + if !spawner.alive[i] { + log.Println("respawning instance ", i) + spawner.instances[i] = spawner.Spawn() + spawner.alive[i] = true + } + + task.Do(spawner.instances[i]) + + // if the instance has died during execution + // then do a close (to deallocate) + if !spawner.Alive(spawner.instances[i]) { + spawner.instances[i].Close() + spawner.instances[i] = zero + spawner.alive[i] = false + } + } + }() +} + +func (spawner *Spawner[T]) start() { + spawner.initOnce.Do(func() { + limit := spawner.Limit + if limit < 1 { + limit = 1 + } + + spawner.tasks = make(chan task[T], limit) + spawner.alive = make([]bool, limit) + spawner.instances = make([]T, limit) + for i := 0; i < limit; i++ { + spawner.worker(i) + } + }) +} + +// Do performs f on an unspecified object from the spawner. +// If no objects are available within time.Duration, a new object is spawned as there are at most limit objects. +func (spawner *Spawner[T]) Do(f func(t T)) { + spawner.start() + + done := make(chan struct{}) + spawner.tasks <- task[T]{ + f: f, + done: done, + } + <-done +} + +func (spawner *Spawner[T]) Close() { + close(spawner.tasks) + spawner.wg.Wait() + + spawner.wg.Add(len(spawner.alive)) + + var zero T + for i := range spawner.alive { + go func(i int) { + defer spawner.wg.Done() + + if spawner.alive[i] { + spawner.instances[i].Close() + spawner.instances[i] = zero + } + }(i) + } + + spawner.wg.Wait() +}