From 0502498dfac2bb5d6ee19552e95cf60cefdd4cc5 Mon Sep 17 00:00:00 2001 From: "Anish Bharadwaj (he)" Date: Mon, 24 Apr 2023 15:10:59 +0200 Subject: [PATCH] Fix #104201: Task Limit error in Flamenco Manager Insert tasks in batches so that the required SQL query stays within the limits of SQLite. No changes to the API, only to the persistence layer. Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104205 --- internal/manager/persistence/jobs.go | 18 ++++-- internal/manager/persistence/jobs_test.go | 55 +++++++++++++++++++ .../persistence/task_scheduler_test.go | 1 + 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index fa893f8d..0ce1ecf8 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -208,11 +208,19 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au } deps[i] = depTask } - - dbTask.Dependencies = deps - subQuery := tx.Model(dbTask).Updates(Task{Dependencies: deps}) - if subQuery.Error != nil { - return taskError(subQuery.Error, "unable to store dependencies of task %q", authoredTask.UUID) + dependenciesbatchsize := 1000 + for j := 0; j < len(deps); j += dependenciesbatchsize { + end := j + dependenciesbatchsize + if end > len(deps) { + end = len(deps) + } + currentDeps := deps[j:end] + dbTask.Dependencies = currentDeps + tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID) + subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps}) + if subQuery.Error != nil { + return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end) + } } } diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index e482fdec..6fb0cfd4 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -258,6 +258,21 @@ func TestCountTasksOfJobInStatus(t *testing.T) { assert.Equal(t, 3, numTotal) } +func TestCheckIfJobsHoldLargeNumOfTasks(t *testing.T) { + if testing.Short() { + t.Skip("Skipping test in short mode") + } + numtasks := 3500 + ctx, close, db, job, _ := jobTasksTestFixturesWithTaskNum(t, numtasks) + defer close() + + numQueued, numTotal, err := db.CountTasksOfJobInStatus(ctx, job, api.TaskStatusQueued) + assert.NoError(t, err) + assert.Equal(t, numtasks, numQueued) + assert.Equal(t, numtasks, numTotal) + +} + func TestFetchJobsInStatus(t *testing.T) { ctx, close, db, job1, _ := jobTasksTestFixtures(t) defer close() @@ -594,6 +609,36 @@ func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { return createTestAuthoredJob("263fd47e-b9f8-4637-b726-fd7e47ecfdae", task1, task2, task3) } +func createTestAuthoredJobWithNumTasks(numTasks int) job_compilers.AuthoredJob { + //Generates all of the render jobs + prevtasks := make([]*job_compilers.AuthoredTask, 0) + for i := 0; i < numTasks-1; i++ { + currtask := job_compilers.AuthoredTask{ + Name: "render-" + fmt.Sprintf("%d", i), + Type: "blender-render", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + } + prevtasks = append(prevtasks, &currtask) + } + // Generates the preview video command with Dependencies + videoJob := job_compilers.AuthoredTask{ + Name: "preview-video", + Type: "ffmpeg", + UUID: uuid.New(), + Commands: []job_compilers.AuthoredCommand{}, + Dependencies: prevtasks, + } + // convert pointers to values and generate job + taskvalues := make([]job_compilers.AuthoredTask, len(prevtasks)) + for i, ptr := range prevtasks { + taskvalues[i] = *ptr + } + taskvalues = append(taskvalues, videoJob) + return createTestAuthoredJob(uuid.New(), taskvalues...) + +} + func createTestAuthoredJob(jobID string, tasks ...job_compilers.AuthoredTask) job_compilers.AuthoredJob { job := job_compilers.AuthoredJob{ JobID: jobID, @@ -676,6 +721,16 @@ func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *D return ctx, cancel, db, dbJob, authoredJob } +// This created Test Jobs using the new function createTestAuthoredJobWithNumTasks so that you can set the number of tasks +func jobTasksTestFixturesWithTaskNum(t *testing.T, numtasks int) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) { + ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeoutlong) + + authoredJob := createTestAuthoredJobWithNumTasks(numtasks) + dbJob := persistAuthoredJob(t, ctx, db, authoredJob) + + return ctx, cancel, db, dbJob, authoredJob +} + func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*Worker)) *Worker { w := Worker{ UUID: "f0a123a9-ab05-4ce2-8577-94802cfe74a4", diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index c0acfc95..90fe0b8f 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -16,6 +16,7 @@ import ( ) const schedulerTestTimeout = 100 * time.Millisecond +const schedulerTestTimeoutlong = 5000 * time.Millisecond func TestNoTasks(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout)