Manager: convert job blocklist queries from GORM to sqlc

Ref: #104305
This commit is contained in:
Sybren A. Stüvel 2024-09-18 10:33:40 +02:00
parent a015408486
commit 3a872370df
3 changed files with 167 additions and 38 deletions

View File

@ -93,42 +93,33 @@ func (db *DB) RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, t
// NOTE: this does NOT consider the task failure list, which blocks individual // NOTE: this does NOT consider the task failure list, which blocks individual
// workers from individual tasks. This is ONLY concerning the job blocklist. // workers from individual tasks. This is ONLY concerning the job blocklist.
func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (map[string]bool, error) { func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (map[string]bool, error) {
// Find the IDs of the workers blocked on this job + tasktype combo. queries := db.queries()
blockedWorkers := db.gormDB.
Table("workers as blocked_workers").
Select("blocked_workers.id").
Joins("inner join job_blocks JB on blocked_workers.id = JB.worker_id").
Where("JB.job_id = ?", job.ID).
Where("JB.task_type = ?", taskType)
query := db.gormDB.WithContext(ctx).
Model(&Worker{}).
Select("uuid").
Where("id not in (?)", blockedWorkers)
var (
workerUUIDs []string
err error
)
if job.WorkerTagID == nil { if job.WorkerTagID == nil {
// Count all workers, so no extra restrictions are necessary. workerUUIDs, err = queries.WorkersLeftToRun(ctx, sqlc.WorkersLeftToRunParams{
JobID: int64(job.ID),
TaskType: taskType,
})
} else { } else {
// Only count workers in the job's tag. workerUUIDs, err = queries.WorkersLeftToRunWithWorkerTag(ctx,
jobTag := db.gormDB. sqlc.WorkersLeftToRunWithWorkerTagParams{
Table("worker_tag_membership"). JobID: int64(job.ID),
Select("worker_id"). TaskType: taskType,
Where("worker_tag_id = ?", *job.WorkerTagID) WorkerTagID: int64(*job.WorkerTagID),
query = query. })
Where("id in (?)", jobTag) }
if err != nil {
return nil, err
} }
// Find the workers NOT blocked. // Construct a map of UUIDs.
workers := []*Worker{}
tx := query.Scan(&workers)
if tx.Error != nil {
return nil, tx.Error
}
// From the list of workers, construct the map of UUIDs.
uuidMap := map[string]bool{} uuidMap := map[string]bool{}
for _, worker := range workers { for _, uuid := range workerUUIDs {
uuidMap[worker.UUID] = true uuidMap[uuid] = true
} }
return uuidMap, nil return uuidMap, nil
@ -138,17 +129,16 @@ func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) (
func (db *DB) CountTaskFailuresOfWorker(ctx context.Context, job *Job, worker *Worker, taskType string) (int, error) { func (db *DB) CountTaskFailuresOfWorker(ctx context.Context, job *Job, worker *Worker, taskType string) (int, error) {
var numFailures int64 var numFailures int64
tx := db.gormDB.WithContext(ctx). queries := db.queries()
Model(&TaskFailure{}). numFailures, err := queries.CountTaskFailuresOfWorker(ctx, sqlc.CountTaskFailuresOfWorkerParams{
Joins("inner join tasks T on task_failures.task_id = T.id"). JobID: int64(job.ID),
Where("task_failures.worker_id = ?", worker.ID). WorkerID: int64(worker.ID),
Where("T.job_id = ?", job.ID). TaskType: taskType,
Where("T.type = ?", taskType). })
Count(&numFailures)
if numFailures > math.MaxInt { if numFailures > math.MaxInt {
panic("overflow error in number of failures") panic("overflow error in number of failures")
} }
return int(numFailures), tx.Error return int(numFailures), err
} }

View File

@ -269,3 +269,36 @@ WHERE
job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid) job_blocks.job_id in (SELECT jobs.id FROM jobs WHERE jobs.uuid=@jobuuid)
AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid) AND job_blocks.worker_id in (SELECT workers.id FROM workers WHERE workers.uuid=@workeruuid)
AND job_blocks.task_type = @task_type; AND job_blocks.task_type = @task_type;
-- name: WorkersLeftToRun :many
SELECT workers.uuid FROM workers
WHERE id NOT IN (
SELECT blocked_workers.id
FROM workers AS blocked_workers
INNER JOIN job_blocks JB on blocked_workers.id = JB.worker_id
WHERE
JB.job_id = @job_id
AND JB.task_type = @task_type
);
-- name: WorkersLeftToRunWithWorkerTag :many
SELECT workers.uuid
FROM workers
INNER JOIN worker_tag_membership WTM ON workers.id = WTM.worker_id
WHERE id NOT IN (
SELECT blocked_workers.id
FROM workers AS blocked_workers
INNER JOIN job_blocks JB ON blocked_workers.id = JB.worker_id
WHERE
JB.job_id = @job_id
AND JB.task_type = @task_type
)
AND WTM.worker_tag_id = @worker_tag_id;
-- name: CountTaskFailuresOfWorker :one
SELECT count(TF.task_id) FROM task_failures TF
INNER JOIN tasks T ON TF.task_id = T.id
WHERE
TF.worker_id = @worker_id
AND T.job_id = @job_id
AND T.type = @task_type;

