Slight change of worker cluster behaviour

Workers without cluster now only run jobs without cluster.
This commit is contained in:
Sybren A. Stüvel 2023-04-04 13:17:45 +02:00
parent d75962c817
commit 3724a8874e
4 changed files with 76 additions and 16 deletions

View File

@ -26,13 +26,18 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
logger := log.With().Str("worker", w.UUID).Logger()
logger.Trace().Msg("finding task for worker")
hasWorkerClusters, err := db.HasWorkerClusters(ctx)
if err != nil {
return nil, err
}
// Run two queries in one transaction:
// 1. find task, and
// 2. assign the task to the worker.
var task *Task
txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var err error
task, err = findTaskForWorker(tx, w)
task, err = findTaskForWorker(tx, w, hasWorkerClusters)
if err != nil {
if isDatabaseBusyError(err) {
logger.Trace().Err(err).Msg("database busy while finding task for worker")
@ -79,7 +84,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
return task, nil
}
func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, error) {
task := Task{}
// If a task is alreay active & assigned to this worker, return just that.
@ -124,9 +129,15 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
Where("TF.worker_id is NULL"). // Not failed before
Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted
if len(w.Clusters) > 0 {
// Worker is assigned to one or more clusters, so limit the available jobs
// to those that have no cluster, or overlap with the Worker's clusters.
if checkWorkerClusters {
// The system has one or more clusters, so limit the available jobs to those
// that have no cluster, or overlap with the Worker's clusters.
if len(w.Clusters) == 0 {
// Clusterless workers only get clusterless jobs.
findTaskQuery = findTaskQuery.
Where("jobs.worker_cluster_id is NULL")
} else {
// Clustered workers get clusterless jobs AND jobs of their own clusters.
clusterIDs := []uint{}
for _, cluster := range w.Clusters {
clusterIDs = append(clusterIDs, cluster.ID)
@ -134,6 +145,7 @@ func findTaskForWorker(tx *gorm.DB, w *Worker) (*Task, error) {
findTaskQuery = findTaskQuery.
Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs)
}
}
findTaskResult := findTaskQuery.
Order("jobs.priority desc"). // Highest job priority

View File

@ -301,17 +301,23 @@ func TestWorkerClusterJobWithCluster(t *testing.T) {
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2))
// Create a worker in cluster1:
w := linuxWorker(t, db, func(w *Worker) {
workerC := linuxWorker(t, db, func(w *Worker) {
w.Clusters = []*WorkerCluster{&cluster1}
})
// Create a worker without cluster:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
w.Clusters = nil
})
{ // Test job with different cluster:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask)
job.WorkerClusterUUID = cluster2.UUID
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &w)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
assert.Nil(t, task, "job with different cluster should not be scheduled")
}
@ -322,10 +328,14 @@ func TestWorkerClusterJobWithCluster(t *testing.T) {
job.WorkerClusterUUID = cluster1.UUID
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &w)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, task, "job with matching cluster should be scheduled")
assert.Equal(t, authTask.UUID, task.UUID)
task, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster")
}
}
@ -338,18 +348,29 @@ func TestWorkerClusterJobWithoutCluster(t *testing.T) {
require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1))
// Create a worker in cluster1:
w := linuxWorker(t, db, func(w *Worker) {
workerC := linuxWorker(t, db, func(w *Worker) {
w.Clusters = []*WorkerCluster{&cluster1}
})
// Create a worker without cluster:
workerNC := linuxWorker(t, db, func(w *Worker) {
w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e"
w.Clusters = nil
})
// Test cluster-less job:
authTask := authorTestTask("the task", "blender")
job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask)
constructTestJob(ctx, t, db, job)
task, err := db.ScheduleTask(ctx, &w)
task, err := db.ScheduleTask(ctx, &workerC)
require.NoError(t, err)
require.NotNil(t, task, "job without cluster should always be scheduled")
require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster")
assert.Equal(t, authTask.UUID, task.UUID)
task, err = db.ScheduleTask(ctx, &workerNC)
require.NoError(t, err)
require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster")
assert.Equal(t, authTask.UUID, task.UUID)
}

View File

@ -26,6 +26,18 @@ func (db *DB) CreateWorkerCluster(ctx context.Context, wc *WorkerCluster) error
return nil
}
// HasWorkerClusters returns whether there are any clusters defined at all.
func (db *DB) HasWorkerClusters(ctx context.Context) (bool, error) {
var count int64
tx := db.gormDB.WithContext(ctx).
Model(&WorkerCluster{}).
Count(&count)
if err := tx.Error; err != nil {
return false, workerClusterError(err, "counting worker clusters")
}
return count > 0, nil
}
func (db *DB) FetchWorkerCluster(ctx context.Context, uuid string) (*WorkerCluster, error) {
tx := db.gormDB.WithContext(ctx)
return fetchWorkerCluster(tx, uuid)

View File

@ -38,6 +38,11 @@ func TestFetchDeleteClusters(t *testing.T) {
f := workerTestFixtures(t, 1*time.Second)
defer f.done()
// Single cluster was created by fixture.
has, err := f.db.HasWorkerClusters(f.ctx)
require.NoError(t, err)
assert.True(t, has, "expecting HasWorkerClusters to return true")
secondCluster := WorkerCluster{
UUID: uuid.New(),
Name: "arbeiderscluster",
@ -57,6 +62,10 @@ func TestFetchDeleteClusters(t *testing.T) {
assert.Contains(t, allClusterIDs, f.cluster.UUID)
assert.Contains(t, allClusterIDs, secondCluster.UUID)
has, err = f.db.HasWorkerClusters(f.ctx)
require.NoError(t, err)
assert.True(t, has, "expecting HasWorkerClusters to return true")
// Test deleting the 2nd cluster.
require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, secondCluster.UUID))
@ -64,6 +73,12 @@ func TestFetchDeleteClusters(t *testing.T) {
require.NoError(t, err)
require.Len(t, allClusters, 1)
assert.Equal(t, f.cluster.UUID, allClusters[0].UUID)
// Test deleting the 1st cluster.
require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID))
has, err = f.db.HasWorkerClusters(f.ctx)
require.NoError(t, err)
assert.False(t, has, "expecting HasWorkerClusters to return false")
}
func TestAssignUnassignWorkerClusters(t *testing.T) {