Worker: add -flush CLI option to just flush the buffer and quit

Add `-flush` CLI option to just flush the upstream queue and then quit.
This commit is contained in:
Sybren A. Stüvel 2022-04-21 19:03:08 +02:00
parent 55d264632f
commit 1176d85496
2 changed files with 59 additions and 9 deletions

View File

@ -31,8 +31,11 @@ var (
) )
var cliArgs struct { var cliArgs struct {
// Do-and-quit flags.
version bool version bool
flush bool
// Logging level flags.
quiet, debug, trace bool quiet, debug, trace bool
managerURL *url.URL managerURL *url.URL
@ -94,8 +97,20 @@ func main() {
timeService := clock.New() timeService := clock.New()
buffer = upstreamBufferOrDie(client, timeService) buffer = upstreamBufferOrDie(client, timeService)
// Flush any updates before actually starting the Worker. if queueSize, err := buffer.QueueSize(); err != nil {
buffer.Flush(workerCtx) log.Fatal().Err(err).Msg("error checking upstream buffer")
} else if queueSize > 0 {
// Flush any updates before actually starting the Worker.
log.Info().Int("queueSize", queueSize).Msg("flushing upstream buffer")
buffer.Flush(workerCtx)
}
if cliArgs.flush {
log.Info().Msg("upstream buffer flushed, shutting down")
workerCtxCancel()
shutdown()
return
}
cliRunner := worker.NewCLIRunner() cliRunner := worker.NewCLIRunner()
listener = worker.NewListener(client, buffer) listener = worker.NewListener(client, buffer)
@ -109,9 +124,11 @@ func main() {
signal.Notify(c, syscall.SIGTERM) signal.Notify(c, syscall.SIGTERM)
go func() { go func() {
for signum := range c { for signum := range c {
workerCtxCancel() log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.")
// Run the shutdown sequence in a goroutine, so that multiple Ctrl+C presses can be handled in parallel. // Run the shutdown sequence in a goroutine, so that multiple Ctrl+C presses can be handled in parallel.
go shutdown(signum) workerCtxCancel()
go shutdown()
} }
}() }()
@ -123,11 +140,9 @@ func main() {
log.Debug().Msg("process shutting down") log.Debug().Msg("process shutting down")
} }
func shutdown(signum os.Signal) { func shutdown() {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.")
if w != nil { if w != nil {
shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
defer cancelFunc() defer cancelFunc()
@ -155,6 +170,8 @@ func shutdown(signum os.Signal) {
func parseCliArgs() { func parseCliArgs() {
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.") flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
flag.BoolVar(&cliArgs.flush, "flush", false, "Flush any buffered task updates to the Manager, then exits.")
flag.BoolVar(&cliArgs.quiet, "quiet", false, "Only log warning-level and worse.") flag.BoolVar(&cliArgs.quiet, "quiet", false, "Only log warning-level and worse.")
flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.") flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.")
flag.BoolVar(&cliArgs.trace, "trace", false, "Enable trace-level logging.") flag.BoolVar(&cliArgs.trace, "trace", false, "Enable trace-level logging.")

View File

@ -220,6 +220,12 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(taskID string, update api.TaskUpdate
return nil return nil
} }
func (ub *UpstreamBufferDB) QueueSize() (int, error) {
ub.dbMutex.Lock()
defer ub.dbMutex.Unlock()
return ub.queueSize()
}
func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
ub.dbMutex.Lock() ub.dbMutex.Lock()
defer ub.dbMutex.Unlock() defer ub.dbMutex.Unlock()
@ -242,6 +248,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
var done bool var done bool
for !done { for !done {
done, err = ub.flushFirstItem(ctx) done, err = ub.flushFirstItem(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -251,6 +258,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error {
} }
func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) { func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) {
startTime := time.Now()
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout)
defer dbCtxCancel() defer dbCtxCancel()
@ -267,6 +275,8 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
var taskID string var taskID string
var blob []byte var blob []byte
beforeQuery := time.Now()
err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob) err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob)
switch { switch {
case err == sql.ErrNoRows: case err == sql.ErrNoRows:
@ -278,6 +288,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
} }
logger := log.With().Str("task", taskID).Logger() logger := log.With().Str("task", taskID).Logger()
beforeUnmarshal := time.Now()
var update api.TaskUpdateJSONRequestBody var update api.TaskUpdateJSONRequestBody
if err := json.Unmarshal(blob, &update); err != nil { if err := json.Unmarshal(blob, &update); err != nil {
@ -290,6 +301,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
} }
return false, tx.Commit() return false, tx.Commit()
} }
beforeAPICall := time.Now()
// actually attempt delivery. // actually attempt delivery.
resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update) resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update)
@ -298,6 +310,8 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
return true, err return true, err
} }
afterAPICall := time.Now()
// Regardless of the response, there is little else to do but to discard the // Regardless of the response, there is little else to do but to discard the
// update from the queue. // update from the queue.
switch resp.StatusCode() { switch resp.StatusCode() {
@ -312,10 +326,28 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err
Msg("queued task update discarded by Manager, unknown reason") Msg("queued task update discarded by Manager, unknown reason")
} }
beforeDiscard := time.Now()
if err := ub.discardRow(tx, rowID); err != nil { if err := ub.discardRow(tx, rowID); err != nil {
return false, err return false, err
} }
return false, tx.Commit()
beforeCommit := time.Now()
err = tx.Commit()
finalTime := time.Now()
log.Debug().
Stringer("prepare", beforeQuery.Sub(startTime)).
Stringer("query", beforeUnmarshal.Sub(beforeQuery)).
Stringer("unmarshal", beforeAPICall.Sub(beforeUnmarshal)).
Stringer("api", afterAPICall.Sub(beforeAPICall)).
Stringer("discard", beforeCommit.Sub(beforeDiscard)).
Stringer("commit", finalTime.Sub(beforeCommit)).
Stringer("total", finalTime.Sub(startTime)).
Msg("single flush")
return false, err
} }
func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error { func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error {
@ -355,6 +387,7 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() {
func rollbackTransaction(tx *sql.Tx) { func rollbackTransaction(tx *sql.Tx) {
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
log.Error().Err(err).Msg("rolling back transaction") // log.Error().Err(err).Msg("rolling back transaction")
log.Panic().Err(err).Msg("rolling back transaction")
} }
} }