flamenco/internal/manager/persistence/task_scheduler.go
Sybren A. Stüvel 8513e2fdc8 Manager: replace GORM 'now' function with our own implementation
GORM implicitly sets 'created at', 'updated at' and 'deleted at' timestamps
to 'now' by calling a 'now function'. This is now implemented by Flamenco
directly, instead of relying on GORM.

Ref: #104305
2024-09-26 22:38:17 +02:00

166 lines
4.7 KiB
Go

package persistence
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
"projects.blender.org/studio/flamenco/pkg/api"
)
var (
	// schedulableTaskStatuses lists the task statuses from which a task can be
	// handed out to a worker.
	//
	// Note that active tasks are not schedulable, because they're already
	// running on some worker.
	schedulableTaskStatuses = []api.TaskStatus{
		api.TaskStatusQueued,
		api.TaskStatusSoftFailed,
	}

	// schedulableJobStatuses lists the job statuses a job must have for its
	// tasks to be handed out to a worker.
	schedulableJobStatuses = []api.JobStatus{
		api.JobStatusActive,
		api.JobStatusQueued,
	}

	// completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted}
)
// ScheduleTask looks for a task that the given worker can execute, and assigns
// it to that worker.
//
// When there is nothing to do for this worker, (nil, nil) is returned; having
// no task available is not considered an error.
//
// NOTE: this does not also fetch returnedTask.Worker, but returnedTask.WorkerID is set.
func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) {
	workerLog := log.With().Str("worker", w.UUID).Logger()
	workerLog.Trace().Msg("finding task for worker")

	// Everything below runs inside one transaction.
	//
	// From here on, every query has to go through this transaction. Otherwise
	// SQLite will deadlock, as it will make any other query wait until this
	// transaction is done.
	tx, txErr := db.queriesWithTX()
	if txErr != nil {
		return nil, txErr
	}
	// Rolling back is deferred unconditionally; it is what cleans up on every
	// early return below.
	defer tx.rollback()

	sqlTask, schedErr := db.scheduleTask(ctx, tx.queries, w, workerLog)
	switch {
	case schedErr != nil:
		return nil, schedErr
	case sqlTask == nil:
		// Nothing was scheduled, so nothing was written to the database.
		// Letting the deferred rollback end the transaction is fine.
		return nil, nil
	}

	result, convErr := convertSqlTaskWithJobAndWorker(ctx, tx.queries, *sqlTask)
	if convErr != nil {
		return nil, convErr
	}

	commitErr := tx.commit()
	if commitErr != nil {
		return nil, fmt.Errorf(
			"could not commit database transaction after scheduling task %s for worker %s: %w",
			sqlTask.UUID, w.UUID, commitErr)
	}

	return result, nil
}
// scheduleTask picks the task the given worker should work on, using the
// passed queries object for all database access.
//
// It first looks for a task that is already active and assigned to this
// worker; if one is found, that task is returned as-is. Otherwise it searches
// for a runnable task, assigns it to the worker in the database, and returns
// it with its WorkerID updated to match.
//
// Returns (nil, nil) when there is no task for this worker. Returns
// errDatabaseBusy when finding or assigning the task fails with a
// database-busy error. Other errors from finding or assigning are wrapped
// with a description of the failed step; an error from the initial
// "already assigned" lookup is returned unchanged.
//
// Panics when w.ID is zero, as the worker is expected to be stored in the
// database already.
func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker, logger zerolog.Logger) (*sqlc.Task, error) {
	if w.ID == 0 {
		panic("worker should be in database, but has zero ID")
	}

	workerID := sql.NullInt64{Int64: int64(w.ID), Valid: true}

	// If a task is already active & assigned to this worker, return just that.
	// Note that this task type could be blocklisted or no longer supported by the
	// Worker, but since it's active that is unlikely.
	{
		row, err := queries.FetchAssignedAndRunnableTaskOfWorker(ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{
			ActiveTaskStatus:  string(api.TaskStatusActive),
			ActiveJobStatuses: convertJobStatuses(schedulableJobStatuses),
			WorkerID:          workerID,
		})

		switch {
		case errors.Is(err, sql.ErrNoRows):
			// Fine, just means there was no task assigned yet.
		case err != nil:
			return nil, err
		case row.Task.ID > 0:
			return &row.Task, nil
		}
	}

	task, err := findTaskForWorker(ctx, queries, w)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// Fine, just means there is no runnable task for this worker.
		return nil, nil
	case isDatabaseBusyError(err):
		logger.Trace().Err(err).Msg("database busy while finding task for worker")
		return nil, errDatabaseBusy
	case err != nil:
		logger.Error().Err(err).Msg("finding task for worker")
		return nil, fmt.Errorf("finding task for worker: %w", err)
	case task.ID == 0:
		// findTaskForWorker can also signal "nothing found" by returning a
		// zero-valued task together with a nil error. Treat that the same as
		// sql.ErrNoRows, instead of assigning a non-existent task (ID 0) to
		// the worker and returning an empty task to the caller.
		return nil, nil
	}

	// Assign the task to the worker.
	err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{
		WorkerID: workerID,
		Now:      db.nowNullable(),
		TaskID:   task.ID,
	})

	switch {
	case isDatabaseBusyError(err):
		logger.Trace().Err(err).Msg("database busy while assigning task to worker")
		return nil, errDatabaseBusy
	case err != nil:
		logger.Warn().
			Str("taskID", task.UUID).
			Err(err).
			Msg("assigning task to worker")
		return nil, fmt.Errorf("assigning task to worker: %w", err)
	}

	// Make sure the returned task matches the database.
	task.WorkerID = workerID

	logger.Info().
		Str("taskID", task.UUID).
		Msg("assigned task to worker")

	return &task, nil
}
// findTaskForWorker queries the database for a task that worker w can run.
//
// The worker's tag IDs are passed to the query as nullable integers, together
// with the schedulable task and job statuses, the 'completed' task status, and
// the task types reported by w.TaskTypes().
//
// Errors from the query are returned unchanged, alongside a zero-valued task.
// When the query succeeds but the resulting task has a zero ID, a zero-valued
// task and a nil error are returned.
func findTaskForWorker(
	ctx context.Context,
	queries *sqlc.Queries,
	w *Worker,
) (sqlc.Task, error) {
	// Collect the IDs of the worker's tags in the nullable form used by the
	// query parameters.
	tagIDs := make([]sql.NullInt64, 0, len(w.Tags))
	for _, workerTag := range w.Tags {
		tagIDs = append(tagIDs, sql.NullInt64{Int64: int64(workerTag.ID), Valid: true})
	}

	params := sqlc.FindRunnableTaskParams{
		WorkerID:                int64(w.ID),
		SchedulableTaskStatuses: convertTaskStatuses(schedulableTaskStatuses),
		SchedulableJobStatuses:  convertJobStatuses(schedulableJobStatuses),
		SupportedTaskTypes:      w.TaskTypes(),
		TaskStatusCompleted:     string(api.TaskStatusCompleted),
		WorkerTags:              tagIDs,
	}

	found, err := queries.FindRunnableTask(ctx, params)
	switch {
	case err != nil:
		return sqlc.Task{}, err
	case found.Task.ID == 0:
		// A row without a usable task ID is reported as "no task".
		return sqlc.Task{}, nil
	}
	return found.Task, nil
}