Manager: convert more worker functions to sqlc
No functional changes.
This commit is contained in:
parent
9a229a7b8f
commit
7a9f809c43
@ -40,6 +40,10 @@ RETURNING id;
|
|||||||
INSERT INTO worker_tag_membership (worker_tag_id, worker_id)
|
INSERT INTO worker_tag_membership (worker_tag_id, worker_id)
|
||||||
VALUES (@worker_tag_id, @worker_id);
|
VALUES (@worker_tag_id, @worker_id);
|
||||||
|
|
||||||
|
-- name: FetchWorkers :many
|
||||||
|
SELECT sqlc.embed(workers) FROM workers
|
||||||
|
WHERE deleted_at IS NULL;
|
||||||
|
|
||||||
-- name: FetchWorker :one
|
-- name: FetchWorker :one
|
||||||
-- FetchWorker only returns the worker if it wasn't soft-deleted.
|
-- FetchWorker only returns the worker if it wasn't soft-deleted.
|
||||||
SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL;
|
SELECT * FROM workers WHERE workers.uuid = @uuid and deleted_at is NULL;
|
||||||
@ -54,3 +58,43 @@ FROM worker_tags
|
|||||||
LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id)
|
LEFT JOIN worker_tag_membership m ON (m.worker_tag_id = worker_tags.id)
|
||||||
LEFT JOIN workers on (m.worker_id = workers.id)
|
LEFT JOIN workers on (m.worker_id = workers.id)
|
||||||
WHERE workers.uuid = @uuid;
|
WHERE workers.uuid = @uuid;
|
||||||
|
|
||||||
|
-- name: SoftDeleteWorker :execrows
|
||||||
|
UPDATE workers SET deleted_at=@deleted_at
|
||||||
|
WHERE uuid=@uuid;
|
||||||
|
|
||||||
|
-- name: SaveWorkerStatus :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=@updated_at,
|
||||||
|
status=@status,
|
||||||
|
status_requested=@status_requested,
|
||||||
|
lazy_status_request=@lazy_status_request
|
||||||
|
WHERE id=@id;
|
||||||
|
|
||||||
|
-- name: SaveWorker :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=@updated_at,
|
||||||
|
uuid=@uuid,
|
||||||
|
secret=@secret,
|
||||||
|
name=@name,
|
||||||
|
address=@address,
|
||||||
|
platform=@platform,
|
||||||
|
software=@software,
|
||||||
|
status=@status,
|
||||||
|
last_seen_at=@last_seen_at,
|
||||||
|
status_requested=@status_requested,
|
||||||
|
lazy_status_request=@lazy_status_request,
|
||||||
|
supported_task_types=@supported_task_types,
|
||||||
|
can_restart=@can_restart
|
||||||
|
WHERE id=@id;
|
||||||
|
|
||||||
|
-- name: WorkerSeen :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=@updated_at,
|
||||||
|
last_seen_at=@last_seen_at
|
||||||
|
WHERE id=@id;
|
||||||
|
|
||||||
|
-- name: SummarizeWorkerStatuses :many
|
||||||
|
SELECT status, count(id) as status_count FROM workers
|
||||||
|
WHERE deleted_at is NULL
|
||||||
|
GROUP BY status;
|
||||||
|
@ -198,3 +198,205 @@ func (q *Queries) FetchWorkerUnconditional(ctx context.Context, uuid string) (Wo
|
|||||||
)
|
)
|
||||||
return i, err
|
return i, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const fetchWorkers = `-- name: FetchWorkers :many
|
||||||
|
SELECT workers.id, workers.created_at, workers.updated_at, workers.uuid, workers.secret, workers.name, workers.address, workers.platform, workers.software, workers.status, workers.last_seen_at, workers.status_requested, workers.lazy_status_request, workers.supported_task_types, workers.deleted_at, workers.can_restart FROM workers
|
||||||
|
WHERE deleted_at IS NULL
|
||||||
|
`
|
||||||
|
|
||||||
|
type FetchWorkersRow struct {
|
||||||
|
Worker Worker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) FetchWorkers(ctx context.Context) ([]FetchWorkersRow, error) {
|
||||||
|
rows, err := q.db.QueryContext(ctx, fetchWorkers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var items []FetchWorkersRow
|
||||||
|
for rows.Next() {
|
||||||
|
var i FetchWorkersRow
|
||||||
|
if err := rows.Scan(
|
||||||
|
&i.Worker.ID,
|
||||||
|
&i.Worker.CreatedAt,
|
||||||
|
&i.Worker.UpdatedAt,
|
||||||
|
&i.Worker.UUID,
|
||||||
|
&i.Worker.Secret,
|
||||||
|
&i.Worker.Name,
|
||||||
|
&i.Worker.Address,
|
||||||
|
&i.Worker.Platform,
|
||||||
|
&i.Worker.Software,
|
||||||
|
&i.Worker.Status,
|
||||||
|
&i.Worker.LastSeenAt,
|
||||||
|
&i.Worker.StatusRequested,
|
||||||
|
&i.Worker.LazyStatusRequest,
|
||||||
|
&i.Worker.SupportedTaskTypes,
|
||||||
|
&i.Worker.DeletedAt,
|
||||||
|
&i.Worker.CanRestart,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
items = append(items, i)
|
||||||
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const saveWorker = `-- name: SaveWorker :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=?1,
|
||||||
|
uuid=?2,
|
||||||
|
secret=?3,
|
||||||
|
name=?4,
|
||||||
|
address=?5,
|
||||||
|
platform=?6,
|
||||||
|
software=?7,
|
||||||
|
status=?8,
|
||||||
|
last_seen_at=?9,
|
||||||
|
status_requested=?10,
|
||||||
|
lazy_status_request=?11,
|
||||||
|
supported_task_types=?12,
|
||||||
|
can_restart=?13
|
||||||
|
WHERE id=?14
|
||||||
|
`
|
||||||
|
|
||||||
|
type SaveWorkerParams struct {
|
||||||
|
UpdatedAt sql.NullTime
|
||||||
|
UUID string
|
||||||
|
Secret string
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Platform string
|
||||||
|
Software string
|
||||||
|
Status string
|
||||||
|
LastSeenAt sql.NullTime
|
||||||
|
StatusRequested string
|
||||||
|
LazyStatusRequest bool
|
||||||
|
SupportedTaskTypes string
|
||||||
|
CanRestart bool
|
||||||
|
ID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) SaveWorker(ctx context.Context, arg SaveWorkerParams) error {
|
||||||
|
_, err := q.db.ExecContext(ctx, saveWorker,
|
||||||
|
arg.UpdatedAt,
|
||||||
|
arg.UUID,
|
||||||
|
arg.Secret,
|
||||||
|
arg.Name,
|
||||||
|
arg.Address,
|
||||||
|
arg.Platform,
|
||||||
|
arg.Software,
|
||||||
|
arg.Status,
|
||||||
|
arg.LastSeenAt,
|
||||||
|
arg.StatusRequested,
|
||||||
|
arg.LazyStatusRequest,
|
||||||
|
arg.SupportedTaskTypes,
|
||||||
|
arg.CanRestart,
|
||||||
|
arg.ID,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
const saveWorkerStatus = `-- name: SaveWorkerStatus :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=?1,
|
||||||
|
status=?2,
|
||||||
|
status_requested=?3,
|
||||||
|
lazy_status_request=?4
|
||||||
|
WHERE id=?5
|
||||||
|
`
|
||||||
|
|
||||||
|
type SaveWorkerStatusParams struct {
|
||||||
|
UpdatedAt sql.NullTime
|
||||||
|
Status string
|
||||||
|
StatusRequested string
|
||||||
|
LazyStatusRequest bool
|
||||||
|
ID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) SaveWorkerStatus(ctx context.Context, arg SaveWorkerStatusParams) error {
|
||||||
|
_, err := q.db.ExecContext(ctx, saveWorkerStatus,
|
||||||
|
arg.UpdatedAt,
|
||||||
|
arg.Status,
|
||||||
|
arg.StatusRequested,
|
||||||
|
arg.LazyStatusRequest,
|
||||||
|
arg.ID,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
const softDeleteWorker = `-- name: SoftDeleteWorker :execrows
|
||||||
|
UPDATE workers SET deleted_at=?1
|
||||||
|
WHERE uuid=?2
|
||||||
|
`
|
||||||
|
|
||||||
|
type SoftDeleteWorkerParams struct {
|
||||||
|
DeletedAt sql.NullTime
|
||||||
|
UUID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) SoftDeleteWorker(ctx context.Context, arg SoftDeleteWorkerParams) (int64, error) {
|
||||||
|
result, err := q.db.ExecContext(ctx, softDeleteWorker, arg.DeletedAt, arg.UUID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return result.RowsAffected()
|
||||||
|
}
|
||||||
|
|
||||||
|
const summarizeWorkerStatuses = `-- name: SummarizeWorkerStatuses :many
|
||||||
|
SELECT status, count(id) as status_count FROM workers
|
||||||
|
WHERE deleted_at is NULL
|
||||||
|
GROUP BY status
|
||||||
|
`
|
||||||
|
|
||||||
|
type SummarizeWorkerStatusesRow struct {
|
||||||
|
Status string
|
||||||
|
StatusCount int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) SummarizeWorkerStatuses(ctx context.Context) ([]SummarizeWorkerStatusesRow, error) {
|
||||||
|
rows, err := q.db.QueryContext(ctx, summarizeWorkerStatuses)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var items []SummarizeWorkerStatusesRow
|
||||||
|
for rows.Next() {
|
||||||
|
var i SummarizeWorkerStatusesRow
|
||||||
|
if err := rows.Scan(&i.Status, &i.StatusCount); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
items = append(items, i)
|
||||||
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return items, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const workerSeen = `-- name: WorkerSeen :exec
|
||||||
|
UPDATE workers SET
|
||||||
|
updated_at=?1,
|
||||||
|
last_seen_at=?2
|
||||||
|
WHERE id=?3
|
||||||
|
`
|
||||||
|
|
||||||
|
type WorkerSeenParams struct {
|
||||||
|
UpdatedAt sql.NullTime
|
||||||
|
LastSeenAt sql.NullTime
|
||||||
|
ID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) WorkerSeen(ctx context.Context, arg WorkerSeenParams) error {
|
||||||
|
_, err := q.db.ExecContext(ctx, workerSeen, arg.UpdatedAt, arg.LastSeenAt, arg.ID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
@ -152,25 +152,41 @@ func (db *DB) DeleteWorker(ctx context.Context, uuid string) error {
|
|||||||
return ErrDeletingWithoutFK
|
return ErrDeletingWithoutFK
|
||||||
}
|
}
|
||||||
|
|
||||||
tx := db.gormDB.WithContext(ctx).
|
queries, err := db.queries()
|
||||||
Where("uuid = ?", uuid).
|
if err != nil {
|
||||||
Delete(&Worker{})
|
return err
|
||||||
if tx.Error != nil {
|
|
||||||
return workerError(tx.Error, "deleting worker")
|
|
||||||
}
|
}
|
||||||
if tx.RowsAffected == 0 {
|
|
||||||
|
rowsAffected, err := queries.SoftDeleteWorker(ctx, sqlc.SoftDeleteWorkerParams{
|
||||||
|
DeletedAt: db.now(),
|
||||||
|
UUID: uuid,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return workerError(err, "deleting worker")
|
||||||
|
}
|
||||||
|
if rowsAffected == 0 {
|
||||||
return ErrWorkerNotFound
|
return ErrWorkerNotFound
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
|
func (db *DB) FetchWorkers(ctx context.Context) ([]*Worker, error) {
|
||||||
workers := make([]*Worker, 0)
|
queries, err := db.queries()
|
||||||
tx := db.gormDB.WithContext(ctx).Model(&Worker{}).Scan(&workers)
|
if err != nil {
|
||||||
if tx.Error != nil {
|
return nil, err
|
||||||
return nil, workerError(tx.Error, "fetching all workers")
|
|
||||||
}
|
}
|
||||||
return workers, nil
|
|
||||||
|
workers, err := queries.FetchWorkers(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, workerError(err, "fetching all workers")
|
||||||
|
}
|
||||||
|
|
||||||
|
gormWorkers := make([]*Worker, len(workers))
|
||||||
|
for idx := range workers {
|
||||||
|
worker := convertSqlcWorker(workers[idx].Worker)
|
||||||
|
gormWorkers[idx] = &worker
|
||||||
|
}
|
||||||
|
return gormWorkers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchWorkerTask returns the most recent task assigned to the given Worker.
|
// FetchWorkerTask returns the most recent task assigned to the given Worker.
|
||||||
@ -210,22 +226,52 @@ func (db *DB) FetchWorkerTask(ctx context.Context, worker *Worker) (*Task, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {
|
func (db *DB) SaveWorkerStatus(ctx context.Context, w *Worker) error {
|
||||||
err := db.gormDB.WithContext(ctx).
|
queries, err := db.queries()
|
||||||
Model(w).
|
|
||||||
Select("status", "status_requested", "lazy_status_request").
|
|
||||||
Updates(Worker{
|
|
||||||
Status: w.Status,
|
|
||||||
StatusRequested: w.StatusRequested,
|
|
||||||
LazyStatusRequest: w.LazyStatusRequest,
|
|
||||||
}).Error
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("saving worker: %w", err)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = queries.SaveWorkerStatus(ctx, sqlc.SaveWorkerStatusParams{
|
||||||
|
UpdatedAt: db.now(),
|
||||||
|
Status: string(w.Status),
|
||||||
|
StatusRequested: string(w.StatusRequested),
|
||||||
|
LazyStatusRequest: w.LazyStatusRequest,
|
||||||
|
ID: int64(w.ID),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("saving worker status: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
|
func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
|
||||||
if err := db.gormDB.WithContext(ctx).Save(w).Error; err != nil {
|
// TODO: remove this code, and just let the caller call CreateWorker() directly.
|
||||||
|
if w.ID == 0 {
|
||||||
|
return db.CreateWorker(ctx, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
queries, err := db.queries()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = queries.SaveWorker(ctx, sqlc.SaveWorkerParams{
|
||||||
|
UpdatedAt: db.now(),
|
||||||
|
UUID: w.UUID,
|
||||||
|
Secret: w.Secret,
|
||||||
|
Name: w.Name,
|
||||||
|
Address: w.Address,
|
||||||
|
Platform: w.Platform,
|
||||||
|
Software: w.Software,
|
||||||
|
Status: string(w.Status),
|
||||||
|
LastSeenAt: sql.NullTime{Time: w.LastSeenAt, Valid: !w.LastSeenAt.IsZero()},
|
||||||
|
StatusRequested: string(w.StatusRequested),
|
||||||
|
LazyStatusRequest: w.LazyStatusRequest,
|
||||||
|
SupportedTaskTypes: w.SupportedTaskTypes,
|
||||||
|
CanRestart: w.CanRestart,
|
||||||
|
ID: int64(w.ID),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("saving worker: %w", err)
|
return fmt.Errorf("saving worker: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -233,10 +279,18 @@ func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
|
|||||||
|
|
||||||
// WorkerSeen marks the worker as 'seen' by this Manager. This is used for timeout detection.
|
// WorkerSeen marks the worker as 'seen' by this Manager. This is used for timeout detection.
|
||||||
func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error {
|
func (db *DB) WorkerSeen(ctx context.Context, w *Worker) error {
|
||||||
tx := db.gormDB.WithContext(ctx).
|
queries, err := db.queries()
|
||||||
Model(w).
|
if err != nil {
|
||||||
Updates(Worker{LastSeenAt: db.gormDB.NowFunc()})
|
return err
|
||||||
if err := tx.Error; err != nil {
|
}
|
||||||
|
|
||||||
|
now := db.now()
|
||||||
|
err = queries.WorkerSeen(ctx, sqlc.WorkerSeenParams{
|
||||||
|
UpdatedAt: now,
|
||||||
|
LastSeenAt: now,
|
||||||
|
ID: int64(w.ID),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
return workerError(err, "saving worker 'last seen at'")
|
return workerError(err, "saving worker 'last seen at'")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -249,24 +303,19 @@ func (db *DB) SummarizeWorkerStatuses(ctx context.Context) (WorkerStatusCount, e
|
|||||||
logger := log.Ctx(ctx)
|
logger := log.Ctx(ctx)
|
||||||
logger.Debug().Msg("database: summarizing worker statuses")
|
logger.Debug().Msg("database: summarizing worker statuses")
|
||||||
|
|
||||||
// Query the database using a data structure that's easy to handle in GORM.
|
queries, err := db.queries()
|
||||||
type queryResult struct {
|
if err != nil {
|
||||||
Status api.WorkerStatus
|
return nil, err
|
||||||
StatusCount int
|
}
|
||||||
}
|
|
||||||
result := []*queryResult{}
|
rows, err := queries.SummarizeWorkerStatuses(ctx)
|
||||||
tx := db.gormDB.WithContext(ctx).Model(&Worker{}).
|
if err != nil {
|
||||||
Select("status as Status", "count(id) as StatusCount").
|
return nil, workerError(err, "summarizing worker statuses")
|
||||||
Group("status").
|
|
||||||
Scan(&result)
|
|
||||||
if tx.Error != nil {
|
|
||||||
return nil, workerError(tx.Error, "summarizing worker statuses")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert the array-of-structs to a map that's easier to handle by the caller.
|
|
||||||
statusCounts := make(WorkerStatusCount)
|
statusCounts := make(WorkerStatusCount)
|
||||||
for _, singleStatusCount := range result {
|
for _, row := range rows {
|
||||||
statusCounts[singleStatusCount.Status] = singleStatusCount.StatusCount
|
statusCounts[api.WorkerStatus(row.Status)] = int(row.StatusCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
return statusCounts, nil
|
return statusCounts, nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user