diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 66b632b1..08f135f9 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -26,13 +26,18 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { logger := log.With().Str("worker", w.UUID).Logger() logger.Trace().Msg("finding task for worker") + hasWorkerClusters, err := db.HasWorkerClusters(ctx) + if err != nil { + return nil, err + } + // Run two queries in one transaction: // 1. find task, and // 2. assign the task to the worker. var task *Task txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var err error - task, err = findTaskForWorker(tx, w) + task, err = findTaskForWorker(tx, w, hasWorkerClusters) if err != nil { if isDatabaseBusyError(err) { logger.Trace().Err(err).Msg("database busy while finding task for worker") @@ -79,7 +84,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { return task, nil } -func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { +func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, error) { task := Task{} // If a task is alreay active & assigned to this worker, return just that. @@ -124,15 +129,22 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) { Where("TF.worker_id is NULL"). // Not failed before Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted - if len(w.Clusters) > 0 { - // Worker is assigned to one or more clusters, so limit the available jobs - // to those that have no cluster, or overlap with the Worker's clusters. - clusterIDs := []uint{} - for _, cluster := range w.Clusters { - clusterIDs = append(clusterIDs, cluster.ID) + if checkWorkerClusters { + // The system has one or more clusters, so limit the available jobs to those + // that have no cluster, or overlap with the Worker's clusters. + if len(w.Clusters) == 0 { + // Clusterless workers only get clusterless jobs. + findTaskQuery = findTaskQuery. + Where("jobs.worker_cluster_id is NULL") + } else { + // Clustered workers get clusterless jobs AND jobs of their own clusters. + clusterIDs := []uint{} + for _, cluster := range w.Clusters { + clusterIDs = append(clusterIDs, cluster.ID) + } + findTaskQuery = findTaskQuery. + Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs) } - findTaskQuery = findTaskQuery. - Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs) } findTaskResult := findTaskQuery. diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index bc858cbe..c0acfc95 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -301,17 +301,23 @@ func TestWorkerClusterJobWithCluster(t *testing.T) { require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2)) // Create a worker in cluster1: - w := linuxWorker(t, db, func(w *Worker) { + workerC := linuxWorker(t, db, func(w *Worker) { w.Clusters = []*WorkerCluster{&cluster1} }) + // Create a worker without cluster: + workerNC := linuxWorker(t, db, func(w *Worker) { + w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" + w.Clusters = nil + }) + { // Test job with different cluster: authTask := authorTestTask("the task", "blender") job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask) job.WorkerClusterUUID = cluster2.UUID constructTestJob(ctx, t, db, job) - task, err := db.ScheduleTask(ctx, &w) + task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) assert.Nil(t, task, "job with different cluster should not be scheduled") } @@ -322,10 +328,14 @@ func TestWorkerClusterJobWithCluster(t *testing.T) { job.WorkerClusterUUID = cluster1.UUID constructTestJob(ctx, t, db, job) - task, err := db.ScheduleTask(ctx, &w) + task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) require.NotNil(t, task, "job with matching cluster should be scheduled") assert.Equal(t, authTask.UUID, task.UUID) + + task, err = db.ScheduleTask(ctx, &workerNC) + require.NoError(t, err) + assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster") } } @@ -338,18 +348,29 @@ func TestWorkerClusterJobWithoutCluster(t *testing.T) { require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) // Create a worker in cluster1: - w := linuxWorker(t, db, func(w *Worker) { + workerC := linuxWorker(t, db, func(w *Worker) { w.Clusters = []*WorkerCluster{&cluster1} }) + // Create a worker without cluster: + workerNC := linuxWorker(t, db, func(w *Worker) { + w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" + w.Clusters = nil + }) + // Test cluster-less job: authTask := authorTestTask("the task", "blender") job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) constructTestJob(ctx, t, db, job) - task, err := db.ScheduleTask(ctx, &w) + task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - require.NotNil(t, task, "job without cluster should always be scheduled") + require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster") + assert.Equal(t, authTask.UUID, task.UUID) + + task, err = db.ScheduleTask(ctx, &workerNC) + require.NoError(t, err) + require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster") assert.Equal(t, authTask.UUID, task.UUID) } diff --git a/internal/manager/persistence/worker_cluster.go b/internal/manager/persistence/worker_cluster.go index 6d1c6ba9..60974e6d 100644 --- a/internal/manager/persistence/worker_cluster.go +++ b/internal/manager/persistence/worker_cluster.go @@ -26,6 +26,18 @@ func (db *DB) CreateWorkerCluster(ctx context.Context, wc *WorkerCluster) error return nil } +// HasWorkerClusters returns whether there are any clusters defined at all. +func (db *DB) HasWorkerClusters(ctx context.Context) (bool, error) { + var count int64 + tx := db.gormDB.WithContext(ctx). + Model(&WorkerCluster{}). + Count(&count) + if err := tx.Error; err != nil { + return false, workerClusterError(err, "counting worker clusters") + } + return count > 0, nil +} + func (db *DB) FetchWorkerCluster(ctx context.Context, uuid string) (*WorkerCluster, error) { tx := db.gormDB.WithContext(ctx) return fetchWorkerCluster(tx, uuid) diff --git a/internal/manager/persistence/worker_cluster_test.go b/internal/manager/persistence/worker_cluster_test.go index e520fbc4..ed054c6b 100644 --- a/internal/manager/persistence/worker_cluster_test.go +++ b/internal/manager/persistence/worker_cluster_test.go @@ -38,6 +38,11 @@ func TestFetchDeleteClusters(t *testing.T) { f := workerTestFixtures(t, 1*time.Second) defer f.done() + // Single cluster was created by fixture. + has, err := f.db.HasWorkerClusters(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerClusters to return true") + secondCluster := WorkerCluster{ UUID: uuid.New(), Name: "arbeiderscluster", @@ -57,6 +62,10 @@ func TestFetchDeleteClusters(t *testing.T) { assert.Contains(t, allClusterIDs, f.cluster.UUID) assert.Contains(t, allClusterIDs, secondCluster.UUID) + has, err = f.db.HasWorkerClusters(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerClusters to return true") + // Test deleting the 2nd cluster. require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, secondCluster.UUID)) @@ -64,6 +73,12 @@ func TestFetchDeleteClusters(t *testing.T) { require.NoError(t, err) require.Len(t, allClusters, 1) assert.Equal(t, f.cluster.UUID, allClusters[0].UUID) + + // Test deleting the 1st cluster. + require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID)) + has, err = f.db.HasWorkerClusters(f.ctx) + require.NoError(t, err) + assert.False(t, has, "expecting HasWorkerClusters to return false") } func TestAssignUnassignWorkerClusters(t *testing.T) {