Manager: run some operations in a background context

Run some API operations in a background context. This should prevent some
of the SQLite "interrupted" errors, as those can occur when the context
closes while a query is running.

The API operations that Workers use are now mostly running in a separate
background context, at least from the moment onward when they can run
independently of the Worker connection.
This commit is contained in:
Sybren A. Stüvel 2022-07-18 16:26:06 +02:00
parent 778ad6927b
commit 0697f71b62
4 changed files with 68 additions and 40 deletions

View File

@ -0,0 +1,17 @@
package api_impl
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"time"
)
const bgContextTimeout = 10 * time.Second
// bgContext returns a background context for background processing. This
// context MUST be used when a database query is meant to be independent of any
// API call that triggered it.
func bgContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), bgContextTimeout)
}

View File

@ -67,9 +67,12 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
"task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus) "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus)
} }
taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) bgCtx, bgCtxCancel := bgContext()
workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) defer bgCtxCancel()
workerSeenErr := f.workerSeen(ctx, logger, worker)
taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate)
workerUpdateErr := f.workerPingedTask(logger, dbTask)
workerSeenErr := f.workerSeen(logger, worker)
if taskUpdateErr != nil { if taskUpdateErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)

View File

@ -9,7 +9,6 @@ import (
"io" "io"
"net/http" "net/http"
"strings" "strings"
"time"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -130,7 +129,7 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
return nil, "", err return nil, "", err
} }
err = f.workerSeen(ctx, logger, w) err = f.workerSeen(logger, w)
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
@ -141,13 +140,6 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
func (f *Flamenco) SignOff(e echo.Context) error { func (f *Flamenco) SignOff(e echo.Context) error {
logger := requestLogger(e) logger := requestLogger(e)
var req api.SignOnJSONBody
err := e.Bind(&req)
if err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
logger.Info().Msg("worker signing off") logger.Info().Msg("worker signing off")
w := requestWorkerOrPanic(e) w := requestWorkerOrPanic(e)
prevStatus := w.Status prevStatus := w.Status
@ -158,10 +150,10 @@ func (f *Flamenco) SignOff(e echo.Context) error {
// Pass a generic background context, as these changes should be stored even // Pass a generic background context, as these changes should be stored even
// when the HTTP connection is aborted. // when the HTTP connection is aborted.
ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second) bgCtx, bgCtxCancel := bgContext()
defer ctxCancel() defer bgCtxCancel()
err = f.persist.SaveWorkerStatus(ctx, w) err := f.persist.SaveWorkerStatus(bgCtx, w)
if err != nil { if err != nil {
logger.Warn(). logger.Warn().
Err(err). Err(err).
@ -171,10 +163,10 @@ func (f *Flamenco) SignOff(e echo.Context) error {
} }
// Ignore database errors here; the rest of the signoff process should just happen. // Ignore database errors here; the rest of the signoff process should just happen.
_ = f.workerSeen(ctx, logger, w) _ = f.workerSeen(logger, w)
// Re-queue all tasks (should be only one) this worker is now working on. // Re-queue all tasks (should be only one) this worker is now working on.
err = f.stateMachine.RequeueActiveTasksOfWorker(ctx, w, "worker signed off") err = f.stateMachine.RequeueActiveTasksOfWorker(bgCtx, w, "worker signed off")
if err != nil { if err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks")
} }
@ -230,8 +222,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
w.StatusChangeClear() w.StatusChangeClear()
} }
ctx := e.Request().Context() bgCtx, bgCtxCancel := bgContext()
err = f.persist.SaveWorkerStatus(ctx, w) defer bgCtxCancel()
err = f.persist.SaveWorkerStatus(bgCtx, w)
if err != nil { if err != nil {
logger.Warn().Err(err). logger.Warn().Err(err).
Str("newStatus", string(w.Status)). Str("newStatus", string(w.Status)).
@ -239,7 +233,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
} }
if err := f.workerSeen(ctx, logger, w); err != nil { if err := f.workerSeen(logger, w); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database") return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database")
} }
@ -253,7 +247,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
func (f *Flamenco) ScheduleTask(e echo.Context) error { func (f *Flamenco) ScheduleTask(e echo.Context) error {
logger := requestLogger(e) logger := requestLogger(e)
worker := requestWorkerOrPanic(e) worker := requestWorkerOrPanic(e)
ctx := e.Request().Context() reqCtx := e.Request().Context()
logger.Debug().Msg("worker requesting task") logger.Debug().Msg("worker requesting task")
f.taskSchedulerMutex.Lock() f.taskSchedulerMutex.Lock()
@ -262,7 +256,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
// The worker is actively asking for a task, so note that it was seen // The worker is actively asking for a task, so note that it was seen
// regardless of any failures below, or whether there actually is a task to // regardless of any failures below, or whether there actually is a task to
// run. // run.
if err := f.workerSeen(ctx, logger, worker); err != nil { if err := f.workerSeen(logger, worker); err != nil {
return sendAPIError(e, http.StatusInternalServerError, return sendAPIError(e, http.StatusInternalServerError,
"error storing worker 'last seen' timestamp in database") "error storing worker 'last seen' timestamp in database")
} }
@ -289,7 +283,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
} }
// Get a task to execute: // Get a task to execute:
dbTask, err := f.persist.ScheduleTask(e.Request().Context(), worker) dbTask, err := f.persist.ScheduleTask(reqCtx, worker)
if err != nil { if err != nil {
if persistence.ErrIsDBBusy(err) { if persistence.ErrIsDBBusy(err) {
logger.Warn().Msg("database busy scheduling task for worker") logger.Warn().Msg("database busy scheduling task for worker")
@ -302,6 +296,11 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
return e.NoContent(http.StatusNoContent) return e.NoContent(http.StatusNoContent)
} }
// The task is assigned to the Worker now. Even when it disconnects, the
// processing of the task should continue.
bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
// Add a note to the task log about the worker assignment. // Add a note to the task log about the worker assignment.
msg := fmt.Sprintf("Task assigned to worker %s (%s)", worker.Name, worker.UUID) msg := fmt.Sprintf("Task assigned to worker %s (%s)", worker.Name, worker.UUID)
if err := f.logStorage.WriteTimestamped(logger, dbTask.Job.UUID, dbTask.UUID, msg); err != nil { if err := f.logStorage.WriteTimestamped(logger, dbTask.Job.UUID, dbTask.UUID, msg); err != nil {
@ -310,12 +309,12 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error {
// Move the task to 'active' status so that it won't be assigned to another // Move the task to 'active' status so that it won't be assigned to another
// worker. This also enables the task timeout monitoring. // worker. This also enables the task timeout monitoring.
if err := f.stateMachine.TaskStatusChange(ctx, dbTask, api.TaskStatusActive); err != nil { if err := f.stateMachine.TaskStatusChange(bgCtx, dbTask, api.TaskStatusActive); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "internal error marking task as active: %v", err) return sendAPIError(e, http.StatusInternalServerError, "internal error marking task as active: %v", err)
} }
// Start timeout measurement as soon as the Worker gets the task assigned. // Start timeout measurement as soon as the Worker gets the task assigned.
if err := f.workerPingedTask(ctx, logger, dbTask); err != nil { if err := f.workerPingedTask(logger, dbTask); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err) return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err)
} }
@ -353,7 +352,7 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
Int64("imageSizeBytes", filesize). Int64("imageSizeBytes", filesize).
Logger() Logger()
err := f.workerSeen(ctx, logger, worker) err := f.workerSeen(logger, worker)
if err != nil { if err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error updating 'last seen' timestamp of worker: %v", err) return sendAPIError(e, http.StatusInternalServerError, "error updating 'last seen' timestamp of worker: %v", err)
} }
@ -439,11 +438,13 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error {
} }
func (f *Flamenco) workerPingedTask( func (f *Flamenco) workerPingedTask(
ctx context.Context,
logger zerolog.Logger, logger zerolog.Logger,
task *persistence.Task, task *persistence.Task,
) error { ) error {
err := f.persist.TaskTouchedByWorker(ctx, task) bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
err := f.persist.TaskTouchedByWorker(bgCtx, task)
if err != nil { if err != nil {
logger.Error().Err(err).Msg("error marking task as 'touched' by worker") logger.Error().Err(err).Msg("error marking task as 'touched' by worker")
return err return err
@ -453,11 +454,13 @@ func (f *Flamenco) workerPingedTask(
// workerSeen marks the worker as 'seen' and logs any database error that may occur. // workerSeen marks the worker as 'seen' and logs any database error that may occur.
func (f *Flamenco) workerSeen( func (f *Flamenco) workerSeen(
ctx context.Context,
logger zerolog.Logger, logger zerolog.Logger,
w *persistence.Worker, w *persistence.Worker,
) error { ) error {
err := f.persist.WorkerSeen(ctx, w) bgCtx, bgCtxCancel := bgContext()
defer bgCtxCancel()
err := f.persist.WorkerSeen(bgCtx, w)
if err != nil { if err != nil {
logger.Error().Err(err).Msg("error marking Worker as 'seen' in the database") logger.Error().Err(err).Msg("error marking Worker as 'seen' in the database")
return err return err
@ -499,9 +502,9 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
// Errors saving the "worker pinged task" and "worker seen" fields in the // Errors saving the "worker pinged task" and "worker seen" fields in the
// database are just logged. It's not something to bother the worker with. // database are just logged. It's not something to bother the worker with.
if mkr.MayKeepRunning { if mkr.MayKeepRunning {
_ = f.workerPingedTask(ctx, logger, dbTask) _ = f.workerPingedTask(logger, dbTask)
} }
_ = f.workerSeen(ctx, logger, worker) _ = f.workerSeen(logger, worker)
return e.JSON(http.StatusOK, mkr) return e.JSON(http.StatusOK, mkr)
} }

View File

@ -38,14 +38,15 @@ func TestTaskScheduleHappy(t *testing.T) {
} }
ctx := echo.Request().Context() ctx := echo.Request().Context()
bgCtx := gomock.Not(ctx)
mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil) mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil)
mf.persistence.EXPECT().TaskTouchedByWorker(ctx, &task) mf.persistence.EXPECT().TaskTouchedByWorker(bgCtx, &task)
mf.persistence.EXPECT().WorkerSeen(ctx, &worker) mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID, mf.logStorage.EXPECT().WriteTimestamped(bgCtx, job.UUID, task.UUID,
"Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)") "Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)")
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &task, api.TaskStatusActive) mf.stateMachine.EXPECT().TaskStatusChange(bgCtx, &task, api.TaskStatusActive)
err := mf.flamenco.ScheduleTask(echo) err := mf.flamenco.ScheduleTask(echo)
assert.NoError(t, err) assert.NoError(t, err)
@ -73,11 +74,12 @@ func TestTaskScheduleNoTaskAvailable(t *testing.T) {
// Expect a call into the persistence layer, which should return nil. // Expect a call into the persistence layer, which should return nil.
ctx := echo.Request().Context() ctx := echo.Request().Context()
bgCtx := gomock.Not(ctx)
mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(nil, nil) mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(nil, nil)
// This call should still trigger a "worker seen" call, as the worker is // This call should still trigger a "worker seen" call, as the worker is
// actively asking for tasks. // actively asking for tasks.
mf.persistence.EXPECT().WorkerSeen(ctx, &worker) mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
err := mf.flamenco.ScheduleTask(echo) err := mf.flamenco.ScheduleTask(echo)
assert.NoError(t, err) assert.NoError(t, err)
@ -97,7 +99,8 @@ func TestTaskScheduleNonActiveStatus(t *testing.T) {
// The worker should be marked as 'seen', even when it's in a state that // The worker should be marked as 'seen', even when it's in a state that
// doesn't allow task execution. // doesn't allow task execution.
mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) bgCtx := gomock.Not(echoCtx.Request().Context())
mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
err := mf.flamenco.ScheduleTask(echoCtx) err := mf.flamenco.ScheduleTask(echoCtx)
assert.NoError(t, err) assert.NoError(t, err)
@ -119,7 +122,8 @@ func TestTaskScheduleOtherStatusRequested(t *testing.T) {
// The worker should be marked as 'seen', even when it's in a state that // The worker should be marked as 'seen', even when it's in a state that
// doesn't allow task execution. // doesn't allow task execution.
mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) bgCtx := gomock.Not(echoCtx.Request().Context())
mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
err := mf.flamenco.ScheduleTask(echoCtx) err := mf.flamenco.ScheduleTask(echoCtx)
assert.NoError(t, err) assert.NoError(t, err)
@ -145,7 +149,8 @@ func TestTaskScheduleOtherStatusRequestedAndBadState(t *testing.T) {
// The worker should be marked as 'seen', even when it's in a state that // The worker should be marked as 'seen', even when it's in a state that
// doesn't allow task execution. // doesn't allow task execution.
mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) bgCtx := gomock.Not(echoCtx.Request().Context())
mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker)
err := mf.flamenco.ScheduleTask(echoCtx) err := mf.flamenco.ScheduleTask(echoCtx)
assert.NoError(t, err) assert.NoError(t, err)