pkg: Add spawner package
This commit is contained in:
parent
2e47626900
commit
e17ab90269
1 changed files with 108 additions and 0 deletions
108
pkg/spawner/spawner.go
Normal file
108
pkg/spawner/spawner.go
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
package spawner
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Spawner[T io.Closer] struct {
|
||||
Spawn func() T
|
||||
Alive func(t T) bool
|
||||
|
||||
Limit int
|
||||
|
||||
initOnce sync.Once
|
||||
wg sync.WaitGroup
|
||||
tasks chan task[T]
|
||||
|
||||
instances []T
|
||||
alive []bool
|
||||
}
|
||||
|
||||
type task[T any] struct {
|
||||
f func(t T)
|
||||
done chan<- struct{}
|
||||
}
|
||||
|
||||
func (tt task[T]) Do(t T) {
|
||||
defer close(tt.done)
|
||||
tt.f(t)
|
||||
}
|
||||
|
||||
func (spawner *Spawner[T]) worker(i int) {
|
||||
spawner.wg.Add(1)
|
||||
go func() {
|
||||
defer spawner.wg.Done()
|
||||
|
||||
var zero T
|
||||
for task := range spawner.tasks {
|
||||
// spawn a new instance once finished
|
||||
if !spawner.alive[i] {
|
||||
log.Println("respawning instance ", i)
|
||||
spawner.instances[i] = spawner.Spawn()
|
||||
spawner.alive[i] = true
|
||||
}
|
||||
|
||||
task.Do(spawner.instances[i])
|
||||
|
||||
// if the instance has died during execution
|
||||
// then do a close (to deallocate)
|
||||
if !spawner.Alive(spawner.instances[i]) {
|
||||
spawner.instances[i].Close()
|
||||
spawner.instances[i] = zero
|
||||
spawner.alive[i] = false
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (spawner *Spawner[T]) start() {
|
||||
spawner.initOnce.Do(func() {
|
||||
limit := spawner.Limit
|
||||
if limit < 1 {
|
||||
limit = 1
|
||||
}
|
||||
|
||||
spawner.tasks = make(chan task[T], limit)
|
||||
spawner.alive = make([]bool, limit)
|
||||
spawner.instances = make([]T, limit)
|
||||
for i := 0; i < limit; i++ {
|
||||
spawner.worker(i)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Do performs f on an unspecified object from the spawner.
|
||||
// If no objects are available within time.Duration, a new object is spawned as there are at most limit objects.
|
||||
func (spawner *Spawner[T]) Do(f func(t T)) {
|
||||
spawner.start()
|
||||
|
||||
done := make(chan struct{})
|
||||
spawner.tasks <- task[T]{
|
||||
f: f,
|
||||
done: done,
|
||||
}
|
||||
<-done
|
||||
}
|
||||
|
||||
func (spawner *Spawner[T]) Close() {
|
||||
close(spawner.tasks)
|
||||
spawner.wg.Wait()
|
||||
|
||||
spawner.wg.Add(len(spawner.alive))
|
||||
|
||||
var zero T
|
||||
for i := range spawner.alive {
|
||||
go func(i int) {
|
||||
defer spawner.wg.Done()
|
||||
|
||||
if spawner.alive[i] {
|
||||
spawner.instances[i].Close()
|
||||
spawner.instances[i] = zero
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
spawner.wg.Wait()
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue