Manager: add blocking of workers when they fail certain tasks too much

When a worker fails too many tasks of the same task type on the same job,
it gets blocked from running that task type on that job.
Sybren A. Stüvel 2022-06-17 15:03:15 +02:00
parent 56abc825a6
commit fd31a85bcd
10 changed files with 566 additions and 8 deletions
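
In outline, the new failure handling works like this: count how often the worker has already failed tasks of this type on this job, blocklist the worker once the configured BlocklistThreshold is reached, and hard-fail the whole job when no workers remain that may still run this task type. The sketch below condenses that flow from the handler code in the diff; the interface and the plain-string UUID parameters are simplified stand-ins for the real persistence types, so treat it as an illustration rather than the committed code.

package sketch

import (
	"context"
	"fmt"
)

// blocklistPersistence is a hypothetical subset of the Manager's
// PersistenceService, reduced to the calls the blocklisting decision needs.
type blocklistPersistence interface {
	CountTaskFailuresOfWorker(ctx context.Context, jobUUID, workerUUID, taskType string) (int, error)
	AddWorkerToJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error
	WorkersLeftToRun(ctx context.Context, jobUUID, taskType string) (map[string]bool, error)
}

// handleTaskFailure loosely mirrors maybeBlocklistWorker from the diff below.
// It reports whether the entire job must be hard-failed because no workers
// remain that may run this task type on this job. The real handler also
// excludes workers that already failed this particular task; that detail is
// omitted here.
func handleTaskFailure(
	ctx context.Context,
	db blocklistPersistence,
	jobUUID, workerUUID, taskType string,
	blocklistThreshold int,
) (bool, error) {
	numFailures, err := db.CountTaskFailuresOfWorker(ctx, jobUUID, workerUUID, taskType)
	if err != nil {
		return false, fmt.Errorf("counting failures: %w", err)
	}
	numFailures++ // the failure being handled has not been persisted yet

	if numFailures < blocklistThreshold {
		// Not enough failures yet; the task is handled by the normal
		// soft-fail / hard-fail logic.
		return false, nil
	}

	if err := db.AddWorkerToJobBlocklist(ctx, jobUUID, workerUUID, taskType); err != nil {
		return false, fmt.Errorf("adding worker to blocklist: %w", err)
	}

	// If no workers are left that may run this task type on this job,
	// the entire job has to fail.
	left, err := db.WorkersLeftToRun(ctx, jobUUID, taskType)
	if err != nil {
		return false, fmt.Errorf("listing remaining workers: %w", err)
	}
	return len(left) == 0, nil
}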

View File

@@ -53,6 +53,13 @@ type PersistenceService interface {
// ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of workers that failed those tasks.
ClearFailureListOfJob(context.Context, *persistence.Job) error
// AddWorkerToJobBlocklist prevents this Worker from getting any task of this type, on this job, from the task scheduler.
AddWorkerToJobBlocklist(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) error
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error)
// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type.
CountTaskFailuresOfWorker(ctx context.Context, job *persistence.Job, worker *persistence.Worker, taskType string) (int, error)
// Database queries.
QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error)
QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*persistence.Task, error)

View File

@@ -40,6 +40,20 @@ func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder {
return m.recorder
}
// AddWorkerToJobBlocklist mocks base method.
func (m *MockPersistenceService) AddWorkerToJobBlocklist(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddWorkerToJobBlocklist", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error)
return ret0
}
// AddWorkerToJobBlocklist indicates an expected call of AddWorkerToJobBlocklist.
func (mr *MockPersistenceServiceMockRecorder) AddWorkerToJobBlocklist(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToJobBlocklist), arg0, arg1, arg2, arg3)
}
// AddWorkerToTaskFailedList mocks base method.
func (m *MockPersistenceService) AddWorkerToTaskFailedList(arg0 context.Context, arg1 *persistence.Task, arg2 *persistence.Worker) (int, error) {
m.ctrl.T.Helper()
@@ -83,6 +97,21 @@ func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1)
}
// CountTaskFailuresOfWorker mocks base method.
func (m *MockPersistenceService) CountTaskFailuresOfWorker(arg0 context.Context, arg1 *persistence.Job, arg2 *persistence.Worker, arg3 string) (int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CountTaskFailuresOfWorker", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CountTaskFailuresOfWorker indicates an expected call of CountTaskFailuresOfWorker.
func (mr *MockPersistenceServiceMockRecorder) CountTaskFailuresOfWorker(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountTaskFailuresOfWorker", reflect.TypeOf((*MockPersistenceService)(nil).CountTaskFailuresOfWorker), arg0, arg1, arg2, arg3)
}
// CreateWorker mocks base method.
func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error {
m.ctrl.T.Helper()
@@ -315,6 +344,21 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{})
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1)
}
// WorkersLeftToRun mocks base method.
func (m *MockPersistenceService) WorkersLeftToRun(arg0 context.Context, arg1 *persistence.Job, arg2 string) (map[string]bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WorkersLeftToRun", arg0, arg1, arg2)
ret0, _ := ret[0].(map[string]bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WorkersLeftToRun indicates an expected call of WorkersLeftToRun.
func (mr *MockPersistenceServiceMockRecorder) WorkersLeftToRun(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkersLeftToRun", reflect.TypeOf((*MockPersistenceService)(nil).WorkersLeftToRun), arg0, arg1, arg2)
}
// MockChangeBroadcaster is a mock of ChangeBroadcaster interface.
type MockChangeBroadcaster struct {
ctrl *gomock.Controller

View File

@@ -159,7 +159,17 @@ func (f *Flamenco) onTaskFailed(
if err != nil {
return fmt.Errorf("adding worker to failure list of task: %w", err)
}
// f.maybeBlocklistWorker(ctx, logger, worker, task)
logger = logger.With().Str("taskType", task.Type).Logger()
shouldHardFail, err := f.maybeBlocklistWorker(ctx, logger, worker, task)
if err != nil {
return fmt.Errorf("block-listing worker: %w", err)
}
if shouldHardFail {
// Hard failure because of blocklisting should simply fail the entire job.
// There are no more workers left to finish it.
return f.failJobAfterCatastroficTaskFailure(ctx, logger, worker, task)
}
// Determine whether this is soft or hard failure.
threshold := f.config.Get().TaskFailAfterSoftFailCount
@@ -173,6 +183,120 @@ func (f *Flamenco) onTaskFailed(
return f.hardFailTask(ctx, logger, worker, task, numFailed)
}
// maybeBlocklistWorker potentially block-lists the Worker, and checks whether
// there are any workers left to run tasks of this type.
//
// Returns "must hard-fail". That is, returns `false` if there are still workers
// left to run tasks of this type, on this job.
//
// If the worker is NOT block-listed at this moment, always returns `false`.
//
// Returns `true` if ALL workers that can execute this task type are blocked
// from working on this job.
func (f *Flamenco) maybeBlocklistWorker(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) (bool, error) {
numFailures, err := f.persist.CountTaskFailuresOfWorker(ctx, task.Job, worker, task.Type)
if err != nil {
return false, fmt.Errorf("counting failures of worker on job %q, task type %q: %w", task.Job.UUID, task.Type, err)
}
// The received task update hasn't been persisted in the database yet,
// so we should count that too.
numFailures++
threshold := f.config.Get().BlocklistThreshold
if numFailures < threshold {
logger.Info().Int("numFailedTasks", numFailures).Msg("not enough failed tasks to blocklist worker")
// TODO: This might need special handling, as this worker will be blocked
// from retrying this particular task. It could have been the last worker to
// be allowed this task type; if that is the case, the job is now stuck.
return false, nil
}
// Blocklist the Worker.
if err := f.blocklistWorker(ctx, logger, worker, task); err != nil {
return false, err
}
// Return hard-failure if there are no workers left for this task type.
numWorkers, err := f.numWorkersCapableOfRunningTask(ctx, task)
return numWorkers == 0, err
}
func (f *Flamenco) blocklistWorker(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) error {
logger.Warn().
Str("job", task.Job.UUID).
Msg("block-listing worker")
err := f.persist.AddWorkerToJobBlocklist(ctx, task.Job, worker, task.Type)
if err != nil {
return fmt.Errorf("adding worker to block list: %w", err)
}
// TODO: requeue all tasks of this job & task type that were hard-failed by this worker.
return nil
}
func (f *Flamenco) numWorkersCapableOfRunningTask(ctx context.Context, task *persistence.Task) (int, error) {
// See which workers are left to run tasks of this type, on this job,
workersLeft, err := f.persist.WorkersLeftToRun(ctx, task.Job, task.Type)
if err != nil {
return 0, fmt.Errorf("fetching workers available to run tasks of type %q on job %q: %w",
task.Job.UUID, task.Type, err)
}
// Remove (from the list of available workers) those who failed this task before.
failers, err := f.persist.FetchTaskFailureList(ctx, task)
if err != nil {
return 0, fmt.Errorf("fetching failure list of task %q: %w", task.UUID, err)
}
for _, failure := range failers {
delete(workersLeft, failure.UUID)
}
return len(workersLeft), nil
}
// failJobAfterCatastroficTaskFailure fails the entire job.
// It is meant to be called when a task failure causes the worker to be blocked,
// leaving no workers to run tasks of this type.
func (f *Flamenco) failJobAfterCatastroficTaskFailure(
ctx context.Context,
logger zerolog.Logger,
worker *persistence.Worker,
task *persistence.Task,
) error {
taskLog := fmt.Sprintf(
"Task failed by worker %s, Manager will fail the entire job as there are no more workers left for tasks of type %q.",
worker.Identifier(), task.Type,
)
if err := f.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, taskLog); err != nil {
logger.Error().Err(err).Msg("error writing failure notice to task log")
}
if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed); err != nil {
logger.Error().
Err(err).
Str("newStatus", string(api.TaskStatusFailed)).
Msg("error changing task status")
}
newJobStatus := api.JobStatusFailed
logger.Info().
Str("job", task.Job.UUID).
Str("newJobStatus", string(newJobStatus)).
Msg("no more workers left to run tasks of this type, failing the entire job")
reason := fmt.Sprintf("no more workers left to run tasks of type %q", task.Type)
return f.stateMachine.JobStatusChange(ctx, task.Job, newJobStatus, reason)
}
func (f *Flamenco) hardFailTask(
ctx context.Context,
logger zerolog.Logger,

View File

@@ -464,24 +464,30 @@ func TestTaskUpdateFailed(t *testing.T) {
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
Type: "misc",
}
conf := config.Conf{
Base: config.Base{
TaskFailAfterSoftFailCount: 3,
BlocklistThreshold: 65535, // This test doesn't cover blocklisting.
},
}
mf.config.EXPECT().Get().Return(&conf).AnyTimes()
const numSubTests = 2
// Expect the task to be fetched for each sub-test:
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(numSubTests)
// Expect a 'touch' of the task for each sub-test:
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(numSubTests)
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(numSubTests)
// Mimic that this is always the first failure of this worker/job/tasktype combo:
mf.persistence.EXPECT().CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").Return(0, nil).Times(numSubTests)
{
// Expect the Worker to be added to the list of failed workers.
// This returns 1, which is less than the failure threshold -> soft failure expected.
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
@@ -515,6 +521,146 @@
}
}
func TestBlockingAfterFailure(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
// Construct the JSON request object.
taskUpdate := api.TaskUpdateJSONRequestBody{
TaskStatus: ptr(api.TaskStatusFailed),
}
// Construct the task that's supposed to be updated.
taskID := "181eab68-1123-4790-93b1-94309a899411"
jobID := "e4719398-7cfa-4877-9bab-97c2d6c158b5"
mockJob := persistence.Job{UUID: jobID}
mockTask := persistence.Task{
UUID: taskID,
Worker: &worker,
WorkerID: &worker.ID,
Job: &mockJob,
Activity: "pre-update activity",
Type: "misc",
}
conf := config.Conf{
Base: config.Base{
TaskFailAfterSoftFailCount: 3,
BlocklistThreshold: 3,
},
}
mf.config.EXPECT().Get().Return(&conf).AnyTimes()
const numSubTests = 3
// Expect the task to be fetched for each sub-test:
mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&mockTask, nil).Times(numSubTests)
// Expect a 'touch' of the task for each sub-test:
mf.persistence.EXPECT().TaskTouchedByWorker(gomock.Any(), &mockTask).Times(numSubTests)
mf.persistence.EXPECT().WorkerSeen(gomock.Any(), &worker).Times(numSubTests)
// Mimic that this is the 3rd failure of this worker/job/tasktype combo, and thus should trigger a block.
// Return 2 because there have been 2 previous failures.
mf.persistence.EXPECT().
CountTaskFailuresOfWorker(gomock.Any(), &mockJob, &worker, "misc").
Return(2, nil).
Times(numSubTests)
// Expect the worker to be blocked.
mf.persistence.EXPECT().
AddWorkerToJobBlocklist(gomock.Any(), &mockJob, &worker, "misc").
Times(numSubTests)
{
// Mimic that there is another worker to work on this task, so the job should continue happily.
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{"60453eec-5a26-43e9-9da2-d00506d492cc": true}, nil)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
// Expect the Worker to be added to the list of failed workers for this task.
// This returns 1, which is less than the failure threshold -> soft failure.
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
// Expect soft failure of the task.
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusSoftFailed)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
"Task failed by 1 worker, Manager will mark it as soft failure. 2 more failures will cause hard failure.")
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
assert.NoError(t, err)
assertResponseEmpty(t, echoCtx)
}
{
// Test without any workers left to run these tasks on this job due to blocklisting. This should fail the entire job.
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{}, nil)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{ /* It shouldn't matter whether the failing worker is here or not. */ }, nil)
// Expect the Worker to be added to the list of failed workers for this task.
// This returns 1, which is less than the failure threshold -> soft failure if it were only based on this metric.
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
// Expect hard failure of the task, because there are no workers left to perform it.
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
"Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+
"as there are no more workers left for tasks of type \"misc\".")
// Expect failure of the job.
mf.stateMachine.EXPECT().
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
assert.NoError(t, err)
assertResponseEmpty(t, echoCtx)
}
{
// Test the case where no worker has been blocklisted, but the only available worker already failed this task.
// This also makes the task impossible to run, and should fail the entire job.
theOtherFailingWorker := persistence.Worker{
UUID: "ce312357-29cd-4389-81ab-4d43e30945f8",
}
mf.persistence.EXPECT().WorkersLeftToRun(gomock.Any(), &mockJob, "misc").
Return(map[string]bool{theOtherFailingWorker.UUID: true}, nil)
mf.persistence.EXPECT().FetchTaskFailureList(gomock.Any(), &mockTask).
Return([]*persistence.Worker{&theOtherFailingWorker}, nil)
// Expect the Worker to be added to the list of failed workers for this task.
// This returns 1, which is less than the failure threshold -> soft failure if it were only based on this metric.
mf.persistence.EXPECT().AddWorkerToTaskFailedList(gomock.Any(), &mockTask, &worker).Return(1, nil)
// Expect hard failure of the task, because there are no workers left to perform it.
mf.stateMachine.EXPECT().TaskStatusChange(gomock.Any(), &mockTask, api.TaskStatusFailed)
mf.logStorage.EXPECT().WriteTimestamped(gomock.Any(), jobID, taskID,
"Task failed by worker дрон (e7632d62-c3b8-4af0-9e78-01752928952c), Manager will fail the entire job "+
"as there are no more workers left for tasks of type \"misc\".")
// Expect failure of the job.
mf.stateMachine.EXPECT().
JobStatusChange(gomock.Any(), &mockJob, api.JobStatusFailed, "no more workers left to run tasks of type \"misc\"")
// Do the call.
echoCtx := mf.prepareMockedJSONRequest(taskUpdate)
requestWorkerStore(echoCtx, &worker)
err := mf.flamenco.TaskUpdate(echoCtx, taskID)
assert.NoError(t, err)
assertResponseEmpty(t, echoCtx)
}
}
func TestMayWorkerRun(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

View File

@@ -92,7 +92,7 @@ type Base struct {
/* This many failures (on a given job+task type combination) will ban a worker
* from that task type on that job. */
BlocklistThreshold int `yaml:"blocklist_threshold"`
// When this many workers have tried the task and failed, it will be hard-failed
// (even when there are workers left that could technically retry the task).

View File

@@ -39,7 +39,7 @@ var defaultConfig = Conf{
// // be accurate enough for this type of cleanup.
// TaskCleanupMaxAge: 14 * 24 * time.Hour,
BlocklistThreshold: 3,
TaskFailAfterSoftFailCount: 3,
// WorkerCleanupStatus: []string{string(api.WorkerStatusOffline)},

View File

@@ -20,6 +20,8 @@ func TestDefaultSettings(t *testing.T) {
assert.Equal(t, false, config.Variables["ffmpeg"].IsTwoWay)
assert.Equal(t, "ffmpeg", config.Variables["ffmpeg"].Values[0].Value)
assert.Equal(t, VariablePlatformLinux, config.Variables["ffmpeg"].Values[0].Platform)
assert.Greater(t, config.BlocklistThreshold, 0)
}
func TestVariableValidation(t *testing.T) {

View File

@@ -7,7 +7,13 @@ import (
)
func (db *DB) migrate() error {
err := db.gormDB.AutoMigrate(
&Job{},
&JobBlock{},
&Task{},
&TaskFailure{},
&Worker{},
)
if err != nil {
return fmt.Errorf("failed to automigrate database: %v", err)
}

View File

@@ -0,0 +1,92 @@
package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"math"
"time"
"gorm.io/gorm/clause"
)
// JobBlock keeps track of which Worker is not allowed to run which task type on which job.
type JobBlock struct {
// Don't include the standard Gorm UpdatedAt or DeletedAt fields, as they're useless here.
// Entries will never be updated, and should never be soft-deleted but just purged from existence.
ID uint
CreatedAt time.Time
JobID uint `gorm:"default:0;uniqueIndex:job_worker_tasktype"`
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
WorkerID uint `gorm:"default:0;uniqueIndex:job_worker_tasktype"`
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
TaskType string `gorm:"uniqueIndex:job_worker_tasktype"`
}
// AddWorkerToJobBlocklist prevents this Worker from getting any task of this type, on this job, from the task scheduler.
func (db *DB) AddWorkerToJobBlocklist(ctx context.Context, job *Job, worker *Worker, taskType string) error {
entry := JobBlock{
Job: job,
Worker: worker,
TaskType: taskType,
}
tx := db.gormDB.WithContext(ctx).
Clauses(clause.OnConflict{DoNothing: true}).
Create(&entry)
return tx.Error
}
// WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job.
//
// NOTE: this does NOT consider the task failure list, which blocks individual
// workers from individual tasks. This ONLY concerns the job blocklist.
func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (map[string]bool, error) {
// Find the IDs of the workers blocked on this job + tasktype combo.
blockedWorkers := db.gormDB.
Table("workers as blocked_workers").
Select("blocked_workers.id").
Joins("inner join job_blocks JB on blocked_workers.id = JB.worker_id").
Where("JB.job_id = ?", job.ID).
Where("JB.task_type = ?", taskType)
// Find the workers NOT blocked.
workers := []*Worker{}
tx := db.gormDB.WithContext(ctx).
Model(&Worker{}).
Select("uuid").
Where("id not in (?)", blockedWorkers).
Scan(&workers)
if tx.Error != nil {
return nil, tx.Error
}
// From the list of workers, construct the map of UUIDs.
uuidMap := map[string]bool{}
for _, worker := range workers {
uuidMap[worker.UUID] = true
}
return uuidMap, nil
}
// CountTaskFailuresOfWorker returns the number of task failures of this worker, on this particular job and task type.
func (db *DB) CountTaskFailuresOfWorker(ctx context.Context, job *Job, worker *Worker, taskType string) (int, error) {
var numFailures int64
tx := db.gormDB.WithContext(ctx).
Model(&TaskFailure{}).
Joins("inner join tasks T on task_failures.task_id = T.id").
Where("task_failures.worker_id = ?", worker.ID).
Where("T.job_id = ?", job.ID).
Where("T.type = ?", taskType).
Count(&numFailures)
if numFailures > math.MaxInt {
panic("overflow error in number of failures")
}
return int(numFailures), tx.Error
}

View File

@@ -0,0 +1,137 @@
package persistence
import (
"testing"
"github.com/stretchr/testify/assert"
)
// SPDX-License-Identifier: GPL-3.0-or-later
func TestAddWorkerToJobBlocklist(t *testing.T) {
ctx, close, db, job, _ := jobTasksTestFixtures(t)
defer close()
worker := createWorker(ctx, t, db)
{
// Add a worker to the block list.
err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender")
assert.NoError(t, err)
list := []JobBlock{}
tx := db.gormDB.Model(&JobBlock{}).Scan(&list)
assert.NoError(t, tx.Error)
if assert.Len(t, list, 1) {
entry := list[0]
assert.Equal(t, entry.JobID, job.ID)
assert.Equal(t, entry.WorkerID, worker.ID)
assert.Equal(t, entry.TaskType, "blender")
}
}
{
// Adding the same worker again should be a no-op.
err := db.AddWorkerToJobBlocklist(ctx, job, worker, "blender")
assert.NoError(t, err)
list := []JobBlock{}
tx := db.gormDB.Model(&JobBlock{}).Scan(&list)
assert.NoError(t, tx.Error)
assert.Len(t, list, 1, "No new entry should have been created")
}
}
func TestWorkersLeftToRun(t *testing.T) {
ctx, close, db, job, _ := jobTasksTestFixtures(t)
defer close()
// No workers.
left, err := db.WorkersLeftToRun(ctx, job, "blender")
assert.NoError(t, err)
assert.Empty(t, left)
worker1 := createWorker(ctx, t, db)
worker2 := createWorkerFrom(ctx, t, db, *worker1)
uuidMap := func(workers ...*Worker) map[string]bool {
theMap := map[string]bool{}
for _, worker := range workers {
theMap[worker.UUID] = true
}
return theMap
}
// Two workers, no blocklist.
left, err = db.WorkersLeftToRun(ctx, job, "blender")
if assert.NoError(t, err) {
assert.Equal(t, uuidMap(worker1, worker2), left)
}
// Two workers, one blocked.
_ = db.AddWorkerToJobBlocklist(ctx, job, worker1, "blender")
left, err = db.WorkersLeftToRun(ctx, job, "blender")
if assert.NoError(t, err) {
assert.Equal(t, uuidMap(worker2), left)
}
// Two workers, both blocked.
_ = db.AddWorkerToJobBlocklist(ctx, job, worker2, "blender")
left, err = db.WorkersLeftToRun(ctx, job, "blender")
assert.NoError(t, err)
assert.Empty(t, left)
// Two workers, unknown job.
fakeJob := Job{Model: Model{ID: 327}}
left, err = db.WorkersLeftToRun(ctx, &fakeJob, "blender")
if assert.NoError(t, err) {
assert.Equal(t, uuidMap(worker1, worker2), left)
}
}
func TestCountTaskFailuresOfWorker(t *testing.T) {
ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t)
defer close()
task0, _ := db.FetchTask(ctx, authoredJob.Tasks[0].UUID)
task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID)
task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID)
// Sanity check on the test data.
assert.Equal(t, "blender", task0.Type)
assert.Equal(t, "blender", task1.Type)
assert.Equal(t, "ffmpeg", task2.Type)
worker1 := createWorker(ctx, t, db)
worker2 := createWorkerFrom(ctx, t, db, *worker1)
// Store some failures for different tasks
_, _ = db.AddWorkerToTaskFailedList(ctx, task0, worker1)
_, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker1)
_, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker2)
_, _ = db.AddWorkerToTaskFailedList(ctx, task2, worker1)
// Multiple failures.
numBlender1, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker1, "blender")
if assert.NoError(t, err) {
assert.Equal(t, 2, numBlender1)
}
// Single failure, but multiple tasks exist of this type.
numBlender2, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker2, "blender")
if assert.NoError(t, err) {
assert.Equal(t, 1, numBlender2)
}
// Single failure, only one task of this type exists.
numFFMpeg1, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker1, "ffmpeg")
if assert.NoError(t, err) {
assert.Equal(t, 1, numFFMpeg1)
}
// No failure.
numFFMpeg2, err := db.CountTaskFailuresOfWorker(ctx, dbJob, worker2, "ffmpeg")
if assert.NoError(t, err) {
assert.Equal(t, 0, numFFMpeg2)
}
}