diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 0be9a613..58bc1859 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -14,7 +14,6 @@ import ( "github.com/rs/zerolog/log" "gorm.io/gorm" - "gorm.io/gorm/clause" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" @@ -903,64 +902,72 @@ func (db *DB) TaskTouchedByWorker(ctx context.Context, t *Task) error { // // Returns the new number of workers that failed this task. func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) (numFailed int, err error) { - entry := TaskFailure{ - Task: t, - Worker: w, - } - tx := db.gormDB.WithContext(ctx). - Clauses(clause.OnConflict{DoNothing: true}). - Create(&entry) - if tx.Error != nil { - return 0, tx.Error + queries, err := db.queries() + if err != nil { + return 0, err } - var numFailed64 int64 - tx = db.gormDB.WithContext(ctx).Model(&TaskFailure{}). - Where("task_id=?", t.ID). - Count(&numFailed64) + err = queries.AddWorkerToTaskFailedList(ctx, sqlc.AddWorkerToTaskFailedListParams{ + CreatedAt: db.now().Time, + TaskID: int64(t.ID), + WorkerID: int64(w.ID), + }) + if err != nil { + return 0, err + } + + numFailed64, err := queries.CountWorkersFailingTask(ctx, int64(t.ID)) + if err != nil { + return 0, err + } // Integer literals are of type `int`, so that's just a bit nicer to work with // than `int64`. if numFailed64 > math.MaxInt32 { log.Warn().Int64("numFailed", numFailed64).Msg("number of failed workers is crazy high, something is wrong here") - return math.MaxInt32, tx.Error + return math.MaxInt32, nil } - return int(numFailed64), tx.Error + return int(numFailed64), nil } // ClearFailureListOfTask clears the list of workers that failed this task. func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error { - tx := db.gormDB.WithContext(ctx). - Where("task_id = ?", t.ID). - Delete(&TaskFailure{}) - return tx.Error + queries, err := db.queries() + if err != nil { + return err + } + + return queries.ClearFailureListOfTask(ctx, int64(t.ID)) } // ClearFailureListOfJob en-mass, for all tasks of this job, clears the list of // workers that failed those tasks. func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error { + queries, err := db.queries() + if err != nil { + return err + } - // SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead. - jobTasksQuery := db.gormDB.Model(&Task{}). - Select("id"). - Where("job_id = ?", j.ID) - - tx := db.gormDB.WithContext(ctx). - Where("task_id in (?)", jobTasksQuery). - Delete(&TaskFailure{}) - return tx.Error + return queries.ClearFailureListOfJob(ctx, int64(j.ID)) } func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, error) { - var workers []*Worker + queries, err := db.queries() + if err != nil { + return nil, err + } - tx := db.gormDB.WithContext(ctx). - Model(&Worker{}). - Joins("inner join task_failures TF on TF.worker_id = workers.id"). - Where("TF.task_id = ?", t.ID). - Scan(&workers) + failureList, err := queries.FetchTaskFailureList(ctx, int64(t.ID)) + if err != nil { + return nil, err + } - return workers, tx.Error + workers := make([]*Worker, len(failureList)) + for idx := range failureList { + worker := convertSqlcWorker(failureList[idx].Worker) + workers[idx] = &worker + } + return workers, nil } // convertSqlcJob converts a job from the SQLC-generated model to the model diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 5547ffee..b8d79770 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -164,3 +164,26 @@ WHERE job_id = @job_id AND status = @task_status; SELECT status, count(*) as num_tasks FROM tasks WHERE job_id = @job_id GROUP BY status; + +-- name: AddWorkerToTaskFailedList :exec +INSERT INTO task_failures (created_at, task_id, worker_id) +VALUES (@created_at, @task_id, @worker_id) +ON CONFLICT DO NOTHING; + +-- name: CountWorkersFailingTask :one +-- Count how many workers have failed a given task. +SELECT count(*) as num_failed FROM task_failures +WHERE task_id=@task_id; + +-- name: ClearFailureListOfTask :exec +DELETE FROM task_failures WHERE task_id=@task_id; + +-- name: ClearFailureListOfJob :exec +-- SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead. +DELETE FROM task_failures +WHERE task_id in (SELECT id FROM tasks WHERE job_id=@job_id); + +-- name: FetchTaskFailureList :many +SELECT sqlc.embed(workers) FROM workers +INNER JOIN task_failures TF on TF.worker_id=workers.id +WHERE TF.task_id=@task_id; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 85616d77..a5cef021 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -13,6 +13,56 @@ import ( "time" ) +const addWorkerToTaskFailedList = `-- name: AddWorkerToTaskFailedList :exec +INSERT INTO task_failures (created_at, task_id, worker_id) +VALUES (?1, ?2, ?3) +ON CONFLICT DO NOTHING +` + +type AddWorkerToTaskFailedListParams struct { + CreatedAt time.Time + TaskID int64 + WorkerID int64 +} + +func (q *Queries) AddWorkerToTaskFailedList(ctx context.Context, arg AddWorkerToTaskFailedListParams) error { + _, err := q.db.ExecContext(ctx, addWorkerToTaskFailedList, arg.CreatedAt, arg.TaskID, arg.WorkerID) + return err +} + +const clearFailureListOfJob = `-- name: ClearFailureListOfJob :exec +DELETE FROM task_failures +WHERE task_id in (SELECT id FROM tasks WHERE job_id=?1) +` + +// SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead. +func (q *Queries) ClearFailureListOfJob(ctx context.Context, jobID int64) error { + _, err := q.db.ExecContext(ctx, clearFailureListOfJob, jobID) + return err +} + +const clearFailureListOfTask = `-- name: ClearFailureListOfTask :exec +DELETE FROM task_failures WHERE task_id=?1 +` + +func (q *Queries) ClearFailureListOfTask(ctx context.Context, taskID int64) error { + _, err := q.db.ExecContext(ctx, clearFailureListOfTask, taskID) + return err +} + +const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one +SELECT count(*) as num_failed FROM task_failures +WHERE task_id=?1 +` + +// Count how many workers have failed a given task. +func (q *Queries) CountWorkersFailingTask(ctx context.Context, taskID int64) (int64, error) { + row := q.db.QueryRowContext(ctx, countWorkersFailingTask, taskID) + var num_failed int64 + err := row.Scan(&num_failed) + return num_failed, err +} + const createJob = `-- name: CreateJob :exec INSERT INTO jobs ( @@ -271,6 +321,56 @@ func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, err return i, err } +const fetchTaskFailureList = `-- name: FetchTaskFailureList :many +SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers +INNER JOIN task_failures TF on TF.worker_id = workers.id +WHERE TF.task_id=?1 +` + +type FetchTaskFailureListRow struct { + Worker Worker +} + +func (q *Queries) FetchTaskFailureList(ctx context.Context, taskID int64) ([]FetchTaskFailureListRow, error) { + rows, err := q.db.QueryContext(ctx, fetchTaskFailureList, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FetchTaskFailureListRow + for rows.Next() { + var i FetchTaskFailureListRow + if err := rows.Scan( + &i.Worker.ID, + &i.Worker.CreatedAt, + &i.Worker.UpdatedAt, + &i.Worker.UUID, + &i.Worker.Secret, + &i.Worker.Name, + &i.Worker.Address, + &i.Worker.Platform, + &i.Worker.Software, + &i.Worker.Status, + &i.Worker.LastSeenAt, + &i.Worker.StatusRequested, + &i.Worker.LazyStatusRequest, + &i.Worker.SupportedTaskTypes, + &i.Worker.DeletedAt, + &i.Worker.CanRestart, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one SELECT jobs.UUID as jobUUID FROM tasks