From 1176d854963cf42a939670b9eea2337de41b5343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 21 Apr 2022 19:03:08 +0200 Subject: [PATCH] Worker: add `-flush` CLI option to just flush the buffer and quit Add `-flush` CLI option to just flush the upstream queue and then quit. --- cmd/flamenco-worker/main.go | 31 +++++++++++++++++++------ internal/worker/upstream_buffer.go | 37 ++++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/cmd/flamenco-worker/main.go b/cmd/flamenco-worker/main.go index e7e4eb47..efc6e428 100644 --- a/cmd/flamenco-worker/main.go +++ b/cmd/flamenco-worker/main.go @@ -31,8 +31,11 @@ var ( ) var cliArgs struct { + // Do-and-quit flags. version bool + flush bool + // Logging level flags. quiet, debug, trace bool managerURL *url.URL @@ -94,8 +97,20 @@ func main() { timeService := clock.New() buffer = upstreamBufferOrDie(client, timeService) - // Flush any updates before actually starting the Worker. - buffer.Flush(workerCtx) + if queueSize, err := buffer.QueueSize(); err != nil { + log.Fatal().Err(err).Msg("error checking upstream buffer") + } else if queueSize > 0 { + // Flush any updates before actually starting the Worker. + log.Info().Int("queueSize", queueSize).Msg("flushing upstream buffer") + buffer.Flush(workerCtx) + } + + if cliArgs.flush { + log.Info().Msg("upstream buffer flushed, shutting down") + workerCtxCancel() + shutdown() + return + } cliRunner := worker.NewCLIRunner() listener = worker.NewListener(client, buffer) @@ -109,9 +124,11 @@ func main() { signal.Notify(c, syscall.SIGTERM) go func() { for signum := range c { - workerCtxCancel() + log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.") + // Run the shutdown sequence in a goroutine, so that multiple Ctrl+C presses can be handled in parallel. - go shutdown(signum) + workerCtxCancel() + go shutdown() } }() @@ -123,11 +140,9 @@ func main() { log.Debug().Msg("process shutting down") } -func shutdown(signum os.Signal) { +func shutdown() { done := make(chan struct{}) go func() { - log.Info().Str("signal", signum.String()).Msg("signal received, shutting down.") - if w != nil { shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) defer cancelFunc() @@ -155,6 +170,8 @@ func shutdown(signum os.Signal) { func parseCliArgs() { flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.") + flag.BoolVar(&cliArgs.flush, "flush", false, "Flush any buffered task updates to the Manager, then exits.") + flag.BoolVar(&cliArgs.quiet, "quiet", false, "Only log warning-level and worse.") flag.BoolVar(&cliArgs.debug, "debug", false, "Enable debug-level logging.") flag.BoolVar(&cliArgs.trace, "trace", false, "Enable trace-level logging.") diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index a30b13a8..38bf1f56 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -220,6 +220,12 @@ func (ub *UpstreamBufferDB) queueTaskUpdate(taskID string, update api.TaskUpdate return nil } +func (ub *UpstreamBufferDB) QueueSize() (int, error) { + ub.dbMutex.Lock() + defer ub.dbMutex.Unlock() + return ub.queueSize() +} + func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { ub.dbMutex.Lock() defer ub.dbMutex.Unlock() @@ -242,6 +248,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { var done bool for !done { done, err = ub.flushFirstItem(ctx) + if err != nil { return err } @@ -251,6 +258,7 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { } func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) { + startTime := time.Now() dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) defer dbCtxCancel() @@ -267,6 +275,8 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err var taskID string var blob []byte + beforeQuery := time.Now() + err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob) switch { case err == sql.ErrNoRows: @@ -278,6 +288,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err } logger := log.With().Str("task", taskID).Logger() + beforeUnmarshal := time.Now() var update api.TaskUpdateJSONRequestBody if err := json.Unmarshal(blob, &update); err != nil { @@ -290,6 +301,7 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err } return false, tx.Commit() } + beforeAPICall := time.Now() // actually attempt delivery. resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update) @@ -298,6 +310,8 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err return true, err } + afterAPICall := time.Now() + // Regardless of the response, there is little else to do but to discard the // update from the queue. switch resp.StatusCode() { @@ -312,10 +326,28 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err Msg("queued task update discarded by Manager, unknown reason") } + beforeDiscard := time.Now() + if err := ub.discardRow(tx, rowID); err != nil { return false, err } - return false, tx.Commit() + + beforeCommit := time.Now() + err = tx.Commit() + + finalTime := time.Now() + + log.Debug(). + Stringer("prepare", beforeQuery.Sub(startTime)). + Stringer("query", beforeUnmarshal.Sub(beforeQuery)). + Stringer("unmarshal", beforeAPICall.Sub(beforeUnmarshal)). + Stringer("api", afterAPICall.Sub(beforeAPICall)). + Stringer("discard", beforeCommit.Sub(beforeDiscard)). + Stringer("commit", finalTime.Sub(beforeCommit)). + Stringer("total", finalTime.Sub(startTime)). + Msg("single flush") + + return false, err } func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error { @@ -355,6 +387,7 @@ func (ub *UpstreamBufferDB) periodicFlushLoop() { func rollbackTransaction(tx *sql.Tx) { if err := tx.Rollback(); err != nil && err != sql.ErrTxDone { - log.Error().Err(err).Msg("rolling back transaction") + // log.Error().Err(err).Msg("rolling back transaction") + log.Panic().Err(err).Msg("rolling back transaction") } }