snapshots: Handle as separate components

This commit is contained in:
Tom Wiesing 2022-10-02 18:17:47 +02:00
parent 698f04e13e
commit 3b112f1b8e
No known key found for this signature in database
27 changed files with 960 additions and 789 deletions

View file

@ -1,50 +0,0 @@
// Package opgroup provides OpGroup
package opgroup
import "sync"
// OpGroup represents an operation group that can send messages to the waiting goroutine.
// The zero value is not ready for use, use [NewOpGroup] instead.
type OpGroup[M any] struct {
wg sync.WaitGroup
c chan M
}
// NewOpGroup creates a new OpGroup.
//
// The internal buffer size for messages will be expectedSize.
// If unsure about buffer size, 0 is a valid choice.
func NewOpGroup[M any](expectedSize int) *OpGroup[M] {
return &OpGroup[M]{
c: make(chan M, expectedSize),
}
}
// Go schedules a new operation (implemented by worker) to run in a separate goroutine.
// worker is passed a send-only reference to the message channel which it can uszxe to send messages to.
func (op *OpGroup[M]) Go(worker func(c chan<- M)) {
op.wg.Add(1)
go func() {
defer op.wg.Done()
worker(op.c)
}()
}
// GoErr is like Go, except that once the operation is finished, it writes the returned error into dest.
func (op *OpGroup[M]) GoErr(worker func(c chan<- M) error, dest *error) {
op.Go(func(c chan<- M) {
*dest = worker(c)
})
}
// Wait returns a receive-only reference to the message channel.
// The message channel will be closed once all operations on this group have completed.
//
// The Wait function may only be called once.
func (op *OpGroup[M]) Wait() <-chan M {
go func() {
op.wg.Wait()
close(op.c)
}()
return op.c
}

44
pkg/rlock/rlock.go Normal file
View file

@ -0,0 +1,44 @@
package rlock
import (
"sync"
"time"
)
type RLock struct {
m sync.Mutex // m is held internally
held bool
holder int
counter uint64
}
func (rm *RLock) Lock(id int) {
for {
rm.m.Lock()
if !rm.held {
rm.held = true
rm.holder = id
break
} else if rm.held && rm.holder == id {
break
} else {
rm.m.Unlock()
time.Sleep(time.Millisecond)
continue
}
}
rm.counter++
rm.m.Unlock()
}
func (rm *RLock) Unlock() {
rm.m.Lock()
rm.counter--
if rm.counter == 0 {
rm.held = false
rm.holder = 0
}
rm.m.Unlock()
}

View file

@ -26,6 +26,16 @@ func Filter[T any](values []T, filter func(T) bool) []T {
return results
}
// FilterClone is like [Filter], but creates a new slice
func FilterClone[T any](values []T, filter func(T) bool) (results []T) {
for _, value := range values {
if filter(value) {
results = append(results, value)
}
}
return
}
// NonSequential sorts values, and then removes elements for which test() returns true.
// NonSequential does not re-allocate, but uses the existing slice.
func NonSequential[T constraints.Ordered](values []T, test func(prev, current T) bool) []T {