Manager: keep track of when a Worker last worked on a task

This will be used for keeping track of stuck tasks.
This commit is contained in:
Sybren A. Stüvel 2022-06-03 16:33:50 +02:00
parent 0be1ca30dd
commit b4d2fc4231
8 changed files with 90 additions and 8 deletions

View File

@ -31,6 +31,8 @@ type PersistenceService interface {
SaveTask(ctx context.Context, task *persistence.Task) error
SaveTaskActivity(ctx context.Context, t *persistence.Task) error
FetchTasksOfWorkerInStatus(context.Context, *persistence.Worker, api.TaskStatus) ([]*persistence.Task, error)
// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection.
TaskTouchedByWorker(context.Context, *persistence.Task) error
CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)

View File

@ -244,6 +244,20 @@ func (mr *MockPersistenceServiceMockRecorder) StoreAuthoredJob(arg0, arg1 interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StoreAuthoredJob", reflect.TypeOf((*MockPersistenceService)(nil).StoreAuthoredJob), arg0, arg1)
}
// TaskTouchedByWorker mocks the base method of the same name. The call is
// forwarded to the gomock controller, and the first recorded return value is
// handed back as the error result.
func (m *MockPersistenceService) TaskTouchedByWorker(arg0 context.Context, arg1 *persistence.Task) error {
	m.ctrl.T.Helper()
	results := m.ctrl.Call(m, "TaskTouchedByWorker", arg0, arg1)
	// If the first result is not an error value, the type assertion leaves a
	// nil error.
	err, _ := results[0].(error)
	return err
}
// TaskTouchedByWorker indicates an expected call of TaskTouchedByWorker, and
// returns the recorded gomock.Call.
func (mr *MockPersistenceServiceMockRecorder) TaskTouchedByWorker(arg0, arg1 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	// The method's type is passed along with the recorded call.
	methodType := reflect.TypeOf((*MockPersistenceService)(nil).TaskTouchedByWorker)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskTouchedByWorker", methodType, arg0, arg1)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller

View File

@ -385,8 +385,14 @@ func (f *Flamenco) TaskUpdate(e echo.Context, taskID string) error {
// TODO: check whether this task may undergo the requested status change.
if err := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate); err != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle status update: %v", err)
taskUpdateErr := f.doTaskUpdate(ctx, logger, worker, dbTask, taskUpdate)
workerUpdateErr := f.workerPingedTask(ctx, logger, dbTask)
if taskUpdateErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle task update: %v", taskUpdateErr)
}
if workerUpdateErr != nil {
return sendAPIError(e, http.StatusInternalServerError, "unable to handle worker update: %v", workerUpdateErr)
}
return e.NoContent(http.StatusNoContent)
@ -437,6 +443,19 @@ func (f *Flamenco) doTaskUpdate(
return dbErrActivity
}
// workerPingedTask asks the persistence layer to mark the given task as
// 'touched' by a worker.
//
// A failure is logged here and also returned, so a caller that does not want
// to act on the failure can discard the return value without losing the log
// entry.
func (f *Flamenco) workerPingedTask(
	ctx context.Context,
	logger zerolog.Logger,
	task *persistence.Task,
) error {
	if touchErr := f.persist.TaskTouchedByWorker(ctx, task); touchErr != nil {
		logger.Error().Err(touchErr).Msg("error marking task as 'touched' by worker")
		return touchErr
	}
	return nil
}
// taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO.
func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) {
// Errors writing the log to file should be logged in our own logging
@ -484,7 +503,9 @@ func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
mkr := mayWorkerRun(worker, dbTask)
if mkr.MayKeepRunning {
// TODO: record that this worker "touched" this task, for timeout calculations.
// Errors saving the "worker pinged task" field in the database are just
// logged. It's not something to bother the worker with.
_ = f.workerPingedTask(ctx, logger, dbTask)
}
return e.JSON(http.StatusOK, mkr)

View File

@ -389,6 +389,14 @@ func TestTaskUpdate(t *testing.T) {
Log: "line1\nline2\n",
})
// Expect a 'touch' of the task.
var touchedTask persistence.Task
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, task *persistence.Task) error {
touchedTask = *task
return nil
})
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
@ -398,6 +406,7 @@ func TestTaskUpdate(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, mockTask.UUID, statusChangedtask.UUID)
assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID)
assert.Equal(t, mockTask.UUID, touchedTask.UUID)
assert.Equal(t, "testing", statusChangedtask.Activity)
assert.Equal(t, "testing", actUpdatedTask.Activity)
}
@ -440,6 +449,9 @@ func TestMayWorkerRun(t *testing.T) {
// Test: happy, task assigned to this worker.
{
// Expect a 'touch' of the task.
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &task).Return(nil)
echo := prepareRequest()
task.WorkerID = &worker.ID
err := mf.flamenco.MayWorkerRun(echo, task.UUID)

View File

@ -7,6 +7,7 @@ import (
"database/sql/driver"
"encoding/json"
"errors"
"time"
"gorm.io/gorm"
@ -43,8 +44,9 @@ type Task struct {
Status api.TaskStatus `gorm:"type:varchar(16);default:''"`
// Which worker is/was working on this.
WorkerID *uint
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
WorkerID *uint
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
LastTouchedAt time.Time
// Dependencies are tasks that need to be completed before this one can run.
Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`
@ -385,3 +387,14 @@ func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
}
return nil
}
// TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection.
//
// The task's LastTouchedAt is set to the database's notion of "now"
// (db.gormDB.NowFunc()). Only the 'last_touched_at' column is selected for the
// update. Any database error is wrapped via taskError() and returned.
func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error {
	// Without the Select() call, Gorm will try and also store the records
	// associated with the task (like task.Job) in their own tables, which is
	// not what we want for a simple timestamp update.
	tx := db.gormDB.WithContext(ctx).
		Model(t).
		Select("last_touched_at").
		Updates(Task{LastTouchedAt: db.gormDB.NowFunc()})
	if err := tx.Error; err != nil {
		return taskError(err, "saving task 'last touched at'")
	}
	return nil
}

View File

@ -213,6 +213,25 @@ func TestFetchTasksOfWorkerInStatus(t *testing.T) {
assert.Empty(t, tasks, "worker should have no task in status %q", w)
}
// TestTaskTouchedByWorker checks that TaskTouchedByWorker() sets LastTouchedAt
// on the task instance passed to it as well as on the stored database record.
func TestTaskTouchedByWorker(t *testing.T) {
	ctx, cleanup, db, _, authoredJob := jobTasksTestFixtures(t)
	defer cleanup()

	// The fixture task is expected to start out with a zero LastTouchedAt.
	task, fetchErr := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
	assert.NoError(t, fetchErr)
	assert.True(t, task.LastTouchedAt.IsZero())

	// Sample the clock just before the call, to compare against afterwards.
	touchTime := db.gormDB.NowFunc()
	touchErr := db.TaskTouchedByWorker(ctx, task)
	assert.NoError(t, touchErr)

	// Test the task instance as well as the database entry.
	dbTask, refetchErr := db.FetchTask(ctx, task.UUID)
	assert.NoError(t, refetchErr)
	assert.WithinDuration(t, touchTime, task.LastTouchedAt, time.Second)
	assert.WithinDuration(t, touchTime, dbTask.LastTouchedAt, time.Second)
}
func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
task1 := job_compilers.AuthoredTask{
Name: "render-1-3",

View File

@ -141,7 +141,6 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
}
// assignTaskToWorker stores in the database that task t is assigned to worker
// w, and marks the task as 'touched' at the current time (tx.NowFunc()).
// It returns the error of the update query, if any.
func assignTaskToWorker(tx *gorm.DB, w *Worker, t *Task) error {
	// Without the Select() call, Gorm will try and also store task.Job in the
	// jobs database, which is not what we want. Both columns have to be
	// listed; a column missing from Select() would not be written.
	return tx.Model(t).
		Select("worker_id", "last_touched_at").
		Updates(Task{WorkerID: &w.ID, LastTouchedAt: tx.NowFunc()}).Error
}

View File

@ -51,6 +51,7 @@ func TestOneJobOneTask(t *testing.T) {
assert.Equal(t, w.ID, *task.WorkerID, "task must be assigned to the requesting worker")
// Check the task in the database.
now := db.gormDB.NowFunc()
dbTask, err := db.FetchTask(context.Background(), authTask.UUID)
assert.NoError(t, err)
if dbTask == nil {
@ -60,6 +61,7 @@ func TestOneJobOneTask(t *testing.T) {
t.Fatal("no worker assigned to task")
}
assert.Equal(t, w.ID, *dbTask.WorkerID, "task must be assigned to the requesting worker")
assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second, "task must be 'touched' by the worker after scheduling")
}
func TestOneJobThreeTasksByPrio(t *testing.T) {