Worker: more granular locking when flushing upstream buffer
Only lock the database mutex when actual queries are performed, but not during the entire flush loop.
This commit is contained in:
parent
8937a6f06f
commit
e34a0ba6ea
@ -227,15 +227,15 @@ func (ub *UpstreamBufferDB) QueueSize() (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
|
func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
|
||||||
ub.dbMutex.Lock()
|
|
||||||
defer ub.dbMutex.Unlock()
|
|
||||||
|
|
||||||
if ub.db == nil {
|
if ub.db == nil {
|
||||||
log.Panic().Msg("no database opened, unable to queue task updates")
|
log.Panic().Msg("no database opened, unable to queue task updates")
|
||||||
}
|
}
|
||||||
|
|
||||||
// See if we need to flush at all.
|
// See if we need to flush at all.
|
||||||
|
ub.dbMutex.Lock()
|
||||||
queueSize, err := ub.queueSize()
|
queueSize, err := ub.queueSize()
|
||||||
|
ub.dbMutex.Unlock()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err != nil:
|
case err != nil:
|
||||||
return fmt.Errorf("unable to determine queue size: %w", err)
|
return fmt.Errorf("unable to determine queue size: %w", err)
|
||||||
@ -247,7 +247,9 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
|
|||||||
// Keep flushing until the queue is empty or there is an error.
|
// Keep flushing until the queue is empty or there is an error.
|
||||||
var done bool
|
var done bool
|
||||||
for !done {
|
for !done {
|
||||||
|
ub.dbMutex.Lock()
|
||||||
done, err = ub.flushFirstItem(ctx)
|
done, err = ub.flushFirstItem(ctx)
|
||||||
|
ub.dbMutex.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user