diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index bdcc1c63..7c74adc7 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -619,7 +619,7 @@ func convertSqlTaskWithJobAndWorker( if err != nil { return nil, taskError(err, "fetching worker assigned to task %s", task.UUID) } - gormWorker = convertSqlcWorker(sqlcWorker) + gormWorker = *convertSqlcWorker(sqlcWorker) } // Convert the Task. @@ -1051,8 +1051,7 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err workers := make([]*Worker, len(failureList)) for idx := range failureList { - worker := convertSqlcWorker(failureList[idx].Worker) - workers[idx] = &worker + workers[idx] = convertSqlcWorker(failureList[idx].Worker) } return workers, nil } diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index 761efa6a..444022a0 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -65,9 +65,7 @@ func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock entries[idx].TaskType = row.JobBlock.TaskType entries[idx].JobID = uint(row.JobBlock.JobID) entries[idx].WorkerID = uint(row.JobBlock.WorkerID) - - worker := convertSqlcWorker(row.Worker) - entries[idx].Worker = &worker + entries[idx].Worker = convertSqlcWorker(row.Worker) } return entries, nil diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 4a80d593..3928b737 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -317,3 +317,10 @@ WHERE jobs.uuid=@job_uuid; -- name: SummarizeJobStatuses :many SELECT status, count(id) as status_count FROM jobs GROUP BY status; + +-- name: FetchTimedOutTasks :many +SELECT * +FROM tasks +WHERE + status = @task_status +AND last_touched_at <= @untouched_since; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index edb0875c..80c98666 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -851,6 +851,56 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch return items, nil } +const fetchTimedOutTasks = `-- name: FetchTimedOutTasks :many +SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity +FROM tasks +WHERE + status = ?1 +AND last_touched_at <= ?2 +` + +type FetchTimedOutTasksParams struct { + TaskStatus string + UntouchedSince sql.NullTime +} + +func (q *Queries) FetchTimedOutTasks(ctx context.Context, arg FetchTimedOutTasksParams) ([]Task, error) { + rows, err := q.db.QueryContext(ctx, fetchTimedOutTasks, arg.TaskStatus, arg.UntouchedSince) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Task + for rows.Next() { + var i Task + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.Type, + &i.JobID, + &i.Priority, + &i.Status, + &i.WorkerID, + &i.LastTouchedAt, + &i.Commands, + &i.Activity, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getLastRenderedJobUUID = `-- name: GetLastRenderedJobUUID :one SELECT uuid FROM jobs INNER JOIN last_rendereds LR ON jobs.id = LR.job_id diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql index 15e7d62f..db537b23 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -152,3 +152,11 @@ WHERE id=@id; SELECT status, count(id) as status_count FROM workers WHERE deleted_at is NULL GROUP BY status; + +-- name: FetchTimedOutWorkers :many +SELECT * +FROM workers +WHERE + last_seen_at <= @last_seen_before +AND deleted_at IS NULL +AND status NOT IN (sqlc.slice('worker_statuses_no_timeout')); diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go index f24c065b..7d6012f3 100644 --- a/internal/manager/persistence/sqlc/query_workers.sql.go +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -182,6 +182,71 @@ func (q *Queries) FetchTagsOfWorker(ctx context.Context, uuid string) ([]WorkerT return items, nil } +const fetchTimedOutWorkers = `-- name: FetchTimedOutWorkers :many +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart +FROM workers +WHERE + last_seen_at <= ?1 +AND deleted_at IS NULL +AND status NOT IN (/*SLICE:worker_statuses_no_timeout*/?) +` + +type FetchTimedOutWorkersParams struct { + LastSeenBefore sql.NullTime + WorkerStatusesNoTimeout []string +} + +func (q *Queries) FetchTimedOutWorkers(ctx context.Context, arg FetchTimedOutWorkersParams) ([]Worker, error) { + query := fetchTimedOutWorkers + var queryParams []interface{} + queryParams = append(queryParams, arg.LastSeenBefore) + if len(arg.WorkerStatusesNoTimeout) > 0 { + for _, v := range arg.WorkerStatusesNoTimeout { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:worker_statuses_no_timeout*/?", strings.Repeat(",?", len(arg.WorkerStatusesNoTimeout))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:worker_statuses_no_timeout*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Worker + for rows.Next() { + var i Worker + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchWorker = `-- name: FetchWorker :one SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is NULL ` diff --git a/internal/manager/persistence/timeout.go b/internal/manager/persistence/timeout.go index ad2d074d..b149b0cd 100644 --- a/internal/manager/persistence/timeout.go +++ b/internal/manager/persistence/timeout.go @@ -4,8 +4,10 @@ package persistence import ( "context" + "database/sql" "time" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -26,38 +28,50 @@ var workerStatusNoTimeout = []api.WorkerStatus{ // // The returned tasks also have their `Job` and `Worker` fields set. func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*Task, error) { - result := []*Task{} - tx := db.gormDB.WithContext(ctx). - Model(&Task{}). - Joins("Job"). - Joins("Worker"). - Where("tasks.status = ?", api.TaskStatusActive). - Where("tasks.last_touched_at <= ?", untouchedSince). - Scan(&result) - if tx.Error != nil { - return nil, taskError(tx.Error, "finding timed out tasks (untouched since %s)", untouchedSince.String()) + queries := db.queries() + + sqlcTasks, err := queries.FetchTimedOutTasks(ctx, sqlc.FetchTimedOutTasksParams{ + TaskStatus: string(api.TaskStatusActive), + UntouchedSince: sql.NullTime{Time: untouchedSince, Valid: true}, + }) + + if err != nil { + return nil, taskError(err, "finding timed out tasks (untouched since %s)", untouchedSince.String()) } - // GORM apparently doesn't call the task's AfterFind() function for the above query. - for _, task := range result { - err := task.AfterFind(tx) + result := make([]*Task, len(sqlcTasks)) + for index, task := range sqlcTasks { + gormTask, err := convertSqlTaskWithJobAndWorker(ctx, queries, task) if err != nil { - return nil, taskError(tx.Error, "finding the job & worker UUIDs for task %s", task.UUID) + return nil, err } + result[index] = gormTask } return result, nil } func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) { - result := []*Worker{} - tx := db.gormDB.WithContext(ctx). - Model(&Worker{}). - Where("workers.status not in ?", workerStatusNoTimeout). - Where("workers.last_seen_at <= ?", lastSeenBefore). - Scan(&result) - if tx.Error != nil { - return nil, workerError(tx.Error, "finding timed out workers (last seen before %s)", lastSeenBefore.String()) + queries := db.queries() + + statuses := make([]string, len(workerStatusNoTimeout)) + for i, status := range workerStatusNoTimeout { + statuses[i] = string(status) + } + + sqlcWorkers, err := queries.FetchTimedOutWorkers(ctx, sqlc.FetchTimedOutWorkersParams{ + WorkerStatusesNoTimeout: statuses, + LastSeenBefore: sql.NullTime{ + Time: lastSeenBefore.UTC(), + Valid: true}, + }) + if err != nil { + return nil, workerError(err, "finding timed out workers (last seen before %s)", lastSeenBefore.String()) + } + + result := make([]*Worker, len(sqlcWorkers)) + for index := range sqlcWorkers { + result[index] = convertSqlcWorker(sqlcWorkers[index]) } return result, nil } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index 8305d799..7971e13f 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -82,7 +82,7 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { Software: w.Software, Status: string(w.Status), LastSeenAt: sql.NullTime{ - Time: w.LastSeenAt, + Time: w.LastSeenAt.UTC(), Valid: !w.LastSeenAt.IsZero(), }, StatusRequested: string(w.StatusRequested), @@ -133,7 +133,7 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { convertedWorker.Tags[index] = convertSqlcWorkerTag(workerTags[index]) } - return &convertedWorker, nil + return convertedWorker, nil } func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { @@ -171,8 +171,7 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) { gormWorkers := make([]*Worker, len(workers)) for idx := range workers { - worker := convertSqlcWorker(workers[idx].Worker) - gormWorkers[idx] = &worker + gormWorkers[idx] = convertSqlcWorker(workers[idx].Worker) } return gormWorkers, nil } @@ -309,8 +308,8 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e // expected by the rest of the code. This is mostly in place to aid in the GORM // to SQLC migration. It is intended that eventually the rest of the code will // use the same SQLC-generated model. -func convertSqlcWorker(worker sqlc.Worker) Worker { - return Worker{ +func convertSqlcWorker(worker sqlc.Worker) *Worker { + return &Worker{ Model: Model{ ID: uint(worker.ID), CreatedAt: worker.CreatedAt,