Add 'index in job' number to tasks

Number the tasks in a job, indicating their creation order. This gives the
web interface something to sort on that doesn't change on task updates.
This commit is contained in:
Sybren A. Stüvel 2024-11-09 23:05:16 +01:00
parent b3385394ae
commit 7f37c16a8d
13 changed files with 168 additions and 59 deletions

View File

@ -8,6 +8,7 @@ bugs in actually-released versions.
- Change the name of the add-on from "Flamenco 3" to just "Flamenco".
- Add `label` to job settings, to have full control over how they are presented in Blender's job submission GUI. If a job setting does not define a label, its `key` is used to generate one (like Flamenco 3.5 and older).
- Number the tasks in a job, indicating their creation order. This gives the web interface something to sort on that doesn't change on task updates.
- Add `shellSplit(someString)` function to the job compiler scripts. This splits a string into an array of strings using shell/CLI semantics.
- Make it possible to script job submissions in Blender, by executing the `bpy.ops.flamenco.submit_job(job_name="jobname")` operator.
- Security updates of some deendencies:

View File

@ -708,7 +708,8 @@ func taskDBtoAPI(dbTask *persistence.Task) api.Task {
// TODO: convert this to just store dbTask.WorkerUUID.
Worker: workerToTaskWorker(dbTask.Worker),
JobId: dbTask.JobUUID,
JobId: dbTask.JobUUID,
IndexInJob: dbTask.IndexInJob,
}
if dbTask.Job != nil {

View File

@ -154,11 +154,12 @@ func (f *Flamenco) FetchTask(e echo.Context, taskID string) error {
func taskDBtoSummary(task *persistence.Task) api.TaskSummary {
return api.TaskSummary{
Id: task.UUID,
Name: task.Name,
Priority: task.Priority,
Status: task.Status,
TaskType: task.Type,
Updated: task.UpdatedAt,
Id: task.UUID,
Name: task.Name,
IndexInJob: task.IndexInJob,
Priority: task.Priority,
Status: task.Status,
TaskType: task.Type,
Updated: task.UpdatedAt,
}
}

View File

@ -60,13 +60,14 @@ type Task struct {
Model
UUID string
Name string
Type string
JobID uint
Job *Job
JobUUID string // Fetched by SQLC, handled by GORM in Task.AfterFind()
Priority int
Status api.TaskStatus
Name string
Type string
JobID uint
Job *Job
JobUUID string // Fetched by SQLC, handled by GORM in Task.AfterFind()
IndexInJob int
Priority int
Status api.TaskStatus
// Which worker is/was working on this.
WorkerID *uint
@ -237,7 +238,7 @@ func (db *DB) storeAuthoredJobTaks(
now := db.now()
uuidToTask := make(map[string]TaskInfo)
for _, authoredTask := range authoredJob.Tasks {
for taskIndex, authoredTask := range authoredJob.Tasks {
// Marshal commands to JSON.
var commands []Command
for _, authoredCommand := range authoredTask.Commands {
@ -253,14 +254,15 @@ func (db *DB) storeAuthoredJobTaks(
}
taskParams := sqlc.CreateTaskParams{
CreatedAt: now,
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
JobID: jobID,
Priority: int64(authoredTask.Priority),
Status: string(api.TaskStatusQueued),
Commands: commandsJSON,
CreatedAt: now,
Name: authoredTask.Name,
Type: authoredTask.Type,
UUID: authoredTask.UUID,
JobID: jobID,
IndexInJob: int64(taskIndex + 1), // indexInJob is base-1.
Priority: int64(authoredTask.Priority),
Status: string(api.TaskStatusQueued),
Commands: commandsJSON,
// dependencies are stored below.
}
@ -1098,6 +1100,7 @@ func convertSqlcTask(task sqlc.Task, jobUUID string, workerUUID string) (*Task,
UUID: task.UUID,
Name: task.Name,
Type: task.Type,
IndexInJob: int(task.IndexInJob),
Priority: int(task.Priority),
Status: api.TaskStatus(task.Status),
LastTouchedAt: task.LastTouchedAt.Time,

View File

@ -29,12 +29,13 @@ func (db *DB) QueryJobTaskSummaries(ctx context.Context, jobUUID string) ([]*Tas
UpdatedAt: task.UpdatedAt.Time,
},
UUID: task.UUID,
Name: task.Name,
Type: task.Type,
Priority: int(task.Priority),
Status: api.TaskStatus(task.Status),
JobUUID: jobUUID,
UUID: task.UUID,
Name: task.Name,
Type: task.Type,
IndexInJob: int(task.IndexInJob),
Priority: int(task.Priority),
Status: api.TaskStatus(task.Status),
JobUUID: jobUUID,
}
gormTasks[index] = &gormTask
}

View File

@ -47,8 +47,9 @@ func TestQueryJobTaskSummaries(t *testing.T) {
require.NoError(t, err)
assert.Len(t, summaries, len(expectTaskUUIDs))
for _, summary := range summaries {
for index, summary := range summaries {
assert.True(t, expectTaskUUIDs[summary.UUID], "%q should be in %v", summary.UUID, expectTaskUUIDs)
assert.Equal(t, index+1, summary.IndexInJob)
}
}

View File

@ -0,0 +1,82 @@
-- Add sequence numbers to tasks, to indicate their creation order within their job.
--
-- +goose Up
CREATE TABLE temp_tasks (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
type varchar(32) DEFAULT '' NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
index_in_job integer DEFAULT 0 NOT NULL,
priority smallint DEFAULT 50 NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
worker_id integer,
last_touched_at datetime,
commands jsonb NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id),
UNIQUE(job_id, index_in_job),
CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL
);
INSERT INTO temp_tasks SELECT
id,
created_at,
updated_at,
uuid,
name,
type,
job_id,
ROW_NUMBER() OVER (
PARTITION BY job_id
ORDER BY rowid
),
priority,
status,
worker_id,
last_touched_at,
commands,
activity
FROM tasks;
DROP TABLE tasks;
ALTER TABLE temp_tasks RENAME TO tasks;
-- +goose Down
CREATE TABLE temp_tasks (
id integer NOT NULL,
created_at datetime NOT NULL,
updated_at datetime,
uuid varchar(36) UNIQUE DEFAULT '' NOT NULL,
name varchar(64) DEFAULT '' NOT NULL,
type varchar(32) DEFAULT '' NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
priority smallint DEFAULT 50 NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
worker_id integer,
last_touched_at datetime,
commands jsonb NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id),
CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL
);
INSERT INTO temp_tasks SELECT
id,
created_at,
updated_at,
uuid,
name,
type,
job_id,
priority,
status,
worker_id,
last_touched_at,
commands,
activity
FROM tasks;
DROP TABLE tasks;
ALTER TABLE temp_tasks RENAME TO tasks;

View File

@ -62,6 +62,7 @@ type Task struct {
Name string
Type string
JobID int64
IndexInJob int64
Priority int64
Status string
WorkerID sql.NullInt64

View File

@ -37,6 +37,7 @@ INSERT INTO tasks (
name,
type,
job_id,
index_in_job,
priority,
status,
commands
@ -47,6 +48,7 @@ INSERT INTO tasks (
@name,
@type,
@job_id,
@index_in_job,
@priority,
@status,
@commands
@ -313,7 +315,7 @@ AND T.type = @task_type;
-- name: QueryJobTaskSummaries :many
SELECT tasks.id, tasks.uuid, tasks.name, tasks.priority, tasks.status, tasks.type, tasks.updated_at
SELECT tasks.id, tasks.uuid, tasks.name, tasks.index_in_job, tasks.priority, tasks.status, tasks.type, tasks.updated_at
FROM tasks
LEFT JOIN jobs ON jobs.id = tasks.job_id
WHERE jobs.uuid=@job_uuid;

View File

@ -192,6 +192,7 @@ INSERT INTO tasks (
name,
type,
job_id,
index_in_job,
priority,
status,
commands
@ -204,19 +205,21 @@ INSERT INTO tasks (
?5,
?6,
?7,
?8
?8,
?9
)
`
type CreateTaskParams struct {
CreatedAt time.Time
UUID string
Name string
Type string
JobID int64
Priority int64
Status string
Commands json.RawMessage
CreatedAt time.Time
UUID string
Name string
Type string
JobID int64
IndexInJob int64
Priority int64
Status string
Commands json.RawMessage
}
func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (int64, error) {
@ -226,6 +229,7 @@ func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (int64,
arg.Name,
arg.Type,
arg.JobID,
arg.IndexInJob,
arg.Priority,
arg.Status,
arg.Commands,
@ -523,7 +527,7 @@ func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]J
}
const fetchTask = `-- name: FetchTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID, workers.UUID as workerUUID
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID, workers.UUID as workerUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
LEFT JOIN workers ON (tasks.worker_id = workers.id)
@ -547,6 +551,7 @@ func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, err
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -624,7 +629,7 @@ func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullSt
}
const fetchTasksOfJob = `-- name: FetchTasksOfJob :many
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
FROM tasks
LEFT JOIN workers ON (tasks.worker_id = workers.id)
WHERE tasks.job_id = ?1
@ -652,6 +657,7 @@ func (q *Queries) FetchTasksOfJob(ctx context.Context, jobID int64) ([]FetchTask
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -674,7 +680,7 @@ func (q *Queries) FetchTasksOfJob(ctx context.Context, jobID int64) ([]FetchTask
}
const fetchTasksOfJobInStatus = `-- name: FetchTasksOfJobInStatus :many
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, workers.UUID as workerUUID
FROM tasks
LEFT JOIN workers ON (tasks.worker_id = workers.id)
WHERE tasks.job_id = ?1
@ -719,6 +725,7 @@ func (q *Queries) FetchTasksOfJobInStatus(ctx context.Context, arg FetchTasksOfJ
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -741,7 +748,7 @@ func (q *Queries) FetchTasksOfJobInStatus(ctx context.Context, arg FetchTasksOfJ
}
const fetchTasksOfWorkerInStatus = `-- name: FetchTasksOfWorkerInStatus :many
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID
FROM tasks
LEFT JOIN jobs ON (tasks.job_id = jobs.id)
WHERE tasks.worker_id = ?1
@ -775,6 +782,7 @@ func (q *Queries) FetchTasksOfWorkerInStatus(ctx context.Context, arg FetchTasks
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -797,7 +805,7 @@ func (q *Queries) FetchTasksOfWorkerInStatus(ctx context.Context, arg FetchTasks
}
const fetchTasksOfWorkerInStatusOfJob = `-- name: FetchTasksOfWorkerInStatusOfJob :many
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
WHERE tasks.worker_id = ?1
AND tasks.job_id = ?2
@ -831,6 +839,7 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -852,7 +861,7 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
}
const fetchTimedOutTasks = `-- name: FetchTimedOutTasks :many
SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity
SELECT id, created_at, updated_at, uuid, name, type, job_id, index_in_job, priority, status, worker_id, last_touched_at, commands, activity
FROM tasks
WHERE
status = ?1
@ -881,6 +890,7 @@ func (q *Queries) FetchTimedOutTasks(ctx context.Context, arg FetchTimedOutTasks
&i.Name,
&i.Type,
&i.JobID,
&i.IndexInJob,
&i.Priority,
&i.Status,
&i.WorkerID,
@ -967,20 +977,21 @@ func (q *Queries) JobCountTasksInStatus(ctx context.Context, arg JobCountTasksIn
}
const queryJobTaskSummaries = `-- name: QueryJobTaskSummaries :many
SELECT tasks.id, tasks.uuid, tasks.name, tasks.priority, tasks.status, tasks.type, tasks.updated_at
SELECT tasks.id, tasks.uuid, tasks.name, tasks.index_in_job, tasks.priority, tasks.status, tasks.type, tasks.updated_at
FROM tasks
LEFT JOIN jobs ON jobs.id = tasks.job_id
WHERE jobs.uuid=?1
`
type QueryJobTaskSummariesRow struct {
ID int64
UUID string
Name string
Priority int64
Status string
Type string
UpdatedAt sql.NullTime
ID int64
UUID string
Name string
IndexInJob int64
Priority int64
Status string
Type string
UpdatedAt sql.NullTime
}
func (q *Queries) QueryJobTaskSummaries(ctx context.Context, jobUuid string) ([]QueryJobTaskSummariesRow, error) {
@ -996,6 +1007,7 @@ func (q *Queries) QueryJobTaskSummaries(ctx context.Context, jobUuid string) ([]
&i.ID,
&i.UUID,
&i.Name,
&i.IndexInJob,
&i.Priority,
&i.Status,
&i.Type,

View File

@ -29,7 +29,7 @@ func (q *Queries) AssignTaskToWorker(ctx context.Context, arg AssignTaskToWorker
}
const fetchAssignedAndRunnableTaskOfWorker = `-- name: FetchAssignedAndRunnableTaskOfWorker :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
WHERE tasks.status=?1
@ -72,6 +72,7 @@ func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -84,7 +85,7 @@ func (q *Queries) FetchAssignedAndRunnableTaskOfWorker(ctx context.Context, arg
const fetchWorkerTask = `-- name: FetchWorkerTask :one
SELECT
tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity,
tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity,
jobs.id, jobs.created_at, jobs.updated_at, jobs.uuid, jobs.name, jobs.job_type, jobs.priority, jobs.status, jobs.activity, jobs.settings, jobs.metadata, jobs.delete_requested_at, jobs.storage_shaman_checkout_id, jobs.worker_tag_id,
(tasks.status = ?1 AND jobs.status = ?2) as is_active
FROM tasks
@ -121,6 +122,7 @@ func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,
@ -147,7 +149,7 @@ func (q *Queries) FetchWorkerTask(ctx context.Context, arg FetchWorkerTaskParams
}
const findRunnableTask = `-- name: FindRunnableTask :one
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.index_in_job, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity
FROM tasks
INNER JOIN jobs ON tasks.job_id = jobs.id
LEFT JOIN task_failures TF ON tasks.id = TF.task_id AND TF.worker_id=?1
@ -244,6 +246,7 @@ func (q *Queries) FindRunnableTask(ctx context.Context, arg FindRunnableTaskPara
&i.Task.Name,
&i.Task.Type,
&i.Task.JobID,
&i.Task.IndexInJob,
&i.Task.Priority,
&i.Task.Status,
&i.Task.WorkerID,

View File

@ -70,6 +70,7 @@ CREATE TABLE tasks (
name varchar(64) DEFAULT '' NOT NULL,
type varchar(32) DEFAULT '' NOT NULL,
job_id integer DEFAULT 0 NOT NULL,
index_in_job integer DEFAULT 0 NOT NULL,
priority smallint DEFAULT 50 NOT NULL,
status varchar(16) DEFAULT '' NOT NULL,
worker_id integer,
@ -77,6 +78,7 @@ CREATE TABLE tasks (
commands jsonb NOT NULL,
activity varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY (id),
UNIQUE(job_id, index_in_job),
CONSTRAINT fk_tasks_job FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE,
CONSTRAINT fk_tasks_worker FOREIGN KEY (worker_id) REFERENCES workers(id) ON DELETE SET NULL
);

View File

@ -52,8 +52,7 @@ export default {
const options = {
// See pkg/api/flamenco-openapi.yaml, schemas Task and TaskUpdate.
columns: [
// Useful for debugging when there are many similar tasks:
// { title: "ID", field: "id", headerSort: false, formatter: (cell) => cell.getData().id.substr(0, 8), },
{ title: 'Num', field: 'index_in_job' },
{
title: 'Status',
field: 'status',