
Instead of returning an error "error doing X", just return "doing X". The fact that it is returned as an error object already makes it clear that it is an error. This also makes it easier to chain error messages without the word "error" showing up in every part of the chain.
339 lines
9.8 KiB
Go
339 lines
9.8 KiB
Go
package persistence
|
|
|
|
/* ***** BEGIN GPL LICENSE BLOCK *****
|
|
*
|
|
* Original Code Copyright (C) 2022 Blender Foundation.
|
|
*
|
|
* This file is part of Flamenco.
|
|
*
|
|
* Flamenco is free software: you can redistribute it and/or modify it under
|
|
* the terms of the GNU General Public License as published by the Free Software
|
|
* Foundation, either version 3 of the License, or (at your option) any later
|
|
* version.
|
|
*
|
|
* Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License along with
|
|
* Flamenco. If not, see <https://www.gnu.org/licenses/>.
|
|
*
|
|
* ***** END GPL LICENSE BLOCK ***** */
|
|
|
|
import (
|
|
"context"
|
|
"database/sql/driver"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"git.blender.org/flamenco/internal/manager/job_compilers"
|
|
"git.blender.org/flamenco/pkg/api"
|
|
)
|
|
|
|
type Job struct {
|
|
gorm.Model
|
|
UUID string `gorm:"type:char(36);default:'';unique;index"`
|
|
|
|
Name string `gorm:"type:varchar(64);default:''"`
|
|
JobType string `gorm:"type:varchar(32);default:''"`
|
|
Priority int `gorm:"type:smallint;default:0"`
|
|
Status api.JobStatus `gorm:"type:varchar(32);default:''"`
|
|
|
|
Settings StringInterfaceMap `gorm:"type:jsonb"`
|
|
Metadata StringStringMap `gorm:"type:jsonb"`
|
|
}
|
|
|
|
type (
	// StringInterfaceMap maps string keys to arbitrary values. It is used for
	// job settings and command parameters.
	StringInterfaceMap map[string]interface{}

	// StringStringMap maps string keys to string values. It is used for job
	// metadata.
	StringStringMap map[string]string
)
|
|
|
|
type Task struct {
|
|
gorm.Model
|
|
UUID string `gorm:"type:char(36);default:'';unique;index"`
|
|
|
|
Name string `gorm:"type:varchar(64);default:''"`
|
|
Type string `gorm:"type:varchar(32);default:''"`
|
|
JobID uint `gorm:"default:0"`
|
|
Job *Job `gorm:"foreignkey:JobID;references:ID;constraint:OnDelete:CASCADE"`
|
|
Priority int `gorm:"type:smallint;default:50"`
|
|
Status api.TaskStatus `gorm:"type:varchar(16);default:''"`
|
|
|
|
// Which worker is/was working on this.
|
|
WorkerID *uint
|
|
Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"`
|
|
|
|
// Dependencies are tasks that need to be completed before this one can run.
|
|
Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"`
|
|
|
|
Commands Commands `gorm:"type:jsonb"`
|
|
Activity string `gorm:"type:varchar(255);default:''"`
|
|
}
|
|
|
|
type Commands []Command
|
|
|
|
type Command struct {
|
|
Name string `json:"name"`
|
|
Parameters StringInterfaceMap `json:"parameters"`
|
|
}
|
|
|
|
func (c Commands) Value() (driver.Value, error) {
|
|
return json.Marshal(c)
|
|
}
|
|
func (c *Commands) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &c)
|
|
}
|
|
|
|
func (js StringInterfaceMap) Value() (driver.Value, error) {
|
|
return json.Marshal(js)
|
|
}
|
|
func (js *StringInterfaceMap) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &js)
|
|
}
|
|
|
|
func (js StringStringMap) Value() (driver.Value, error) {
|
|
return json.Marshal(js)
|
|
}
|
|
func (js *StringStringMap) Scan(value interface{}) error {
|
|
b, ok := value.([]byte)
|
|
if !ok {
|
|
return errors.New("type assertion to []byte failed")
|
|
}
|
|
return json.Unmarshal(b, &js)
|
|
}
|
|
|
|
// StoreJob stores an AuthoredJob and its tasks, and saves it to the database.
|
|
// The job will be in 'under construction' status. It is up to the caller to transition it to its desired initial status.
|
|
func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error {
|
|
return db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
|
// TODO: separate conversion of struct types from storing things in the database.
|
|
dbJob := Job{
|
|
UUID: authoredJob.JobID,
|
|
Name: authoredJob.Name,
|
|
JobType: authoredJob.JobType,
|
|
Status: authoredJob.Status,
|
|
Priority: authoredJob.Priority,
|
|
Settings: StringInterfaceMap(authoredJob.Settings),
|
|
Metadata: StringStringMap(authoredJob.Metadata),
|
|
}
|
|
|
|
if err := tx.Create(&dbJob).Error; err != nil {
|
|
return fmt.Errorf("storing job: %v", err)
|
|
}
|
|
|
|
uuidToTask := make(map[string]*Task)
|
|
for _, authoredTask := range authoredJob.Tasks {
|
|
var commands []Command
|
|
for _, authoredCommand := range authoredTask.Commands {
|
|
commands = append(commands, Command{
|
|
Name: authoredCommand.Name,
|
|
Parameters: StringInterfaceMap(authoredCommand.Parameters),
|
|
})
|
|
}
|
|
|
|
dbTask := Task{
|
|
Name: authoredTask.Name,
|
|
Type: authoredTask.Type,
|
|
UUID: authoredTask.UUID,
|
|
Job: &dbJob,
|
|
Priority: authoredTask.Priority,
|
|
Status: api.TaskStatusQueued,
|
|
Commands: commands,
|
|
// dependencies are stored below.
|
|
}
|
|
if err := tx.Create(&dbTask).Error; err != nil {
|
|
return fmt.Errorf("storing task: %v", err)
|
|
}
|
|
|
|
uuidToTask[authoredTask.UUID] = &dbTask
|
|
}
|
|
|
|
// Store the dependencies between tasks.
|
|
for _, authoredTask := range authoredJob.Tasks {
|
|
if len(authoredTask.Dependencies) == 0 {
|
|
continue
|
|
}
|
|
|
|
dbTask, ok := uuidToTask[authoredTask.UUID]
|
|
if !ok {
|
|
return fmt.Errorf("unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
|
|
}
|
|
|
|
deps := make([]*Task, len(authoredTask.Dependencies))
|
|
for i, t := range authoredTask.Dependencies {
|
|
depTask, ok := uuidToTask[t.UUID]
|
|
if !ok {
|
|
return fmt.Errorf("finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
|
|
}
|
|
deps[i] = depTask
|
|
}
|
|
|
|
dbTask.Dependencies = deps
|
|
if err := tx.Save(dbTask).Error; err != nil {
|
|
return fmt.Errorf("unable to store dependencies of task %q: %w", authoredTask.UUID, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
|
|
dbJob := Job{}
|
|
findResult := db.gormDB.WithContext(ctx).First(&dbJob, "uuid = ?", jobUUID)
|
|
if findResult.Error != nil {
|
|
return nil, findResult.Error
|
|
}
|
|
|
|
return &dbJob, nil
|
|
}
|
|
|
|
func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error {
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(j).
|
|
Updates(Job{Status: j.Status})
|
|
if tx.Error != nil {
|
|
return fmt.Errorf("saving job status: %w", tx.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) {
|
|
dbTask := Task{}
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Joins("Job").
|
|
First(&dbTask, "tasks.uuid = ?", taskUUID)
|
|
if tx.Error != nil {
|
|
return nil, tx.Error
|
|
}
|
|
return &dbTask, nil
|
|
}
|
|
|
|
func (db *DB) SaveTask(ctx context.Context, t *Task) error {
|
|
if err := db.gormDB.WithContext(ctx).Save(t).Error; err != nil {
|
|
return fmt.Errorf("saving task: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) SaveTaskActivity(ctx context.Context, t *Task) error {
|
|
if err := db.gormDB.Model(t).Updates(Task{Activity: t.Activity}).Error; err != nil {
|
|
return fmt.Errorf("saving task activity: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) TaskAssignToWorker(ctx context.Context, t *Task, w *Worker) error {
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(t).Updates(Task{WorkerID: &w.ID})
|
|
if tx.Error != nil {
|
|
return fmt.Errorf("assigning task %s to worker %s: %w", t.UUID, w.UUID, tx.Error)
|
|
}
|
|
|
|
// Gorm updates t.WorkerID itself, but not t.Worker (even when it's added to
|
|
// the Updates() call above).
|
|
t.Worker = w
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, taskStatus api.TaskStatus) ([]*Task, error) {
|
|
result := []*Task{}
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(&Task{}).
|
|
Joins("Job").
|
|
Where("tasks.worker_id = ?", worker.ID).
|
|
Where("tasks.status = ?", taskStatus).
|
|
Scan(&result)
|
|
if tx.Error != nil {
|
|
return nil, fmt.Errorf("finding tasks of worker %s in status %q: %w", worker.UUID, taskStatus, tx.Error)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (db *DB) JobHasTasksInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (bool, error) {
|
|
var numTasksInStatus int64
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(&Task{}).
|
|
Where("job_id", job.ID).
|
|
Where("status", taskStatus).
|
|
Count(&numTasksInStatus)
|
|
if tx.Error != nil {
|
|
return false, tx.Error
|
|
}
|
|
return numTasksInStatus > 0, nil
|
|
}
|
|
|
|
func (db *DB) CountTasksOfJobInStatus(ctx context.Context, job *Job, taskStatus api.TaskStatus) (numInStatus, numTotal int, err error) {
|
|
type Result struct {
|
|
Status api.TaskStatus
|
|
NumTasks int
|
|
}
|
|
var results []Result
|
|
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(&Task{}).
|
|
Select("status, count(*) as num_tasks").
|
|
Where("job_id", job.ID).
|
|
Group("status").
|
|
Scan(&results)
|
|
|
|
if tx.Error != nil {
|
|
return 0, 0, fmt.Errorf("count tasks of job %s in status %q: %w", job.UUID, taskStatus, tx.Error)
|
|
}
|
|
|
|
for _, result := range results {
|
|
if result.Status == taskStatus {
|
|
numInStatus += result.NumTasks
|
|
}
|
|
numTotal += result.NumTasks
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// UpdateJobsTaskStatuses updates the status & activity of the tasks of `job`.
|
|
func (db *DB) UpdateJobsTaskStatuses(ctx context.Context, job *Job,
|
|
taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return errors.New("empty status not allowed")
|
|
}
|
|
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(Task{}).
|
|
Where("job_Id = ?", job.ID).
|
|
Updates(Task{Status: taskStatus, Activity: activity})
|
|
|
|
if tx.Error != nil {
|
|
return tx.Error
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateJobsTaskStatusesConditional updates the status & activity of the tasks of `job`,
|
|
// limited to those tasks with status in `statusesToUpdate`.
|
|
func (db *DB) UpdateJobsTaskStatusesConditional(ctx context.Context, job *Job,
|
|
statusesToUpdate []api.TaskStatus, taskStatus api.TaskStatus, activity string) error {
|
|
|
|
if taskStatus == "" {
|
|
return errors.New("empty status not allowed")
|
|
}
|
|
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(Task{}).
|
|
Where("job_Id = ?", job.ID).
|
|
Where("status in ?", statusesToUpdate).
|
|
Updates(Task{Status: taskStatus, Activity: activity})
|
|
return tx.Error
|
|
}
|