package persistence

// SPDX-License-Identifier: GPL-3.0-or-later

import (
	"context"
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"

	"gorm.io/gorm"

	"git.blender.org/flamenco/internal/manager/job_compilers"
	"git.blender.org/flamenco/pkg/api"
)

// Job is the database model of a job.
type Job struct {
	gorm.Model
	UUID string `gorm:"type:char(36);default:'';unique;index"`

	Name     string        `gorm:"type:varchar(64);default:''"`
	JobType  string        `gorm:"type:varchar(32);default:''"`
	Priority int           `gorm:"type:smallint;default:0"`
	Status   api.JobStatus `gorm:"type:varchar(32);default:''"`

	Settings StringInterfaceMap `gorm:"type:jsonb"`
	Metadata StringStringMap    `gorm:"type:jsonb"`
}

// StringInterfaceMap is a string-keyed map of arbitrary values that is stored
// in the database as JSON.
type StringInterfaceMap map[string]interface{}

// StringStringMap is a string-to-string map that is stored in the database as
// JSON.
type StringStringMap map[string]string

// Task is the database model of a task. Each task belongs to a Job.
type Task struct {
	gorm.Model
	UUID string `gorm:"type:char(36);default:'';unique;index"`

	Name     string         `gorm:"type:varchar(64);default:''"`
	Type     string         `gorm:"type:varchar(32);default:''"`
	JobID    uint           `gorm:"default:0"`
	Job      *Job           `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
	Priority int            `gorm:"type:smallint;default:50"`
	Status   api.TaskStatus `gorm:"type:varchar(16);default:''"`

	// Which worker is/was working on this.
	WorkerID *uint
	Worker   *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`

	// Dependencies are tasks that need to be completed before this one can run.
	Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`

	Commands Commands `gorm:"type:jsonb"`
	Activity string   `gorm:"type:varchar(255);default:''"`
}

// Commands is the list of commands of a task, stored in the database as JSON.
type Commands []Command

// Command is a single named command with its parameters.
type Command struct {
	Name       string             `json:"name"`
	Parameters StringInterfaceMap `json:"parameters"`
}

// Value implements driver.Valuer by encoding the commands as JSON.
func (c Commands) Value() (driver.Value, error) {
	return json.Marshal(c)
}

// Scan implements sql.Scanner by decoding a JSON-encoded []byte into c.
// Any other type of value is rejected with an error.
func (c *Commands) Scan(value interface{}) error {
	b, ok := value.([]byte)
	if !ok {
		return errors.New("type assertion to []byte failed")
	}
	// Pass the receiver itself, not its address: with a pointer-to-pointer, a
	// JSON `null` would only nil the local pointer and leave *c untouched.
	return json.Unmarshal(b, c)
}

// Value implements driver.Valuer by encoding the map as JSON.
func (js StringInterfaceMap) Value() (driver.Value, error) {
	return json.Marshal(js)
}

// Scan implements sql.Scanner by decoding a JSON-encoded []byte into js.
// Any other type of value is rejected with an error.
func (js *StringInterfaceMap) Scan(value interface{}) error {
	b, ok := value.([]byte)
	if !ok {
		return errors.New("type assertion to []byte failed")
	}
	// See Commands.Scan for why the receiver is passed as-is.
	return json.Unmarshal(b, js)
}

// Value implements driver.Valuer by encoding the map as JSON.
func (js StringStringMap) Value() (driver.Value, error) {
	return json.Marshal(js)
}

// Scan implements sql.Scanner by decoding a JSON-encoded []byte into js.
// Any other type of value is rejected with an error.
func (js *StringStringMap) Scan(value interface{}) error {
	b, ok := value.([]byte)
	if !ok {
		return errors.New("type assertion to []byte failed")
	}
	// See Commands.Scan for why the receiver is passed as-is.
	return json.Unmarshal(b, js)
}

// StoreAuthoredJob stores an AuthoredJob and its tasks in the database, in a
// single transaction.
//
// The job is stored with the status carried by authoredJob; it is up to the
// caller to transition it to its desired initial status. All tasks are stored
// with status api.TaskStatusQueued. Task dependencies must refer to tasks of
// the same authored job, otherwise an error is returned and nothing is stored.
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
	return db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		// TODO: separate conversion of struct types from storing things in the database.
		dbJob := Job{
			UUID:     authoredJob.JobID,
			Name:     authoredJob.Name,
			JobType:  authoredJob.JobType,
			Status:   authoredJob.Status,
			Priority: authoredJob.Priority,
			Settings: StringInterfaceMap(authoredJob.Settings),
			Metadata: StringStringMap(authoredJob.Metadata),
		}

		if err := tx.Create(&dbJob).Error; err != nil {
			return fmt.Errorf("storing job: %w", err)
		}

		// Remember the stored tasks by UUID, so that dependencies can be
		// resolved once all tasks exist in the database.
		uuidToTask := make(map[string]*Task)
		for _, authoredTask := range authoredJob.Tasks {
			// Intentionally left nil (not pre-sized) so that a task without
			// commands is encoded exactly as before.
			var commands []Command
			for _, authoredCommand := range authoredTask.Commands {
				commands = append(commands, Command{
					Name:       authoredCommand.Name,
					Parameters: StringInterfaceMap(authoredCommand.Parameters),
				})
			}

			dbTask := Task{
				Name:     authoredTask.Name,
				Type:     authoredTask.Type,
				UUID:     authoredTask.UUID,
				Job:      &dbJob,
				Priority: authoredTask.Priority,
				Status:   api.TaskStatusQueued,
				Commands: commands,
				// dependencies are stored below.
			}
			if err := tx.Create(&dbTask).Error; err != nil {
				return fmt.Errorf("storing task: %w", err)
			}

			uuidToTask[authoredTask.UUID] = &dbTask
		}

		// Store the dependencies between tasks.
		for _, authoredTask := range authoredJob.Tasks {
			if len(authoredTask.Dependencies) == 0 {
				continue
			}

			dbTask, ok := uuidToTask[authoredTask.UUID]
			if !ok {
				return fmt.Errorf("unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
			}

			deps := make([]*Task, len(authoredTask.Dependencies))
			for i, t := range authoredTask.Dependencies {
				depTask, ok := uuidToTask[t.UUID]
				if !ok {
					return fmt.Errorf("finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
				}
				deps[i] = depTask
			}

			dbTask.Dependencies = deps
			if err := tx.Save(dbTask).Error; err != nil {
				return fmt.Errorf("unable to store dependencies of task %q: %w", authoredTask.UUID, err)
			}
		}

		return nil
	})
}

// FetchJob returns the job with the given UUID. The error reported by GORM is
// returned unwrapped.
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
	dbJob := Job{}
	findResult := db.gormDB.WithContext(ctx).First(&dbJob, "uuid = ?", jobUUID)
	if findResult.Error != nil {
		return nil, findResult.Error
	}

	return &dbJob, nil
}

// SaveJobStatus writes j.Status to the database row of j.
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
	tx := db.gormDB.WithContext(ctx).
		Model(j).
		Updates(Job{Status: j.Status})
	if tx.Error != nil {
		return fmt.Errorf("saving job status: %w", tx.Error)
	}
	return nil
}

// FetchTask returns the task with the given UUID, joined with its job. The
// error reported by GORM is returned unwrapped.
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
	dbTask := Task{}
	tx := db.gormDB.WithContext(ctx).
		Joins("Job").
		First(&dbTask, "tasks.uuid = ?", taskUUID)
	if tx.Error != nil {
		return nil, tx.Error
	}
	return &dbTask, nil
}

// SaveTask saves the task t to the database.
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
	if err := db.gormDB.WithContext(ctx).Save(t).Error; err != nil {
		return fmt.Errorf("saving task: %w", err)
	}
	return nil
}

// SaveTaskActivity writes t.Activity to the database row of t.
func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
	// Select the column explicitly: when updating from a struct, GORM skips
	// zero-valued fields, which would make it impossible to clear the activity.
	err := db.gormDB.WithContext(ctx).
		Model(t).
		Select("Activity").
		Updates(Task{Activity: t.Activity}).Error
	if err != nil {
		return fmt.Errorf("saving task activity: %w", err)
	}
	return nil
}

// TaskAssignToWorker assigns task t to worker w, both in the database and on
// the passed task struct.
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
	tx := db.gormDB.WithContext(ctx).
		Model(t).Updates(Task{WorkerID: &w.ID})
	if tx.Error != nil {
		return fmt.Errorf("assigning task %s to worker %s: %w", t.UUID, w.UUID, tx.Error)
	}

	// Gorm updates t.WorkerID itself, but not t.Worker (even when it's added to
	// the Updates() call above).
	t.Worker = w

	return nil
}

// FetchTasksOfWorkerInStatus returns the tasks that are assigned to the given
// worker and have the given status. The result is an empty, non-nil slice when
// there are no such tasks.
func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]*Task, error) {
	result := []*Task{}
	tx := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Joins("Job").
		Where("tasks.worker_id = ?", worker.ID).
		Where("tasks.status = ?", taskStatus).
		Scan(&result)
	if tx.Error != nil {
		return nil, fmt.Errorf("finding tasks of worker %s in status %q: %w", worker.UUID, taskStatus, tx.Error)
	}
	return result, nil
}

// JobHasTasksInStatus returns whether the job has at least one task with the
// given status.
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
	var numTasksInStatus int64
	tx := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Where("job_id", job.ID).
		Where("status", taskStatus).
		Count(&numTasksInStatus)
	if tx.Error != nil {
		return false, tx.Error
	}
	return numTasksInStatus > 0, nil
}

// CountTasksOfJobInStatus returns the number of tasks of the job that have the
// given status, as well as the total number of tasks of the job.
func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error) {
	type Result struct {
		Status   api.TaskStatus
		NumTasks int
	}
	var results []Result

	// One grouped query yields the count per status; both return values are
	// derived from that below.
	tx := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Select("status, count(*) as num_tasks").
		Where("job_id", job.ID).
		Group("status").
		Scan(&results)
	if tx.Error != nil {
		return 0, 0, fmt.Errorf("count tasks of job %s in status %q: %w", job.UUID, taskStatus, tx.Error)
	}

	for _, result := range results {
		if result.Status == taskStatus {
			numInStatus += result.NumTasks
		}
		numTotal += result.NumTasks
	}

	return numInStatus, numTotal, nil
}

// UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
//
// An empty taskStatus is rejected. NOTE: because the update is done from a
// struct, GORM skips zero-valued fields; an empty `activity` therefore leaves
// the stored activity of the tasks unchanged.
func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job, taskStatus api.TaskStatus, activity string) error {
	if taskStatus == "" {
		return errors.New("empty status not allowed")
	}

	tx := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Where("job_id = ?", job.ID).
		Updates(Task{Status: taskStatus, Activity: activity})
	return tx.Error
}

// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
// limited to those tasks with status in `statusesToUpdate`.
//
// An empty taskStatus is rejected. NOTE: because the update is done from a
// struct, GORM skips zero-valued fields; an empty `activity` therefore leaves
// the stored activity of the tasks unchanged.
func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job, statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error {
	if taskStatus == "" {
		return errors.New("empty status not allowed")
	}

	tx := db.gormDB.WithContext(ctx).
		Model(&Task{}).
		Where("job_id = ?", job.ID).
		Where("status in ?", statusesToUpdate).
		Updates(Task{Status: taskStatus, Activity: activity})
	return tx.Error
}