
Convert the task scheduler from gorm to sqlc. This makes the query considerably easier to read. No functional changes intended.
178 lines
5.2 KiB
Go
178 lines
5.2 KiB
Go
package persistence
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"gorm.io/gorm"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
var (
|
|
// Note that active tasks are not schedulable, because they're already dunning on some worker.
|
|
schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed}
|
|
schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued}
|
|
// completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted}
|
|
)
|
|
|
|
// ScheduleTask finds a task to execute by the given worker.
|
|
// If no task is available, (nil, nil) is returned, as this is not an error situation.
|
|
// NOTE: this does not also fetch returnedTask.Worker, but returnedTask.WorkerID is set.
|
|
func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
|
|
logger := log.With().Str("worker", w.UUID).Logger()
|
|
logger.Trace().Msg("finding task for worker")
|
|
|
|
// Run all queries in a single transaction.
|
|
//
|
|
// After this point, all queries should use this transaction. Otherwise SQLite
|
|
// will deadlock, as it will make any other query wait until this transaction
|
|
// is done.
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer qtx.rollback()
|
|
|
|
task, err := db.scheduleTask(ctx, qtx.queries, w, logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if task == nil {
|
|
// No task means no changes to the database.
|
|
// It's fine to just roll back the transaction.
|
|
return nil, nil
|
|
}
|
|
|
|
gormTask, err := convertSqlTaskWithJobAndWorker(ctx, qtx.queries, *task)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := qtx.commit(); err != nil {
|
|
return nil, fmt.Errorf(
|
|
"could not commit database transaction after scheduling task %s for worker %s: %w",
|
|
task.UUID, w.UUID, err)
|
|
}
|
|
|
|
return gormTask, nil
|
|
}
|
|
|
|
func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker, logger zerolog.Logger) (*sqlc.Task, error) {
|
|
if w.ID == 0 {
|
|
panic("worker should be in database, but has zero ID")
|
|
}
|
|
workerID := sql.NullInt64{Int64: int64(w.ID), Valid: true}
|
|
|
|
// If a task is alreay active & assigned to this worker, return just that.
|
|
// Note that this task type could be blocklisted or no longer supported by the
|
|
// Worker, but since it's active that is unlikely.
|
|
{
|
|
row, err := queries.FetchAssignedAndRunnableTaskOfWorker(ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{
|
|
ActiveTaskStatus: string(api.TaskStatusActive),
|
|
ActiveJobStatuses: convertJobStatuses(schedulableJobStatuses),
|
|
WorkerID: workerID,
|
|
})
|
|
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
// Fine, just means there was no task assigned yet.
|
|
case err != nil:
|
|
return nil, err
|
|
case row.Task.ID > 0:
|
|
return &row.Task, nil
|
|
}
|
|
}
|
|
|
|
task, err := findTaskForWorker(ctx, queries, w)
|
|
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
// Fine, just means there was no task assigned yet.
|
|
return nil, nil
|
|
case isDatabaseBusyError(err):
|
|
logger.Trace().Err(err).Msg("database busy while finding task for worker")
|
|
return nil, errDatabaseBusy
|
|
case err != nil:
|
|
logger.Error().Err(err).Msg("finding task for worker")
|
|
return nil, fmt.Errorf("finding task for worker: %w", err)
|
|
}
|
|
|
|
// Assign the task to the worker.
|
|
err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{
|
|
WorkerID: workerID,
|
|
Now: db.now(),
|
|
TaskID: task.ID,
|
|
})
|
|
|
|
switch {
|
|
case isDatabaseBusyError(err):
|
|
logger.Trace().Err(err).Msg("database busy while assigning task to worker")
|
|
return nil, errDatabaseBusy
|
|
case err != nil:
|
|
logger.Warn().
|
|
Str("taskID", task.UUID).
|
|
Err(err).
|
|
Msg("assigning task to worker")
|
|
return nil, fmt.Errorf("assigning task to worker: %w", err)
|
|
}
|
|
|
|
// Make sure the returned task matches the database.
|
|
task.WorkerID = workerID
|
|
|
|
logger.Info().
|
|
Str("taskID", task.UUID).
|
|
Msg("assigned task to worker")
|
|
|
|
return &task, nil
|
|
}
|
|
|
|
func findTaskForWorker(
|
|
ctx context.Context,
|
|
queries *sqlc.Queries,
|
|
w *Worker,
|
|
) (sqlc.Task, error) {
|
|
|
|
// Construct the list of worker tags to check.
|
|
workerTags := make([]sql.NullInt64, len(w.Tags))
|
|
for index, tag := range w.Tags {
|
|
workerTags[index] = sql.NullInt64{Int64: int64(tag.ID), Valid: true}
|
|
}
|
|
|
|
row, err := queries.FindRunnableTask(ctx, sqlc.FindRunnableTaskParams{
|
|
WorkerID: int64(w.ID),
|
|
SchedulableTaskStatuses: convertTaskStatuses(schedulableTaskStatuses),
|
|
SchedulableJobStatuses: convertJobStatuses(schedulableJobStatuses),
|
|
SupportedTaskTypes: w.TaskTypes(),
|
|
TaskStatusCompleted: string(api.TaskStatusCompleted),
|
|
WorkerTags: workerTags,
|
|
})
|
|
if err != nil {
|
|
return sqlc.Task{}, err
|
|
}
|
|
if row.Task.ID == 0 {
|
|
return sqlc.Task{}, nil
|
|
}
|
|
return row.Task, nil
|
|
}
|
|
|
|
// taskAssignedAndRunnableQuery appends some GORM clauses to query for a task
|
|
// that's already assigned to this worker, and is in a runnable state.
|
|
func taskAssignedAndRunnableQuery(tx *gorm.DB, w *Worker) *gorm.DB {
|
|
return tx.
|
|
Joins("left join jobs on tasks.job_id = jobs.id").
|
|
Where("tasks.status = ?", api.TaskStatusActive).
|
|
Where("jobs.status in ?", schedulableJobStatuses).
|
|
Where("tasks.worker_id = ?", w.ID). // assigned to this worker
|
|
Limit(1)
|
|
}
|