
Replace old used-to-be-GORM datastructures (#104305) with sqlc-generated structs. This also makes it possible to use more specific structs that are more taylored to the specific queries, increasing efficiency. This commit deals with the remaining areas, like the job deleter, task timeout checker, and task state machine. And anything else to get things running again. Functional changes are kept to a minimum, as the API still serves the same data. Because this work covers so much of Flamenco's code, it's been split up into different commits. Each commit brings Flamenco to a state where it compiles and unit tests pass. Only the result of the final commit has actually been tested properly. Ref: #104343
194 lines
5.6 KiB
Go
194 lines
5.6 KiB
Go
package persistence
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
"projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc"
|
|
"projects.blender.org/studio/flamenco/pkg/api"
|
|
)
|
|
|
|
var (
|
|
// Note that active tasks are not schedulable, because they're already dunning on some worker.
|
|
schedulableTaskStatuses = []api.TaskStatus{api.TaskStatusQueued, api.TaskStatusSoftFailed}
|
|
schedulableJobStatuses = []api.JobStatus{api.JobStatusActive, api.JobStatusQueued}
|
|
// completedTaskStatuses = []api.TaskStatus{api.TaskStatusCompleted}
|
|
)
|
|
|
|
// ScheduledTask contains a Task and some info about its job.
|
|
//
|
|
// This structure is returned from different points in the code below, and
|
|
// filled from different sqlc-generated structs. That's why it has to be an
|
|
// explicit struct here, rather than an alias for some sqlc struct.
|
|
type ScheduledTask struct {
|
|
Task Task
|
|
JobUUID string
|
|
JobPriority int64
|
|
JobType string
|
|
}
|
|
|
|
// ScheduleTask finds a task to execute by the given worker.
|
|
// If no task is available, (nil, nil) is returned, as this is not an error situation.
|
|
// NOTE: this does not also fetch returnedTask.Worker, but returnedTask.WorkerID is set.
|
|
func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*ScheduledTask, error) {
|
|
logger := log.With().Str("worker", w.UUID).Logger()
|
|
logger.Trace().Msg("finding task for worker")
|
|
|
|
// Run all queries in a single transaction.
|
|
//
|
|
// After this point, all queries should use this transaction. Otherwise SQLite
|
|
// will deadlock, as it will make any other query wait until this transaction
|
|
// is done.
|
|
qtx, err := db.queriesWithTX()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer qtx.rollback()
|
|
|
|
scheduledTask, err := db.scheduleTask(ctx, qtx.queries, w, logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if scheduledTask == nil {
|
|
// No task means no changes to the database.
|
|
// It's fine to just roll back the transaction.
|
|
return nil, nil
|
|
}
|
|
|
|
if err := qtx.commit(); err != nil {
|
|
return nil, fmt.Errorf(
|
|
"could not commit database transaction after scheduling task %s for worker %s: %w",
|
|
scheduledTask.Task.UUID, w.UUID, err)
|
|
}
|
|
|
|
return scheduledTask, nil
|
|
}
|
|
|
|
func (db *DB) scheduleTask(ctx context.Context, queries *sqlc.Queries, w *Worker, logger zerolog.Logger) (*ScheduledTask, error) {
|
|
if w.ID == 0 {
|
|
panic("worker should be in database, but has zero ID")
|
|
}
|
|
workerID := sql.NullInt64{Int64: int64(w.ID), Valid: true}
|
|
|
|
// If a task is alreay active & assigned to this worker, return just that.
|
|
// Note that this task type could be blocklisted or no longer supported by the
|
|
// Worker, but since it's active that is unlikely.
|
|
{
|
|
row, err := queries.FetchAssignedAndRunnableTaskOfWorker(
|
|
ctx, sqlc.FetchAssignedAndRunnableTaskOfWorkerParams{
|
|
ActiveTaskStatus: api.TaskStatusActive,
|
|
ActiveJobStatuses: schedulableJobStatuses,
|
|
WorkerID: workerID,
|
|
})
|
|
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
// Fine, just means there was no task assigned yet.
|
|
case err != nil:
|
|
return nil, err
|
|
case row.Task.ID > 0:
|
|
// Task was previously assigned, just go for it again.
|
|
scheduledTask := ScheduledTask{
|
|
Task: row.Task,
|
|
JobUUID: row.JobUUID,
|
|
JobPriority: row.JobPriority,
|
|
JobType: row.JobType,
|
|
}
|
|
return &scheduledTask, nil
|
|
}
|
|
}
|
|
|
|
scheduledTask, err := findTaskForWorker(ctx, queries, w)
|
|
|
|
switch {
|
|
case errors.Is(err, sql.ErrNoRows):
|
|
// Fine, just means there was no task assigned yet.
|
|
return nil, nil
|
|
case isDatabaseBusyError(err):
|
|
logger.Trace().Err(err).Msg("database busy while finding task for worker")
|
|
return nil, errDatabaseBusy
|
|
case err != nil:
|
|
logger.Error().Err(err).Msg("finding task for worker")
|
|
return nil, fmt.Errorf("finding task for worker: %w", err)
|
|
}
|
|
|
|
// Assign the task to the worker.
|
|
assignmentTimestamp := db.nowNullable()
|
|
err = queries.AssignTaskToWorker(ctx, sqlc.AssignTaskToWorkerParams{
|
|
WorkerID: workerID,
|
|
TaskID: scheduledTask.Task.ID,
|
|
Now: assignmentTimestamp,
|
|
})
|
|
|
|
switch {
|
|
case isDatabaseBusyError(err):
|
|
logger.Trace().Err(err).Msg("database busy while assigning task to worker")
|
|
return nil, errDatabaseBusy
|
|
case err != nil:
|
|
logger.Warn().
|
|
Str("taskID", scheduledTask.Task.UUID).
|
|
Err(err).
|
|
Msg("assigning task to worker")
|
|
return nil, fmt.Errorf("assigning task to worker: %w", err)
|
|
}
|
|
|
|
// Make sure the returned task matches the database.
|
|
scheduledTask.Task.WorkerID = workerID
|
|
scheduledTask.Task.UpdatedAt = assignmentTimestamp
|
|
|
|
logger.Info().
|
|
Str("taskID", scheduledTask.Task.UUID).
|
|
Msg("assigned task to worker")
|
|
|
|
return scheduledTask, nil
|
|
}
|
|
|
|
func findTaskForWorker(
|
|
ctx context.Context,
|
|
queries *sqlc.Queries,
|
|
w *Worker,
|
|
) (*ScheduledTask, error) {
|
|
|
|
// Construct the list of worker tag IDs to check.
|
|
tags, err := queries.FetchTagsOfWorker(ctx, w.UUID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
workerTags := make([]sql.NullInt64, len(tags))
|
|
for index, tag := range tags {
|
|
workerTags[index] = sql.NullInt64{Int64: tag.ID, Valid: true}
|
|
}
|
|
|
|
row, err := queries.FindRunnableTask(ctx, sqlc.FindRunnableTaskParams{
|
|
WorkerID: int64(w.ID),
|
|
SchedulableTaskStatuses: schedulableTaskStatuses,
|
|
SchedulableJobStatuses: schedulableJobStatuses,
|
|
SupportedTaskTypes: w.TaskTypes(),
|
|
TaskStatusCompleted: api.TaskStatusCompleted,
|
|
WorkerTags: workerTags,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if row.Task.ID == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
scheduledTask := ScheduledTask{
|
|
Task: row.Task,
|
|
JobUUID: row.JobUUID,
|
|
JobPriority: row.JobPriority,
|
|
JobType: row.JobType,
|
|
}
|
|
return &scheduledTask, nil
|
|
}
|