Worker: flush upstream buffer when shutting down
When shutting down, the worker now tries to flush any buffered task updates before closing.
This commit is contained in:
parent
9ddf72fa37
commit
e1309ad8fc
@ -43,6 +43,7 @@ type UpstreamBufferDB struct {
|
||||
|
||||
const defaultUpstreamFlushInterval = 30 * time.Second
|
||||
const databaseContextTimeout = 10 * time.Second
|
||||
const flushOnShutdownTimeout = 5 * time.Second
|
||||
|
||||
var _ UpstreamBuffer = (*UpstreamBufferDB)(nil)
|
||||
|
||||
@ -124,7 +125,7 @@ func (ub *UpstreamBufferDB) SendTaskUpdate(ctx context.Context, taskID string, u
|
||||
}
|
||||
}
|
||||
|
||||
// Close releases the database. It does not try to flush any pending items.
|
||||
// Close performs one final flush, then releases the database.
|
||||
func (ub *UpstreamBufferDB) Close() error {
|
||||
if ub.db == nil {
|
||||
return nil
|
||||
@ -134,6 +135,14 @@ func (ub *UpstreamBufferDB) Close() error {
|
||||
close(ub.done)
|
||||
ub.wg.Wait()
|
||||
|
||||
// Attempt one final flush, if it's fast enough:
|
||||
log.Info().Msg("upstream buffer shutting down, doing one final flush")
|
||||
flushCtx, ctxCancel := context.WithTimeout(context.Background(), flushOnShutdownTimeout)
|
||||
defer ctxCancel()
|
||||
if err := ub.Flush(flushCtx); err != nil {
|
||||
log.Warn().Err(err).Msg("error flushing upstream buffer at shutdown")
|
||||
}
|
||||
|
||||
// Close the database.
|
||||
return ub.db.Close()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user