diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index ea9bef0f..127c1a02 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -36,6 +36,8 @@ type PersistenceService interface { SaveJobPriority(ctx context.Context, job *persistence.Job) error // FetchTask fetches the given task and the accompanying job. FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) + // FetchTaskJobUUID fetches the UUID of the job this task belongs to. + FetchTaskJobUUID(ctx context.Context, taskID string) (string, error) FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error) SaveTaskActivity(ctx context.Context, t *persistence.Task) error // TaskTouchedByWorker marks the task as 'touched' by a worker. This is used for timeout detection. diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 9e9918b2..92d2aad9 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -439,7 +439,7 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { return sendAPIError(e, http.StatusBadRequest, "bad task ID") } - dbTask, err := f.persist.FetchTask(ctx, taskID) + jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID) if err != nil { if errors.Is(err, persistence.ErrTaskNotFound) { return sendAPIError(e, http.StatusNotFound, "no such task") @@ -447,9 +447,9 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { logger.Error().Err(err).Msg("error fetching task") return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err) } - logger = logger.With().Str("job", dbTask.Job.UUID).Logger() + logger = logger.With().Str("job", jobUUID).Logger() - size, err := f.logStorage.TaskLogSize(dbTask.Job.UUID, taskID) + size, err := f.logStorage.TaskLogSize(jobUUID, taskID) if err != nil { if errors.Is(err, os.ErrNotExist) { logger.Debug().Msg("task log unavailable, task has no log on disk") @@ -475,11 +475,11 @@ func (f *Flamenco) FetchTaskLogInfo(e echo.Context, taskID string) error { taskLogInfo := api.TaskLogInfo{ TaskId: taskID, - JobId: dbTask.Job.UUID, + JobId: jobUUID, Size: int(size), } - fullLogPath := f.logStorage.Filepath(dbTask.Job.UUID, taskID) + fullLogPath := f.logStorage.Filepath(jobUUID, taskID) relPath, err := f.localStorage.RelPath(fullLogPath) if err != nil { logger.Error().Err(err).Msg("task log is outside the manager storage, cannot construct its URL for download") @@ -501,7 +501,7 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error { return sendAPIError(e, http.StatusBadRequest, "bad task ID") } - dbTask, err := f.persist.FetchTask(ctx, taskID) + jobUUID, err := f.persist.FetchTaskJobUUID(ctx, taskID) if err != nil { if errors.Is(err, persistence.ErrTaskNotFound) { return sendAPIError(e, http.StatusNotFound, "no such task") @@ -509,9 +509,9 @@ func (f *Flamenco) FetchTaskLogTail(e echo.Context, taskID string) error { logger.Error().Err(err).Msg("error fetching task") return sendAPIError(e, http.StatusInternalServerError, "error fetching task: %v", err) } - logger = logger.With().Str("job", dbTask.Job.UUID).Logger() + logger = logger.With().Str("job", jobUUID).Logger() - tail, err := f.logStorage.Tail(dbTask.Job.UUID, taskID) + tail, err := f.logStorage.Tail(jobUUID, taskID) if err != nil { if errors.Is(err, os.ErrNotExist) { logger.Debug().Msg("task tail unavailable, task has no log on disk") @@ -700,7 +700,11 @@ func taskDBtoAPI(dbTask *persistence.Task) api.Task { Status: dbTask.Status, Activity: dbTask.Activity, Commands: make([]api.Command, len(dbTask.Commands)), - Worker: workerToTaskWorker(dbTask.Worker), + + // TODO: convert this to just store dbTask.WorkerUUID. + Worker: workerToTaskWorker(dbTask.Worker), + + JobId: dbTask.JobUUID, } if dbTask.Job != nil { diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 61b52327..b5134cab 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -753,22 +753,10 @@ func TestFetchTaskLogTail(t *testing.T) { jobID := "18a9b096-d77e-438c-9be2-74397038298b" taskID := "2e020eee-20f8-4e95-8dcf-65f7dfc3ebab" - dbJob := persistence.Job{ - UUID: jobID, - Name: "test job", - Status: api.JobStatusActive, - Settings: persistence.StringInterfaceMap{}, - Metadata: persistence.StringStringMap{}, - } - dbTask := persistence.Task{ - UUID: taskID, - Job: &dbJob, - Name: "test task", - } // The task can be found, but has no on-disk task log. // This should not cause any error, but instead be returned as "no content". - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&dbTask, nil) + mf.persistence.EXPECT().FetchTaskJobUUID(gomock.Any(), taskID).Return(jobID, nil) mf.logStorage.EXPECT().Tail(jobID, taskID). Return("", fmt.Errorf("wrapped error: %w", os.ErrNotExist)) @@ -778,7 +766,7 @@ func TestFetchTaskLogTail(t *testing.T) { assertResponseNoContent(t, echoCtx) // Check that a 204 No Content is also returned when the task log file on disk exists, but is empty. - mf.persistence.EXPECT().FetchTask(gomock.Any(), taskID).Return(&dbTask, nil) + mf.persistence.EXPECT().FetchTaskJobUUID(gomock.Any(), taskID).Return(jobID, nil) mf.logStorage.EXPECT().Tail(jobID, taskID). Return("", fmt.Errorf("wrapped error: %w", os.ErrNotExist)) @@ -796,21 +784,9 @@ func TestFetchTaskLogInfo(t *testing.T) { jobID := "18a9b096-d77e-438c-9be2-74397038298b" taskID := "2e020eee-20f8-4e95-8dcf-65f7dfc3ebab" - dbJob := persistence.Job{ - UUID: jobID, - Name: "test job", - Status: api.JobStatusActive, - Settings: persistence.StringInterfaceMap{}, - Metadata: persistence.StringStringMap{}, - } - dbTask := persistence.Task{ - UUID: taskID, - Job: &dbJob, - Name: "test task", - } mf.persistence.EXPECT(). - FetchTask(gomock.Any(), taskID). - Return(&dbTask, nil). + FetchTaskJobUUID(gomock.Any(), taskID). + Return(jobID, nil). AnyTimes() // The task can be found, but has no on-disk task log. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 9f63ca2d..03bb6c96 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -244,6 +244,21 @@ func (mr *MockPersistenceServiceMockRecorder) FetchTaskFailureList(arg0, arg1 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTaskFailureList", reflect.TypeOf((*MockPersistenceService)(nil).FetchTaskFailureList), arg0, arg1) } +// FetchTaskJobUUID mocks base method. +func (m *MockPersistenceService) FetchTaskJobUUID(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTaskJobUUID", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTaskJobUUID indicates an expected call of FetchTaskJobUUID. +func (mr *MockPersistenceServiceMockRecorder) FetchTaskJobUUID(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTaskJobUUID", reflect.TypeOf((*MockPersistenceService)(nil).FetchTaskJobUUID), arg0, arg1) +} + // FetchWorker mocks base method. func (m *MockPersistenceService) FetchWorker(arg0 context.Context, arg1 string) (*persistence.Worker, error) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/db.go b/internal/manager/persistence/db.go index 1ffe2806..388a4013 100644 --- a/internal/manager/persistence/db.go +++ b/internal/manager/persistence/db.go @@ -184,7 +184,9 @@ func (db *DB) queries() (*sqlc.Queries, error) { if err != nil { return nil, fmt.Errorf("could not get low-level database driver: %w", err) } - return sqlc.New(sqldb), nil + + loggingWrapper := LoggingDBConn{sqldb} + return sqlc.New(&loggingWrapper), nil } // now returns the result of `nowFunc()` wrapped in a sql.NullTime. diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index 24eb3dae..816a1383 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -2,6 +2,7 @@ package persistence import ( + "database/sql" "errors" "fmt" @@ -9,6 +10,7 @@ import ( ) var ( + // TODO: let these errors wrap database/sql.ErrNoRows. ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} @@ -63,36 +65,48 @@ func wrapError(errorToWrap error, message string, format ...interface{}) error { // translateGormJobError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormJobError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormJobError(err error) error { + if errors.Is(err, sql.ErrNoRows) { + return ErrTaskNotFound + } + if errors.Is(err, gorm.ErrRecordNotFound) { return ErrJobNotFound } - return gormError + return err } // translateGormTaskError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormTaskError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormTaskError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrTaskNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrTaskNotFound + } + return err } // translateGormWorkerError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormWorkerError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrWorkerNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrWorkerNotFound + } + return err } // translateGormWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerTagError(gormError error) error { - if errors.Is(gormError, gorm.ErrRecordNotFound) { +func translateGormWorkerTagError(err error) error { + if errors.Is(err, sql.ErrNoRows) { return ErrWorkerTagNotFound } - return gormError + if errors.Is(err, gorm.ErrRecordNotFound) { + return ErrWorkerTagNotFound + } + return err } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 8a2f94d3..f28f0802 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -66,12 +66,14 @@ type Task struct { Type string `gorm:"type:varchar(32);default:''"` JobID uint `gorm:"default:0"` Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"` + JobUUID string `gorm:"-"` // Fetched by SQLC, not GORM. Priority int `gorm:"type:smallint;default:50"` Status api.TaskStatus `gorm:"type:varchar(16);default:''"` // Which worker is/was working on this. WorkerID *uint Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:SET NULL"` + WorkerUUID string `gorm:"-"` // Fetched by SQLC, not GORM. LastTouchedAt time.Time `gorm:"index"` // Should contain UTC timestamps. // Dependencies are tasks that need to be completed before this one can run. @@ -454,18 +456,66 @@ func (db *DB) SaveJobStorageInfo(ctx context.Context, j *Job) error { } func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { - dbTask := Task{} - tx := db.gormDB.WithContext(ctx). - // Allow finding the Worker, even after it was deleted. Jobs and Tasks - // don't have soft-deletion. - Unscoped(). - Joins("Job"). - Joins("Worker"). - First(&dbTask, "tasks.uuid = ?", taskUUID) - if tx.Error != nil { - return nil, taskError(tx.Error, "fetching task") + queries, err := db.queries() + if err != nil { + return nil, err } - return &dbTask, nil + + taskRow, err := queries.FetchTask(ctx, taskUUID) + if err != nil { + return nil, taskError(err, "fetching task %s", taskUUID) + } + + convertedTask, err := convertSqlcTask(taskRow) + if err != nil { + return nil, err + } + + // TODO: remove this code, and let the caller fetch the job explicitly when needed. + if taskRow.Task.JobID > 0 { + dbJob, err := queries.FetchJobByID(ctx, taskRow.Task.JobID) + if err != nil { + return nil, jobError(err, "fetching job of task %s", taskUUID) + } + + convertedJob, err := convertSqlcJob(dbJob) + if err != nil { + return nil, jobError(err, "converting job of task %s", taskUUID) + } + convertedTask.Job = convertedJob + if convertedTask.JobUUID != convertedJob.UUID { + panic("Conversion to SQLC is incomplete") + } + } + + // TODO: remove this code, and let the caller fetch the Worker explicitly when needed. + if taskRow.WorkerUUID.Valid { + worker, err := queries.FetchWorkerUnconditional(ctx, taskRow.WorkerUUID.String) + if err != nil { + return nil, taskError(err, "fetching worker assigned to task %s", taskUUID) + } + convertedWorker := convertSqlcWorker(worker) + convertedTask.Worker = &convertedWorker + } + + return convertedTask, nil +} + +// FetchTaskJobUUID fetches the job UUID of the given task. +func (db *DB) FetchTaskJobUUID(ctx context.Context, taskUUID string) (string, error) { + queries, err := db.queries() + if err != nil { + return "", err + } + + jobUUID, err := queries.FetchTaskJobUUID(ctx, taskUUID) + if err != nil { + return "", taskError(err, "fetching job UUID of task %s", taskUUID) + } + if !jobUUID.Valid { + return "", PersistenceError{Message: fmt.Sprintf("unable to find job of task %s", taskUUID)} + } + return jobUUID.String, nil } func (db *DB) SaveTask(ctx context.Context, t *Task) error { @@ -791,3 +841,43 @@ func convertSqlcJob(job sqlc.Job) (*Job, error) { return &dbJob, nil } + +// convertSqlcTask converts a FetchTaskRow from the SQLC-generated model to the +// model expected by the rest of the code. This is mostly in place to aid in the +// GORM to SQLC migration. It is intended that eventually the rest of the code +// will use the same SQLC-generated model. +func convertSqlcTask(taskRow sqlc.FetchTaskRow) (*Task, error) { + dbTask := Task{ + Model: Model{ + ID: uint(taskRow.Task.ID), + CreatedAt: taskRow.Task.CreatedAt, + UpdatedAt: taskRow.Task.UpdatedAt.Time, + }, + + UUID: taskRow.Task.UUID, + Name: taskRow.Task.Name, + Type: taskRow.Task.Type, + Priority: int(taskRow.Task.Priority), + Status: api.TaskStatus(taskRow.Task.Status), + LastTouchedAt: taskRow.Task.LastTouchedAt.Time, + Activity: taskRow.Task.Activity, + + JobID: uint(taskRow.Task.JobID), + JobUUID: taskRow.JobUUID.String, + WorkerUUID: taskRow.WorkerUUID.String, + } + + // TODO: convert dependencies? + + if taskRow.Task.WorkerID.Valid { + workerID := uint(taskRow.Task.WorkerID.Int64) + dbTask.WorkerID = &workerID + } + + if err := json.Unmarshal(taskRow.Task.Commands, &dbTask.Commands); err != nil { + return nil, taskError(err, fmt.Sprintf("task %s of job %s has invalid commands: %v", + taskRow.Task.UUID, taskRow.JobUUID.String, err)) + } + + return &dbTask, nil +} diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index 436c5f18..cd264f7f 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -238,9 +238,12 @@ func TestCountTaskFailuresOfWorker(t *testing.T) { ctx, close, db, dbJob, authoredJob := jobTasksTestFixtures(t) defer close() - task0, _ := db.FetchTask(ctx, authoredJob.Tasks[0].UUID) - task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) - task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + task0, err := db.FetchTask(ctx, authoredJob.Tasks[0].UUID) + require.NoError(t, err) + task1, err := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) + require.NoError(t, err) + task2, err := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + require.NoError(t, err) // Sanity check on the test data. assert.Equal(t, "blender", task0.Type) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 000a6db4..20a0b1d5 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -75,6 +75,19 @@ func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) { assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID) } +func TestFetchTaskJobUUID(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) + defer cancel() + + job := createTestAuthoredJobWithTasks() + err := db.StoreAuthoredJob(ctx, job) + require.NoError(t, err) + + jobUUID, err := db.FetchTaskJobUUID(ctx, job.Tasks[0].UUID) + require.NoError(t, err) + assert.Equal(t, job.JobID, jobUUID) +} + func TestSaveJobStorageInfo(t *testing.T) { // Test that saving job storage info doesn't count as "update". // This is necessary for `cmd/shaman-checkout-id-setter` to do its work quietly. diff --git a/internal/manager/persistence/logger.go b/internal/manager/persistence/logger.go index 2135a006..3e346ef8 100644 --- a/internal/manager/persistence/logger.go +++ b/internal/manager/persistence/logger.go @@ -4,13 +4,16 @@ package persistence import ( "context" + "database/sql" "errors" "fmt" "time" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "gorm.io/gorm" gormlogger "gorm.io/gorm/logger" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" ) // dbLogger implements the behaviour of Gorm's default logger on top of Zerolog. @@ -126,3 +129,28 @@ func (l dbLogger) logger(args ...interface{}) zerolog.Logger { } return logCtx.Logger() } + +// LoggingDBConn wraps a database/sql.DB connection, so that it can be used with +// sqlc and log all the queries. +type LoggingDBConn struct { + wrappedConn sqlc.DBTX +} + +var _ sqlc.DBTX = (*LoggingDBConn)(nil) + +func (ldbc *LoggingDBConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Exec") + return ldbc.wrappedConn.ExecContext(ctx, sql, args...) +} +func (ldbc *LoggingDBConn) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) { + log.Trace().Str("sql", sql).Msg("database: query Prepare") + return ldbc.wrappedConn.PrepareContext(ctx, sql) +} +func (ldbc *LoggingDBConn) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query Query") + return ldbc.wrappedConn.QueryContext(ctx, sql, args...) +} +func (ldbc *LoggingDBConn) QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row { + log.Trace().Str("sql", sql).Interface("args", args).Msg("database: query QueryRow") + return ldbc.wrappedConn.QueryRowContext(ctx, sql, args...) +} diff --git a/internal/manager/persistence/sqlc/query_jobs.sql b/internal/manager/persistence/sqlc/query_jobs.sql index 0f606454..e3108647 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql +++ b/internal/manager/persistence/sqlc/query_jobs.sql @@ -18,9 +18,15 @@ INSERT INTO jobs ( VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); -- name: FetchJob :one +-- Fetch a job by its UUID. SELECT * FROM jobs WHERE uuid = ? LIMIT 1; +-- name: FetchJobByID :one +-- Fetch a job by its numerical ID. +SELECT * FROM jobs +WHERE id = ? LIMIT 1; + -- name: DeleteJob :exec DELETE FROM jobs WHERE uuid = ?; @@ -55,3 +61,16 @@ UPDATE jobs SET updated_at=@now, priority=@priority WHERE id=@id; -- name: SaveJobStorageInfo :exec UPDATE jobs SET storage_shaman_checkout_id=@storage_shaman_checkout_id WHERE id=@id; + +-- name: FetchTask :one +SELECT sqlc.embed(tasks), jobs.UUID as jobUUID, workers.UUID as workerUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.uuid = @uuid; + +-- name: FetchTaskJobUUID :one +SELECT jobs.UUID as jobUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +WHERE tasks.uuid = @uuid; diff --git a/internal/manager/persistence/sqlc/query_jobs.sql.go b/internal/manager/persistence/sqlc/query_jobs.sql.go index 26032839..6d1d70f4 100644 --- a/internal/manager/persistence/sqlc/query_jobs.sql.go +++ b/internal/manager/persistence/sqlc/query_jobs.sql.go @@ -74,6 +74,7 @@ SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activ WHERE uuid = ? LIMIT 1 ` +// Fetch a job by its UUID. func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { row := q.db.QueryRowContext(ctx, fetchJob, uuid) var i Job @@ -96,6 +97,34 @@ func (q *Queries) FetchJob(ctx context.Context, uuid string) (Job, error) { return i, err } +const fetchJobByID = `-- name: FetchJobByID :one +SELECT id, created_at, updated_at, uuid, name, job_type, priority, status, activity, settings, metadata, delete_requested_at, storage_shaman_checkout_id, worker_tag_id FROM jobs +WHERE id = ? LIMIT 1 +` + +// Fetch a job by its numerical ID. +func (q *Queries) FetchJobByID(ctx context.Context, id int64) (Job, error) { + row := q.db.QueryRowContext(ctx, fetchJobByID, id) + var i Job + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.JobType, + &i.Priority, + &i.Status, + &i.Activity, + &i.Settings, + &i.Metadata, + &i.DeleteRequestedAt, + &i.StorageShamanCheckoutID, + &i.WorkerTagID, + ) + return i, err +} + const fetchJobUUIDsUpdatedBefore = `-- name: FetchJobUUIDsUpdatedBefore :many SELECT uuid FROM jobs WHERE updated_at <= ?1 ` @@ -204,6 +233,57 @@ func (q *Queries) FetchJobsInStatus(ctx context.Context, statuses []string) ([]J return items, nil } +const fetchTask = `-- name: FetchTask :one +SELECT tasks.id, tasks.created_at, tasks.updated_at, tasks.uuid, tasks.name, tasks.type, tasks.job_id, tasks.priority, tasks.status, tasks.worker_id, tasks.last_touched_at, tasks.commands, tasks.activity, jobs.UUID as jobUUID, workers.UUID as workerUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +LEFT JOIN workers ON (tasks.worker_id = workers.id) +WHERE tasks.uuid = ?1 +` + +type FetchTaskRow struct { + Task Task + JobUUID sql.NullString + WorkerUUID sql.NullString +} + +func (q *Queries) FetchTask(ctx context.Context, uuid string) (FetchTaskRow, error) { + row := q.db.QueryRowContext(ctx, fetchTask, uuid) + var i FetchTaskRow + err := row.Scan( + &i.Task.ID, + &i.Task.CreatedAt, + &i.Task.UpdatedAt, + &i.Task.UUID, + &i.Task.Name, + &i.Task.Type, + &i.Task.JobID, + &i.Task.Priority, + &i.Task.Status, + &i.Task.WorkerID, + &i.Task.LastTouchedAt, + &i.Task.Commands, + &i.Task.Activity, + &i.JobUUID, + &i.WorkerUUID, + ) + return i, err +} + +const fetchTaskJobUUID = `-- name: FetchTaskJobUUID :one +SELECT jobs.UUID as jobUUID +FROM tasks +LEFT JOIN jobs ON (tasks.job_id = jobs.id) +WHERE tasks.uuid = ?1 +` + +func (q *Queries) FetchTaskJobUUID(ctx context.Context, uuid string) (sql.NullString, error) { + row := q.db.QueryRowContext(ctx, fetchTaskJobUUID, uuid) + var jobuuid sql.NullString + err := row.Scan(&jobuuid) + return jobuuid, err +} + const requestJobDeletion = `-- name: RequestJobDeletion :exec UPDATE jobs SET updated_at = ?1, diff --git a/internal/manager/persistence/sqlc/query_workers.sql b/internal/manager/persistence/sqlc/query_workers.sql new file mode 100644 index 00000000..a2d6713c --- /dev/null +++ b/internal/manager/persistence/sqlc/query_workers.sql @@ -0,0 +1,18 @@ + +-- Worker queries +-- + +-- name: FetchWorker :one +-- FetchWorker only returns the worker if it wasn't soft-deleted. +SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL; + +-- name: FetchWorkerUnconditional :one +-- FetchWorkerUnconditional ignores soft-deletion status and just returns the worker. +SELECT * FROM workers WHERE workers.uuid = @uuid; + +-- name: FetchWorkerTags :many +SELECT worker_tags.* +FROM worker_tags +LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) +LEFT JOIN workers on (m.worker_id = workers.id) +WHERE workers.uuid = @uuid; diff --git a/internal/manager/persistence/sqlc/query_workers.sql.go b/internal/manager/persistence/sqlc/query_workers.sql.go new file mode 100644 index 00000000..663fa422 --- /dev/null +++ b/internal/manager/persistence/sqlc/query_workers.sql.go @@ -0,0 +1,109 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.25.0 +// source: query_workers.sql + +package sqlc + +import ( + "context" +) + +const fetchWorker = `-- name: FetchWorker :one + +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 and deleted_at is NULL +` + +// Worker queries +// +// FetchWorker only returns the worker if it wasn't soft-deleted. +func (q *Queries) FetchWorker(ctx context.Context, uuid string) (Worker, error) { + row := q.db.QueryRowContext(ctx, fetchWorker, uuid) + var i Worker + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ) + return i, err +} + +const fetchWorkerTags = `-- name: FetchWorkerTags :many +SELECT worker_tags.id, worker_tags.created_at, worker_tags.updated_at, worker_tags.uuid, worker_tags.name, worker_tags.description +FROM worker_tags +LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id) +LEFT JOIN workers on (m.worker_id = workers.id) +WHERE workers.uuid = ?1 +` + +func (q *Queries) FetchWorkerTags(ctx context.Context, uuid string) ([]WorkerTag, error) { + rows, err := q.db.QueryContext(ctx, fetchWorkerTags, uuid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []WorkerTag + for rows.Next() { + var i WorkerTag + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Name, + &i.Description, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const fetchWorkerUnconditional = `-- name: FetchWorkerUnconditional :one +SELECT id, created_at, updated_at, uuid, secret, name, address, platform, software, status, last_seen_at, status_requested, lazy_status_request, supported_task_types, deleted_at, can_restart FROM workers WHERE workers.uuid = ?1 +` + +// FetchWorkerUnconditional ignores soft-deletion status and just returns the worker. +func (q *Queries) FetchWorkerUnconditional(ctx context.Context, uuid string) (Worker, error) { + row := q.db.QueryRowContext(ctx, fetchWorkerUnconditional, uuid) + var i Worker + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.UUID, + &i.Secret, + &i.Name, + &i.Address, + &i.Platform, + &i.Software, + &i.Status, + &i.LastSeenAt, + &i.StatusRequested, + &i.LazyStatusRequest, + &i.SupportedTaskTypes, + &i.DeletedAt, + &i.CanRestart, + ) + return i, err +} diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index cba04ade..a0c2a92f 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -15,7 +15,6 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "gorm.io/gorm" - "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -106,7 +105,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe ctx, cancel, db := persistenceTestFixtures(t, testContextTimeout) w := Worker{ - UUID: uuid.New(), + UUID: "557930e7-5b55-469e-a6d7-fc800f3685be", Name: "дрон", Address: "fe80::5054:ff:fede:2ad7", Platform: "linux", @@ -116,7 +115,7 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe } wc := WorkerTag{ - UUID: uuid.New(), + UUID: "e0e05417-9793-4829-b1d0-d446dd819f3d", Name: "arbejdsklynge", Description: "Worker tag in Danish", } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index a9637d54..c7d87f9e 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" "gorm.io/gorm" + "projects.blender.org/studio/flamenco/internal/manager/persistence/sqlc" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -73,18 +74,30 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { } func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { - w := Worker{} - tx := db.gormDB.WithContext(ctx). - Preload("Tags"). - Find(&w, "uuid = ?", uuid). - Limit(1) - if tx.Error != nil { - return nil, workerError(tx.Error, "fetching worker") + queries, err := db.queries() + if err != nil { + return nil, err } - if w.ID == 0 { - return nil, ErrWorkerNotFound + + worker, err := queries.FetchWorker(ctx, uuid) + if err != nil { + return nil, workerError(err, "fetching worker %s", uuid) } - return &w, nil + + // TODO: remove this code, and let the caller fetch the tags when interested in them. + workerTags, err := queries.FetchWorkerTags(ctx, uuid) + if err != nil { + return nil, workerTagError(err, "fetching tags of worker %s", uuid) + } + + convertedWorker := convertSqlcWorker(worker) + convertedWorker.Tags = make([]*WorkerTag, len(workerTags)) + for index := range workerTags { + convertedTag := convertSqlcWorkerTag(workerTags[index]) + convertedWorker.Tags[index] = &convertedTag + } + + return &convertedWorker, nil } func (db *DB) DeleteWorker(ctx context.Context, uuid string) error { @@ -216,3 +229,48 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e return statusCounts, nil } + +// convertSqlcWorker converts a worker from the SQLC-generated model to the model +// expected by the rest of the code. This is mostly in place to aid in the GORM +// to SQLC migration. It is intended that eventually the rest of the code will +// use the same SQLC-generated model. +func convertSqlcWorker(worker sqlc.Worker) Worker { + return Worker{ + Model: Model{ + ID: uint(worker.ID), + CreatedAt: worker.CreatedAt, + UpdatedAt: worker.UpdatedAt.Time, + }, + DeletedAt: gorm.DeletedAt(worker.DeletedAt), + + UUID: worker.UUID, + Secret: worker.Secret, + Name: worker.Name, + Address: worker.Address, + Platform: worker.Platform, + Software: worker.Software, + Status: api.WorkerStatus(worker.Status), + LastSeenAt: worker.LastSeenAt.Time, + CanRestart: worker.CanRestart != 0, + StatusRequested: api.WorkerStatus(worker.StatusRequested), + LazyStatusRequest: worker.LazyStatusRequest != 0, + SupportedTaskTypes: worker.SupportedTaskTypes, + } +} + +// convertSqlcWorkerTag converts a worker tag from the SQLC-generated model to +// the model expected by the rest of the code. This is mostly in place to aid in +// the GORM to SQLC migration. It is intended that eventually the rest of the +// code will use the same SQLC-generated model. +func convertSqlcWorkerTag(tag sqlc.WorkerTag) WorkerTag { + return WorkerTag{ + Model: Model{ + ID: uint(tag.ID), + CreatedAt: tag.CreatedAt, + UpdatedAt: tag.UpdatedAt.Time, + }, + UUID: tag.UUID, + Name: tag.Name, + Description: tag.Description, + } +} diff --git a/sqlc.yaml b/sqlc.yaml index 887ce133..5fd3fcbc 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -14,3 +14,23 @@ sql: rename: uuid: "UUID" uuids: "UUIDs" + jobuuid: "JobUUID" + taskUUID: "TaskUUID" + workeruuid: "WorkerUUID" + - engine: "sqlite" + schema: "internal/manager/persistence/sqlc/schema.sql" + queries: "internal/manager/persistence/sqlc/query_workers.sql" + gen: + go: + out: "internal/manager/persistence/sqlc" + overrides: + - db_type: "jsonb" + go_type: + import: "encoding/json" + type: "RawMessage" + rename: + uuid: "UUID" + uuids: "UUIDs" + jobuuid: "JobUUID" + taskUUID: "TaskUUID" + workeruuid: "WorkerUUID"