Sybren A. Stüvel c16c1f4b15 Refactor: deduplicate job fetching code
Deduplicate API implementation code to fetch a job from the persistence
service.

Almost no functional changes. Checking that the requested job UUID is
actually a valid UUID is now consistently done on all fetches. This is
not a functional change in normal Flamenco operations, where only valid
UUIDs are used anyway.
2022-12-14 13:02:59 +01:00

165 lines
4.8 KiB
Go

// SPDX-License-Identifier: GPL-3.0-or-later
package api_impl
import (
"context"
"errors"
"net/http"
"github.com/labstack/echo/v4"
"github.com/rs/zerolog"
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/uuid"
"git.blender.org/flamenco/pkg/api"
)
// fetchJob fetches the job from the database, and sends the appropriate error
// to the HTTP client if it cannot. Returns `nil` in the latter case, and the
// error returned can then be returned from the Echo handler function.
func (f *Flamenco) fetchJob(e echo.Context, logger zerolog.Logger, jobID string) (*persistence.Job, error) {
// TODO: use a timeout for fetching jobs.
// ctx, cancel := context.WithTimeout(e.Request().Context(), fetchJobTimeout)
// defer cancel()
ctx := e.Request().Context()
if !uuid.IsValid(jobID) {
logger.Debug().Msg("invalid job ID received")
return nil, sendAPIError(e, http.StatusBadRequest, "job ID not valid")
}
logger.Debug().Msg("fetching job")
dbJob, err := f.persist.FetchJob(ctx, jobID)
if err != nil {
switch {
case errors.Is(err, persistence.ErrJobNotFound):
return nil, sendAPIError(e, http.StatusNotFound, "no such job")
case errors.Is(err, context.DeadlineExceeded):
logger.Error().Err(err).Msg("timeout fetching job from database")
return nil, sendAPIError(e, http.StatusInternalServerError, "timeout fetching job from database")
default:
logger.Error().Err(err).Msg("error fetching job")
return nil, sendAPIError(e, http.StatusInternalServerError, "error fetching job")
}
}
return dbJob, nil
}
func (f *Flamenco) FetchJob(e echo.Context, jobID string) error {
logger := requestLogger(e).With().
Str("job", jobID).
Logger()
dbJob, err := f.fetchJob(e, logger, jobID)
if dbJob == nil {
// f.fetchJob already sent a response.
return err
}
apiJob := jobDBtoAPI(dbJob)
return e.JSON(http.StatusOK, apiJob)
}
func (f *Flamenco) QueryJobs(e echo.Context) error {
logger := requestLogger(e)
var jobsQuery api.QueryJobsJSONRequestBody
if err := e.Bind(&jobsQuery); err != nil {
logger.Warn().Err(err).Msg("bad request received")
return sendAPIError(e, http.StatusBadRequest, "invalid format")
}
ctx := e.Request().Context()
dbJobs, err := f.persist.QueryJobs(ctx, api.JobsQuery(jobsQuery))
if err != nil {
logger.Warn().Err(err).Msg("error querying for jobs")
return sendAPIError(e, http.StatusInternalServerError, "error querying for jobs")
}
apiJobs := make([]api.Job, len(dbJobs))
for i, dbJob := range dbJobs {
apiJobs[i] = jobDBtoAPI(dbJob)
}
result := api.JobsQueryResult{
Jobs: apiJobs,
}
return e.JSON(http.StatusOK, result)
}
func (f *Flamenco) FetchJobTasks(e echo.Context, jobID string) error {
logger := requestLogger(e).With().
Str("job", jobID).
Logger()
ctx := e.Request().Context()
if !uuid.IsValid(jobID) {
logger.Debug().Msg("invalid job ID received")
return sendAPIError(e, http.StatusBadRequest, "job ID not valid")
}
tasks, err := f.persist.QueryJobTaskSummaries(ctx, jobID)
if err != nil {
logger.Warn().Err(err).Msg("error querying for jobs")
return sendAPIError(e, http.StatusInternalServerError, "error querying for jobs")
}
summaries := make([]api.TaskSummary, len(tasks))
for i, task := range tasks {
summaries[i] = taskDBtoSummary(task)
}
result := api.JobTasksSummary{
Tasks: &summaries,
}
return e.JSON(http.StatusOK, result)
}
func (f *Flamenco) FetchTask(e echo.Context, taskID string) error {
logger := requestLogger(e).With().
Str("task", taskID).
Logger()
ctx := e.Request().Context()
if !uuid.IsValid(taskID) {
logger.Debug().Msg("invalid job ID received")
return sendAPIError(e, http.StatusBadRequest, "job ID not valid")
}
// Fetch & convert the task.
task, err := f.persist.FetchTask(ctx, taskID)
if errors.Is(err, persistence.ErrTaskNotFound) {
logger.Debug().Msg("non-existent task requested")
return sendAPIError(e, http.StatusNotFound, "no such task")
}
if err != nil {
logger.Warn().Err(err).Msg("error fetching task")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task")
}
apiTask := taskDBtoAPI(task)
// Fetch & convert the failure list.
failedWorkers, err := f.persist.FetchTaskFailureList(ctx, task)
if err != nil {
logger.Warn().Err(err).Msg("error fetching task failure list")
return sendAPIError(e, http.StatusInternalServerError, "error fetching task failure list")
}
failedTaskWorkers := make([]api.TaskWorker, len(failedWorkers))
for idx, worker := range failedWorkers {
failedTaskWorkers[idx] = *workerToTaskWorker(worker)
}
apiTask.FailedByWorkers = &failedTaskWorkers
return e.JSON(http.StatusOK, apiTask)
}
func taskDBtoSummary(task *persistence.Task) api.TaskSummary {
return api.TaskSummary{
Id: task.UUID,
Name: task.Name,
Priority: task.Priority,
Status: task.Status,
TaskType: task.Type,
Updated: task.UpdatedAt,
}
}