diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 37f4ba5b..19f3952a 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -43,6 +43,7 @@ type UpstreamBufferDB struct { const defaultUpstreamFlushInterval = 30 * time.Second const databaseContextTimeout = 10 * time.Second +const flushOnShutdownTimeout = 5 * time.Second var _ UpstreamBuffer = (*UpstreamBufferDB)(nil) @@ -124,7 +125,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u } } -// Close releases the database. It does not try to flush any pending items. +// Close performs one final flush, then releases the database. func (ub *UpstreamBufferDB) Close() error { if ub.db == nil { return nil @@ -134,6 +135,14 @@ func (ub *UpstreamBufferDB) Close() error { close(ub.done) ub.wg.Wait() + // Attempt one final flush, if it's fast enough: + log.Info().Msg("upstream buffer shutting down, doing one final flush") + flushCtx, ctxCancel := context.WithTimeout(context.Background(), flushOnShutdownTimeout) + defer ctxCancel() + if err := ub.Flush(flushCtx); err != nil { + log.Warn().Err(err).Msg("error flushing upstream buffer at shutdown") + } + // Close the database. return ub.db.Close() }