
Convert the database interface from the stdlib `database/sql` package to the GORM object relational mapper. GORM is also used by the Manager, and thus with this change both Worker and Manager have a uniform way of accessing their databases.
81 lines
2.1 KiB
Go
81 lines
2.1 KiB
Go
package persistence
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"git.blender.org/flamenco/pkg/api"
|
|
)
|
|
|
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
// TaskUpdate is a queued task update.
|
|
type TaskUpdate struct {
|
|
Model
|
|
|
|
TaskID string `gorm:"type:varchar(36);default:''"`
|
|
Payload []byte `gorm:"type:BLOB"`
|
|
}
|
|
|
|
func (t *TaskUpdate) Unmarshal() (*api.TaskUpdateJSONRequestBody, error) {
|
|
var apiTaskUpdate api.TaskUpdateJSONRequestBody
|
|
if err := json.Unmarshal(t.Payload, &apiTaskUpdate); err != nil {
|
|
return nil, err
|
|
}
|
|
return &apiTaskUpdate, nil
|
|
}
|
|
|
|
// UpstreamBufferQueueSize returns how many task updates are queued in the upstream buffer.
|
|
func (db *DB) UpstreamBufferQueueSize(ctx context.Context) (int, error) {
|
|
var queueSize int64
|
|
tx := db.gormDB.WithContext(ctx).
|
|
Model(&TaskUpdate{}).
|
|
Count(&queueSize)
|
|
if tx.Error != nil {
|
|
return 0, fmt.Errorf("counting queued task updates: %w", tx.Error)
|
|
}
|
|
return int(queueSize), nil
|
|
}
|
|
|
|
// UpstreamBufferQueue queues a task update in the upstrema buffer.
|
|
func (db *DB) UpstreamBufferQueue(ctx context.Context, taskID string, apiTaskUpdate api.TaskUpdateJSONRequestBody) error {
|
|
blob, err := json.Marshal(apiTaskUpdate)
|
|
if err != nil {
|
|
return fmt.Errorf("converting task update to JSON: %w", err)
|
|
}
|
|
|
|
taskUpdate := TaskUpdate{
|
|
TaskID: taskID,
|
|
Payload: blob,
|
|
}
|
|
|
|
tx := db.gormDB.WithContext(ctx).Create(&taskUpdate)
|
|
return tx.Error
|
|
}
|
|
|
|
// UpstreamBufferFrontItem returns the first-queued item. The item remains queued.
|
|
func (db *DB) UpstreamBufferFrontItem(ctx context.Context) (*TaskUpdate, error) {
|
|
taskUpdate := TaskUpdate{}
|
|
|
|
findResult := db.gormDB.WithContext(ctx).
|
|
Order("ID").
|
|
Limit(1).
|
|
Find(&taskUpdate)
|
|
if findResult.Error != nil {
|
|
return nil, findResult.Error
|
|
}
|
|
if taskUpdate.ID == 0 {
|
|
// No update fetched, which doesn't result in an error with Limt(1).Find(&task).
|
|
return nil, nil
|
|
}
|
|
|
|
return &taskUpdate, nil
|
|
}
|
|
|
|
// UpstreamBufferDiscard discards the queued task update with the given row ID.
|
|
func (db *DB) UpstreamBufferDiscard(ctx context.Context, queuedTaskUpdate *TaskUpdate) error {
|
|
tx := db.gormDB.WithContext(ctx).Delete(queuedTaskUpdate)
|
|
return tx.Error
|
|
}
|