diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 38bf1f56..7f29c4b6 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -258,7 +258,6 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { } func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err error) { - startTime := time.Now() dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), databaseContextTimeout) defer dbCtxCancel() @@ -275,8 +274,6 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err var taskID string var blob []byte - beforeQuery := time.Now() - err = tx.QueryRowContext(dbCtx, stmt).Scan(&rowID, &taskID, &blob) switch { case err == sql.ErrNoRows: @@ -288,7 +285,6 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err } logger := log.With().Str("task", taskID).Logger() - beforeUnmarshal := time.Now() var update api.TaskUpdateJSONRequestBody if err := json.Unmarshal(blob, &update); err != nil { @@ -301,7 +297,6 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err } return false, tx.Commit() } - beforeAPICall := time.Now() // actually attempt delivery. resp, err := ub.client.TaskUpdateWithResponse(ctx, taskID, update) @@ -310,8 +305,6 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err return true, err } - afterAPICall := time.Now() - // Regardless of the response, there is little else to do but to discard the // update from the queue. switch resp.StatusCode() { @@ -326,28 +319,10 @@ func (ub *UpstreamBufferDB) flushFirstItem(ctx context.Context) (done bool, err Msg("queued task update discarded by Manager, unknown reason") } - beforeDiscard := time.Now() - if err := ub.discardRow(tx, rowID); err != nil { return false, err } - - beforeCommit := time.Now() - err = tx.Commit() - - finalTime := time.Now() - - log.Debug(). - Stringer("prepare", beforeQuery.Sub(startTime)). - Stringer("query", beforeUnmarshal.Sub(beforeQuery)). - Stringer("unmarshal", beforeAPICall.Sub(beforeUnmarshal)). - Stringer("api", afterAPICall.Sub(beforeAPICall)). - Stringer("discard", beforeCommit.Sub(beforeDiscard)). - Stringer("commit", finalTime.Sub(beforeCommit)). - Stringer("total", finalTime.Sub(startTime)). - Msg("single flush") - - return false, err + return false, tx.Commit() } func (ub *UpstreamBufferDB) discardRow(tx *sql.Tx, rowID int64) error {