View File

@ -84,6 +84,28 @@ func (q *Queries) ClearJobBlocklist(ctx context.Context, jobuuid string) error {
return err return err
} }
const countTaskFailuresOfWorker = `-- name: CountTaskFailuresOfWorker :one
SELECT count(TF.task_id) FROM task_failures TF
INNER JOIN tasks T ON TF.task_id = T.id
WHERE
TF.worker_id = ?1
AND T.job_id = ?2
AND T.type = ?3
`
type CountTaskFailuresOfWorkerParams struct {
WorkerID int64
JobID int64
TaskType string
}
func (q *Queries) CountTaskFailuresOfWorker(ctx context.Context, arg CountTaskFailuresOfWorkerParams) (int64, error) {
row := q.db.QueryRowContext(ctx, countTaskFailuresOfWorker, arg.WorkerID, arg.JobID, arg.TaskType)
var count int64
err := row.Scan(&count)
return count, err
}
const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one const countWorkersFailingTask = `-- name: CountWorkersFailingTask :one
SELECT count(*) as num_failed FROM task_failures SELECT count(*) as num_failed FROM task_failures
WHERE task_id=?1 WHERE task_id=?1
@ -1178,3 +1200,87 @@ func (q *Queries) UpdateTaskStatus(ctx context.Context, arg UpdateTaskStatusPara
_, err := q.db.ExecContext(ctx, updateTaskStatus, arg.UpdatedAt, arg.Status, arg.ID) _, err := q.db.ExecContext(ctx, updateTaskStatus, arg.UpdatedAt, arg.Status, arg.ID)
return err return err
} }
const workersLeftToRun = `-- name: WorkersLeftToRun :many
SELECT workers.uuid FROM workers
WHERE id NOT IN (
SELECT blocked_workers.id
FROM workers AS blocked_workers
INNER JOIN job_blocks JB on blocked_workers.id = JB.worker_id
WHERE
JB.job_id = ?1
AND JB.task_type = ?2
)
`
type WorkersLeftToRunParams struct {
JobID int64
TaskType string
}
func (q *Queries) WorkersLeftToRun(ctx context.Context, arg WorkersLeftToRunParams) ([]string, error) {
rows, err := q.db.QueryContext(ctx, workersLeftToRun, arg.JobID, arg.TaskType)
if err != nil {
return nil, err
}
defer rows.Close()
var items []string
for rows.Next() {
var uuid string
if err := rows.Scan(&uuid); err != nil {
return nil, err
}
items = append(items, uuid)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const workersLeftToRunWithWorkerTag = `-- name: WorkersLeftToRunWithWorkerTag :many
SELECT workers.uuid
FROM workers
INNER JOIN worker_tag_membership WTM ON workers.id = WTM.worker_id
WHERE id NOT IN (
SELECT blocked_workers.id
FROM workers AS blocked_workers
INNER JOIN job_blocks JB ON blocked_workers.id = JB.worker_id
WHERE
JB.job_id = ?1
AND JB.task_type = ?2
)
AND WTM.worker_tag_id = ?3
`
type WorkersLeftToRunWithWorkerTagParams struct {
JobID int64
TaskType string
WorkerTagID int64
}
func (q *Queries) WorkersLeftToRunWithWorkerTag(ctx context.Context, arg WorkersLeftToRunWithWorkerTagParams) ([]string, error) {
rows, err := q.db.QueryContext(ctx, workersLeftToRunWithWorkerTag, arg.JobID, arg.TaskType, arg.WorkerTagID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []string
for rows.Next() {
var uuid string
if err := rows.Scan(&uuid); err != nil {
return nil, err
}
items = append(items, uuid)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}