From 0697f71b629c15705d63bbb7d5153488ee3716da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 18 Jul 2022 16:26:06 +0200 Subject: [PATCH] Manager: run some operations in a background context Run some API operations in a background context. This should prevent some of the SQLite "interrupted" errors, as those can occur when the context closes while a query is running. The API operations that Workers use are now mostly running in a separate background context, at least from the moment onward when they can run independently of the Worker connection. --- internal/manager/api_impl/background.go | 17 ++++++ .../manager/api_impl/worker_task_updates.go | 9 ++- internal/manager/api_impl/workers.go | 61 ++++++++++--------- internal/manager/api_impl/workers_test.go | 21 ++++--- 4 files changed, 68 insertions(+), 40 deletions(-) create mode 100644 internal/manager/api_impl/background.go diff --git a/internal/manager/api_impl/background.go b/internal/manager/api_impl/background.go new file mode 100644 index 00000000..3d0e6706 --- /dev/null +++ b/internal/manager/api_impl/background.go @@ -0,0 +1,17 @@ +package api_impl + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "time" +) + +const bgContextTimeout = 10 * time.Second + +// bgContext returns a background context for background processing. This +// context MUST be used when a database query is meant to be independent of any +// API call that triggered it. +func bgContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), bgContextTimeout) +} diff --git a/internal/manager/api_impl/worker_task_updates.go b/internal/manager/api_impl/worker_task_updates.go index f6f13eb9..f6767270 100644 --- a/internal/manager/api_impl/worker_task_updates.go +++ b/internal/manager/api_impl/worker_task_updates.go @@ -67,9 +67,12 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error { "task status %s not allowed to be sent by Worker", *taskUpdate.TaskStatus) } - taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate) - workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask) - workerSeenErr := f.workerSeen(ctx, logger, worker) + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + taskUpdateErr := f.doTaskUpdate(bgCtx, logger, worker, dbTask, taskUpdate) + workerUpdateErr := f.workerPingedTask(logger, dbTask) + workerSeenErr := f.workerSeen(logger, worker) if taskUpdateErr != nil { return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 64be08a9..3f704f3b 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "strings" - "time" "github.com/labstack/echo/v4" "github.com/rs/zerolog" @@ -130,7 +129,7 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON return nil, "", err } - err = f.workerSeen(ctx, logger, w) + err = f.workerSeen(logger, w) if err != nil { return nil, "", err } @@ -141,13 +140,6 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON func (f *Flamenco) SignOff(e echo.Context) error { logger := requestLogger(e) - var req api.SignOnJSONBody - err := e.Bind(&req) - if err != nil { - logger.Warn().Err(err).Msg("bad request received") - return sendAPIError(e, http.StatusBadRequest, "invalid format") - } - logger.Info().Msg("worker signing off") w := requestWorkerOrPanic(e) prevStatus := w.Status @@ -158,10 +150,10 @@ func (f *Flamenco) SignOff(e echo.Context) error { // Pass a generic background context, as these changes should be stored even // when the HTTP connection is aborted. - ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer ctxCancel() + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() - err = f.persist.SaveWorkerStatus(ctx, w) + err := f.persist.SaveWorkerStatus(bgCtx, w) if err != nil { logger.Warn(). Err(err). @@ -171,10 +163,10 @@ func (f *Flamenco) SignOff(e echo.Context) error { } // Ignore database errors here; the rest of the signoff process should just happen. - _ = f.workerSeen(ctx, logger, w) + _ = f.workerSeen(logger, w) // Re-queue all tasks (should be only one) this worker is now working on. - err = f.stateMachine.RequeueActiveTasksOfWorker(ctx, w, "worker signed off") + err = f.stateMachine.RequeueActiveTasksOfWorker(bgCtx, w, "worker signed off") if err != nil { return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") } @@ -230,8 +222,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { w.StatusChangeClear() } - ctx := e.Request().Context() - err = f.persist.SaveWorkerStatus(ctx, w) + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + err = f.persist.SaveWorkerStatus(bgCtx, w) if err != nil { logger.Warn().Err(err). Str("newStatus", string(w.Status)). @@ -239,7 +233,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") } - if err := f.workerSeen(ctx, logger, w); err != nil { + if err := f.workerSeen(logger, w); err != nil { return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database") } @@ -253,7 +247,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { func (f *Flamenco) ScheduleTask(e echo.Context) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) - ctx := e.Request().Context() + reqCtx := e.Request().Context() logger.Debug().Msg("worker requesting task") f.taskSchedulerMutex.Lock() @@ -262,7 +256,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { // The worker is actively asking for a task, so note that it was seen // regardless of any failures below, or whether there actually is a task to // run. - if err := f.workerSeen(ctx, logger, worker); err != nil { + if err := f.workerSeen(logger, worker); err != nil { return sendAPIError(e, http.StatusInternalServerError, "error storing worker 'last seen' timestamp in database") } @@ -289,7 +283,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { } // Get a task to execute: - dbTask, err := f.persist.ScheduleTask(e.Request().Context(), worker) + dbTask, err := f.persist.ScheduleTask(reqCtx, worker) if err != nil { if persistence.ErrIsDBBusy(err) { logger.Warn().Msg("database busy scheduling task for worker") @@ -302,6 +296,11 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { return e.NoContent(http.StatusNoContent) } + // The task is assigned to the Worker now. Even when it disconnects, the + // processing of the task should continue. + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + // Add a note to the task log about the worker assignment. msg := fmt.Sprintf("Task assigned to worker %s (%s)", worker.Name, worker.UUID) if err := f.logStorage.WriteTimestamped(logger, dbTask.Job.UUID, dbTask.UUID, msg); err != nil { @@ -310,12 +309,12 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { // Move the task to 'active' status so that it won't be assigned to another // worker. This also enables the task timeout monitoring. - if err := f.stateMachine.TaskStatusChange(ctx, dbTask, api.TaskStatusActive); err != nil { + if err := f.stateMachine.TaskStatusChange(bgCtx, dbTask, api.TaskStatusActive); err != nil { return sendAPIError(e, http.StatusInternalServerError, "internal error marking task as active: %v", err) } // Start timeout measurement as soon as the Worker gets the task assigned. - if err := f.workerPingedTask(ctx, logger, dbTask); err != nil { + if err := f.workerPingedTask(logger, dbTask); err != nil { return sendAPIError(e, http.StatusInternalServerError, "internal error updating task for timeout calculation: %v", err) } @@ -353,7 +352,7 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { Int64("imageSizeBytes", filesize). Logger() - err := f.workerSeen(ctx, logger, worker) + err := f.workerSeen(logger, worker) if err != nil { return sendAPIError(e, http.StatusInternalServerError, "error updating 'last seen' timestamp of worker: %v", err) } @@ -439,11 +438,13 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { } func (f *Flamenco) workerPingedTask( - ctx context.Context, logger zerolog.Logger, task *persistence.Task, ) error { - err := f.persist.TaskTouchedByWorker(ctx, task) + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + err := f.persist.TaskTouchedByWorker(bgCtx, task) if err != nil { logger.Error().Err(err).Msg("error marking task as 'touched' by worker") return err @@ -453,11 +454,13 @@ func (f *Flamenco) workerPingedTask( // workerSeen marks the worker as 'seen' and logs any database error that may occur. func (f *Flamenco) workerSeen( - ctx context.Context, logger zerolog.Logger, w *persistence.Worker, ) error { - err := f.persist.WorkerSeen(ctx, w) + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + err := f.persist.WorkerSeen(bgCtx, w) if err != nil { logger.Error().Err(err).Msg("error marking Worker as 'seen' in the database") return err @@ -499,9 +502,9 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { // Errors saving the "worker pinged task" and "worker seen" fields in the // database are just logged. It's not something to bother the worker with. if mkr.MayKeepRunning { - _ = f.workerPingedTask(ctx, logger, dbTask) + _ = f.workerPingedTask(logger, dbTask) } - _ = f.workerSeen(ctx, logger, worker) + _ = f.workerSeen(logger, worker) return e.JSON(http.StatusOK, mkr) } diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 1401abe6..d20c5c5b 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -38,14 +38,15 @@ func TestTaskScheduleHappy(t *testing.T) { } ctx := echo.Request().Context() + bgCtx := gomock.Not(ctx) mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(&task, nil) - mf.persistence.EXPECT().TaskTouchedByWorker(ctx, &task) - mf.persistence.EXPECT().WorkerSeen(ctx, &worker) + mf.persistence.EXPECT().TaskTouchedByWorker(bgCtx, &task) + mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) - mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, task.UUID, + mf.logStorage.EXPECT().WriteTimestamped(bgCtx, job.UUID, task.UUID, "Task assigned to worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c)") - mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &task, api.TaskStatusActive) + mf.stateMachine.EXPECT().TaskStatusChange(bgCtx, &task, api.TaskStatusActive) err := mf.flamenco.ScheduleTask(echo) assert.NoError(t, err) @@ -73,11 +74,12 @@ func TestTaskScheduleNoTaskAvailable(t *testing.T) { // Expect a call into the persistence layer, which should return nil. ctx := echo.Request().Context() + bgCtx := gomock.Not(ctx) mf.persistence.EXPECT().ScheduleTask(ctx, &worker).Return(nil, nil) // This call should still trigger a "worker seen" call, as the worker is // actively asking for tasks. - mf.persistence.EXPECT().WorkerSeen(ctx, &worker) + mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) err := mf.flamenco.ScheduleTask(echo) assert.NoError(t, err) @@ -97,7 +99,8 @@ func TestTaskScheduleNonActiveStatus(t *testing.T) { // The worker should be marked as 'seen', even when it's in a state that // doesn't allow task execution. - mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) + bgCtx := gomock.Not(echoCtx.Request().Context()) + mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) err := mf.flamenco.ScheduleTask(echoCtx) assert.NoError(t, err) @@ -119,7 +122,8 @@ func TestTaskScheduleOtherStatusRequested(t *testing.T) { // The worker should be marked as 'seen', even when it's in a state that // doesn't allow task execution. - mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) + bgCtx := gomock.Not(echoCtx.Request().Context()) + mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) err := mf.flamenco.ScheduleTask(echoCtx) assert.NoError(t, err) @@ -145,7 +149,8 @@ func TestTaskScheduleOtherStatusRequestedAndBadState(t *testing.T) { // The worker should be marked as 'seen', even when it's in a state that // doesn't allow task execution. - mf.persistence.EXPECT().WorkerSeen(echoCtx.Request().Context(), &worker) + bgCtx := gomock.Not(echoCtx.Request().Context()) + mf.persistence.EXPECT().WorkerSeen(bgCtx, &worker) err := mf.flamenco.ScheduleTask(echoCtx) assert.NoError(t, err)