Manager: convert timeout checks to sqlc

Ref: #104305
This commit is contained in:
Sybren A. Stüvel 2024-09-18 20:39:03 +02:00
parent ebf1693a7c
commit 777a417cc0
8 changed files with 174 additions and 34 deletions

View File

@ -619,7 +619,7 @@ func convertSqlTaskWithJobAndWorker(
if err != nil {
return nil, taskError(err, "fetching worker assigned to task %s", task.UUID)
}
gormWorker = convertSqlcWorker(sqlcWorker)
gormWorker = *convertSqlcWorker(sqlcWorker)
}
// Convert the Task.
@ -1051,8 +1051,7 @@ func (db *DB) FetchTaskFailureList(ctx context.Context, t *Task) ([]*Worker, err
workers := make([]*Worker, len(failureList))
for idx := range failureList {
worker := convertSqlcWorker(failureList[idx].Worker)
workers[idx] = &worker
workers[idx] = convertSqlcWorker(failureList[idx].Worker)
}
return workers, nil
}

View File

@ -65,9 +65,7 @@ func (db *DB) FetchJobBlocklist(ctx context.Context, jobUUID string) ([]JobBlock
entries[idx].TaskType = row.JobBlock.TaskType
entries[idx].JobID = uint(row.JobBlock.JobID)
entries[idx].WorkerID = uint(row.JobBlock.WorkerID)
worker := convertSqlcWorker(row.Worker)
entries[idx].Worker = &worker
entries[idx].Worker = convertSqlcWorker(row.Worker)
}
return entries, nil

View File

@ -317,3 +317,10 @@ WHERE jobs.uuid=@job_uuid;
-- name: SummarizeJobStatuses :many
SELECT status, count(id) as status_count FROM jobs
GROUP BY status;
-- name: FetchTimedOutTasks :many
SELECT *
FROM tasks
WHERE
status = @task_status
AND last_touched_at <= @untouched_since;

View File

