From e34a0ba6eaeac00256e84555d49e4a302df1d44e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 21 Apr 2022 19:19:01 +0200 Subject: [PATCH] Worker: more granular locking when flushing upstream buffer Only lock the database mutex when actual queries are performed, but not during the entire flush loop. --- internal/worker/upstream_buffer.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 7f29c4b6..37f4ba5b 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -227,15 +227,15 @@ func (ub *UpstreamBufferDB) QueueSize() (int, error) { } func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { - ub.dbMutex.Lock() - defer ub.dbMutex.Unlock() - if ub.db == nil { log.Panic().Msg("no database opened, unable to queue task updates") } // See if we need to flush at all. + ub.dbMutex.Lock() queueSize, err := ub.queueSize() + ub.dbMutex.Unlock() + switch { case err != nil: return fmt.Errorf("unable to determine queue size: %w", err) @@ -247,7 +247,9 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { // Keep flushing until the queue is empty or there is an error. var done bool for !done { + ub.dbMutex.Lock() done, err = ub.flushFirstItem(ctx) + ub.dbMutex.Unlock() if err != nil { return err