From e1309ad8fc04577c849ce8fcf11beff6f05b091f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 16 Jun 2022 12:21:17 +0200 Subject: [PATCH] Worker: flush upstream buffer when shutting down When shutting down, the worker now tries to flush any buffered task updates before closing. --- internal/worker/upstream_buffer.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 37f4ba5b..19f3952a 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -43,6 +43,7 @@ type UpstreamBufferDB struct { const defaultUpstreamFlushInterval = 30 * time.Second const databaseContextTimeout = 10 * time.Second +const flushOnShutdownTimeout = 5 * time.Second var _ UpstreamBuffer = (*UpstreamBufferDB)(nil) @@ -124,7 +125,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u } } -// Close releases the database. It does not try to flush any pending items. +// Close performs one final flush, then releases the database. func (ub *UpstreamBufferDB) Close() error { if ub.db == nil { return nil @@ -134,6 +135,14 @@ func (ub *UpstreamBufferDB) Close() error { close(ub.done) ub.wg.Wait() + // Attempt one final flush, if it's fast enough: + log.Info().Msg("upstream buffer shutting down, doing one final flush") + flushCtx, ctxCancel := context.WithTimeout(context.Background(), flushOnShutdownTimeout) + defer ctxCancel() + if err := ub.Flush(flushCtx); err != nil { + log.Warn().Err(err).Msg("error flushing upstream buffer at shutdown") + } + // Close the database. return ub.db.Close() }