From c5debdeb70848da7cc4f65e35d689a4d9b7761e8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?=
Date: Mon, 13 Jun 2022 16:51:19 +0200
Subject: [PATCH] Manager: add 'task failure list' to record workers failing
 tasks

The persistence layer can now store which worker failed which task, as
preparation for a blocklisting system. Such a system should be able to
determine whether there are still any workers left to do the work.
---
 internal/manager/persistence/db_migration.go |  2 +-
 internal/manager/persistence/jobs.go         | 48 ++++++++++++++++++++
 internal/manager/persistence/jobs_test.go    | 41 +++++++++++++++++
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go
index 997dea58..cafd3f5e 100644
--- a/internal/manager/persistence/db_migration.go
+++ b/internal/manager/persistence/db_migration.go
@@ -7,7 +7,7 @@ import (
 )
 
 func (db *DB) migrate() error {
-	err := db.gormDB.AutoMigrate(&Job{}, &Task{}, &Worker{})
+	err := db.gormDB.AutoMigrate(&Job{}, &Task{}, &TaskFailure{}, &Worker{})
 	if err != nil {
 		return fmt.Errorf("failed to automigrate database: %v", err)
 	}
diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go
index f02b2fe1..1fa9fc5f 100644
--- a/internal/manager/persistence/jobs.go
+++ b/internal/manager/persistence/jobs.go
@@ -7,9 +7,12 @@ import (
 	"database/sql/driver"
 	"encoding/json"
 	"errors"
+	"math"
 	"time"
 
+	"github.com/rs/zerolog/log"
 	"gorm.io/gorm"
+	"gorm.io/gorm/clause"
 
 	"git.blender.org/flamenco/internal/manager/job_compilers"
 	"git.blender.org/flamenco/pkg/api"
@@ -95,6 +98,17 @@ func (js *StringStringMap) Scan(value interface{}) error {
 	return json.Unmarshal(b, &js)
 }
 
+// TaskFailure keeps track of which Worker failed which Task.
+type TaskFailure struct {
+	// Don't include the standard Gorm ID, UpdatedAt, or DeletedAt fields, as they're useless here.
+	// Entries will never be updated, and should never be soft-deleted but just purged from existence.
+	CreatedAt time.Time
+	TaskID    uint    `gorm:"primaryKey;autoIncrement:false"`
+	Task      *Task   `gorm:"foreignkey:TaskID;references:ID;constraint:OnDelete:CASCADE"`
+	WorkerID  uint    `gorm:"primaryKey;autoIncrement:false"`
+	Worker    *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
+}
+
 // StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
 // The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
 func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
@@ -411,3 +425,37 @@ func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error {
 	}
 	return nil
 }
+
+// AddWorkerToTaskFailedList records that the given worker failed the given task.
+// This information is not used directly by the task scheduler. It's used to
+// determine whether there are any workers left to perform this task, and thus
+// whether it should be hard- or soft-failed.
+//
+// Calling this multiple times with the same task/worker is a no-op.
+//
+// Returns the new number of workers that failed this task.
+func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) {
+	entry := TaskFailure{
+		Task:   t,
+		Worker: w,
+	}
+	tx := db.gormDB.WithContext(ctx).
+		Clauses(clause.OnConflict{DoNothing: true}).
+		Create(&entry)
+	if tx.Error != nil {
+		return 0, tx.Error
+	}
+
+	var numFailed64 int64
+	tx = db.gormDB.WithContext(ctx).Model(&TaskFailure{}).
+		Where("task_id=?", t.ID).
+		Count(&numFailed64)
+
+	// Integer literals are of type `int`, so that's just a bit nicer to work with
+	// than `int64`.
+	if numFailed64 > math.MaxUint32 {
+		log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here")
+		return math.MaxUint32, tx.Error
+	}
+	return int(numFailed64), tx.Error
+}
diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go
index d8465dc5..2d48675e 100644
--- a/internal/manager/persistence/jobs_test.go
+++ b/internal/manager/persistence/jobs_test.go
@@ -263,6 +263,47 @@ func TestTaskTouchedByWorker(t *testing.T) {
 	assert.WithinDuration(t, now, dbTask.LastTouchedAt, time.Second)
 }
 
+func TestAddWorkerToTaskFailedList(t *testing.T) {
+	ctx, close, db, _, authoredJob := jobTasksTestFixtures(t)
+	defer close()
+
+	task, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
+	assert.NoError(t, err)
+
+	worker1 := createWorker(ctx, t, db)
+
+	// Create another worker, using the first as a template:
+	newWorker := *worker1
+	newWorker.ID = 0
+	newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85"
+	newWorker.Name = "Worker 2"
+	assert.NoError(t, db.SaveWorker(ctx, &newWorker))
+	worker2, err := db.FetchWorker(ctx, newWorker.UUID)
+	assert.NoError(t, err)
+
+	// First failure should be registered just fine.
+	numFailed, err := db.AddWorkerToTaskFailedList(ctx, task, worker1)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, numFailed)
+
+	// Calling again should be a no-op and not cause any errors.
+	numFailed, err = db.AddWorkerToTaskFailedList(ctx, task, worker1)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, numFailed)
+
+	// Another worker should be able to fail this task as well.
+	numFailed, err = db.AddWorkerToTaskFailedList(ctx, task, worker2)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, numFailed)
+
+	// Deleting the job should also cascade-delete its tasks' failures.
+	assert.NoError(t, db.DeleteJob(ctx, authoredJob.JobID))
+	var num int64
+	tx := db.gormDB.Model(&TaskFailure{}).Count(&num)
+	assert.NoError(t, tx.Error)
+	assert.Zero(t, num)
+}
+
 func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob {
 	task1 := job_compilers.AuthoredTask{
 		Name: "render-1-3",
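
Note, not part of the patch itself: below is a minimal sketch of how the future
blocklisting system mentioned in the commit message might consume the failure
count returned by AddWorkerToTaskFailedList. The maybeHardFail name, the
hard-fail rule, and the plain count of all workers are assumptions for
illustration; this commit only records the failures.

// Hypothetical sketch, assuming it lives in the persistence package alongside
// the code in this patch.
package persistence

import "context"

// maybeHardFail records the failure, then reports whether the task should be
// hard-failed because every known worker has already failed it.
func (db *DB) maybeHardFail(ctx context.Context, t *Task, w *Worker) (bool, error) {
	numFailed, err := db.AddWorkerToTaskFailedList(ctx, t, w)
	if err != nil {
		return false, err
	}

	// Assumption: counting all workers stands in for a real check, which
	// would presumably filter on worker status and supported task types.
	var numWorkers int64
	tx := db.gormDB.WithContext(ctx).Model(&Worker{}).Count(&numWorkers)
	if tx.Error != nil {
		return false, tx.Error
	}

	// Hard-fail once every worker has failed this task at least once;
	// otherwise the task can be soft-failed and requeued for other workers.
	return numWorkers > 0 && int64(numFailed) >= numWorkers, nil
}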