@ -851,6 +851,56 @@ func (q *Queries) FetchTasksOfWorkerInStatusOfJob(ctx context.Context, arg Fetch
return items, nil
}
const fetchTimedOutTasks = `-- name: FetchTimedOutTasks :many
SELECT id, created_at, updated_at, uuid, name, type, job_id, priority, status, worker_id, last_touched_at, commands, activity
FROM tasks
WHERE
status = ?1
AND last_touched_at <= ?2
`
type FetchTimedOutTasksParams struct {
TaskStatus string
UntouchedSince sql.NullTime
}
func (q *Queries) FetchTimedOutTasks(ctx context.Context, arg FetchTimedOutTasksParams) ([]Task, error) {
rows, err := q.db.QueryContext(ctx, fetchTimedOutTasks, arg.TaskStatus, arg.UntouchedSince)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Task
for rows.Next() {
var i Task
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.UUID,
&i.Name,
&i.Type,
&i.JobID,
&i.Priority,
&i.Status,
&i.WorkerID,
&i.LastTouchedAt,
&i.Commands,
&i.Activity,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getLastRenderedJobUUID = `-- name: GetLastRenderedJobUUID :one
SELECT uuid FROM jobs
INNER JOIN last_rendereds LR ON jobs.id = LR.job_id

View File

@ -152,3 +152,11 @@ WHERE id=@id;
SELECT status, count(id) as status_count FROM workers
WHERE deleted_at is NULL
GROUP BY status;
-- name: FetchTimedOutWorkers :many
SELECT *
FROM workers
WHERE
last_seen_at <= @last_seen_before
AND deleted_at IS NULL
AND status NOT IN (sqlc.slice('worker_statuses_no_timeout'));

View File

@ -182,6 +182,71 @@ func (q *Queries) FetchTagsOfWorker(ctx context.Context, uuid string) ([]WorkerT
return items, nil
}
const fetchTimedOutWorkers = `-- name: FetchTimedOutWorkers :many
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart
FROM workers
WHERE
last_seen_at <= ?1
AND deleted_at IS NULL
AND status NOT IN (/*SLICE:worker_statuses_no_timeout*/?)
`
type FetchTimedOutWorkersParams struct {
LastSeenBefore sql.NullTime
WorkerStatusesNoTimeout []string
}
func (q *Queries) FetchTimedOutWorkers(ctx context.Context, arg FetchTimedOutWorkersParams) ([]Worker, error) {
query := fetchTimedOutWorkers
var queryParams []interface{}
queryParams = append(queryParams, arg.LastSeenBefore)
if len(arg.WorkerStatusesNoTimeout) > 0 {
for _, v := range arg.WorkerStatusesNoTimeout {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:worker_statuses_no_timeout*/?", strings.Repeat(",?", len(arg.WorkerStatusesNoTimeout))[1:], 1)
} else {
query = strings.Replace(query, "/*SLICE:worker_statuses_no_timeout*/?", "NULL", 1)
}
rows, err := q.db.QueryContext(ctx, query, queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Worker
for rows.Next() {
var i Worker
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.UUID,
&i.Secret,
&i.Name,
&i.Address,
&i.Platform,
&i.Software,
&i.Status,
&i.LastSeenAt,
&i.StatusRequested,
&i.LazyStatusRequest,
&i.SupportedTaskTypes,
&i.DeletedAt,
&i.CanRestart,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const fetchWorker = `-- name: FetchWorker :one
SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is NULL
`

View File

@ -4,8 +4,10 @@ package persistence
import (
"context"
"database/sql"
"time"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/pkg/api"
)
@ -26,38 +28,50 @@ var workerStatusNoTimeout = []api.WorkerStatus{
//
// The returned tasks also have their `Job` and `Worker` fields set.
func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*Task, error) {
result := []*Task{}
tx := db.gormDB.WithContext(ctx).
Model(&Task{}).
Joins("Job").
Joins("Worker").
Where("tasks.status = ?", api.TaskStatusActive).
Where("tasks.last_touched_at <= ?", untouchedSince).
Scan(&result)
if tx.Error != nil {
return nil, taskError(tx.Error, "finding timed out tasks (untouched since %s)", untouchedSince.String())
queries := db.queries()
sqlcTasks, err := queries.FetchTimedOutTasks(ctx, sqlc.FetchTimedOutTasksParams{
TaskStatus: string(api.TaskStatusActive),
UntouchedSince: sql.NullTime{Time: untouchedSince, Valid: true},
})
if err != nil {
return nil, taskError(err, "finding timed out tasks (untouched since %s)", untouchedSince.String())
}
// GORM apparently doesn't call the task's AfterFind() function for the above query.
for _, task := range result {
err := task.AfterFind(tx)
result := make([]*Task, len(sqlcTasks))
for index, task := range sqlcTasks {
gormTask, err := convertSqlTaskWithJobAndWorker(ctx, queries, task)
if err != nil {
return nil, taskError(tx.Error, "finding the job & worker UUIDs for task %s", task.UUID)
return nil, err
}
result[index] = gormTask
}
return result, nil
}
func (db *DB) FetchTimedOutWorkers(ctx context.Context, lastSeenBefore time.Time) ([]*Worker, error) {
result := []*Worker{}
tx := db.gormDB.WithContext(ctx).
Model(&Worker{}).
Where("workers.status not in ?", workerStatusNoTimeout).
Where("workers.last_seen_at <= ?", lastSeenBefore).
Scan(&result)
if tx.Error != nil {
return nil, workerError(tx.Error, "finding timed out workers (last seen before %s)", lastSeenBefore.String())
queries := db.queries()
statuses := make([]string, len(workerStatusNoTimeout))
for i, status := range workerStatusNoTimeout {
statuses[i] = string(status)
}
sqlcWorkers, err := queries.FetchTimedOutWorkers(ctx, sqlc.FetchTimedOutWorkersParams{
WorkerStatusesNoTimeout: statuses,
LastSeenBefore: sql.NullTime{
Time: lastSeenBefore.UTC(),
Valid: true},
})
if err != nil {
return nil, workerError(err, "finding timed out workers (last seen before %s)", lastSeenBefore.String())
}
result := make([]*Worker, len(sqlcWorkers))
for index := range sqlcWorkers {
result[index] = convertSqlcWorker(sqlcWorkers[index])
}
return result, nil
}

View File

@ -82,7 +82,7 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error {
Software: w.Software,
Status: string(w.Status),
LastSeenAt: sql.NullTime{
Time: w.LastSeenAt,
Time: w.LastSeenAt.UTC(),
Valid: !w.LastSeenAt.IsZero(),
},
StatusRequested: string(w.StatusRequested),
@ -133,7 +133,7 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) {
convertedWorker.Tags[index] = convertSqlcWorkerTag(workerTags[index])
}
return &convertedWorker, nil
return convertedWorker, nil
}
func (db *DB) DeleteWorker(ctx context.Context, uuid string) error {
@ -171,8 +171,7 @@ func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
gormWorkers := make([]*Worker, len(workers))
for idx := range workers {
worker := convertSqlcWorker(workers[idx].Worker)
gormWorkers[idx] = &worker
gormWorkers[idx] = convertSqlcWorker(workers[idx].Worker)
}
return gormWorkers, nil
}
@ -309,8 +308,8 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e
// expected by the rest of the code. This is mostly in place to aid in the GORM
// to SQLC migration. It is intended that eventually the rest of the code will
// use the same SQLC-generated model.
func convertSqlcWorker(worker sqlc.Worker) Worker {
return Worker{
func convertSqlcWorker(worker sqlc.Worker) *Worker {
return &Worker{
Model: Model{
ID: uint(worker.ID),
CreatedAt: worker.CreatedAt,