Delete cmd/job-creator
and cmd/shaman-checkout-id-setter
These two tools were necessary to work around certain limitations of earlier versions of Flamenco, and are no longer needed. And they're getting in the way of other development.
This commit is contained in:
parent
406e1ee655
commit
977247a117
@ -1,261 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"flag"
|
|
||||||
"io/fs"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"runtime"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/benbjohnson/clock"
|
|
||||||
"github.com/mattn/go-colorable"
|
|
||||||
"github.com/rs/zerolog"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
|
|
||||||
"projects.blender.org/studio/flamenco/internal/appinfo"
|
|
||||||
"projects.blender.org/studio/flamenco/internal/manager/config"
|
|
||||||
"projects.blender.org/studio/flamenco/internal/manager/job_compilers"
|
|
||||||
"projects.blender.org/studio/flamenco/internal/manager/persistence"
|
|
||||||
"projects.blender.org/studio/flamenco/pkg/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
var cliArgs struct {
|
|
||||||
version bool
|
|
||||||
jobUUID string
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
|
|
||||||
log.Logger = log.Output(output)
|
|
||||||
log.Info().
|
|
||||||
Str("version", appinfo.ApplicationVersion).
|
|
||||||
Str("git", appinfo.ApplicationGitHash).
|
|
||||||
Str("releaseCycle", appinfo.ReleaseCycle).
|
|
||||||
Str("os", runtime.GOOS).
|
|
||||||
Str("arch", runtime.GOARCH).
|
|
||||||
Msgf("starting %v job compiler", appinfo.ApplicationName)
|
|
||||||
|
|
||||||
parseCliArgs()
|
|
||||||
if cliArgs.version {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if cliArgs.jobUUID == "" {
|
|
||||||
log.Fatal().Msg("give me a job UUID to regenerate tasks for")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load configuration.
|
|
||||||
configService := config.NewService()
|
|
||||||
err := configService.Load()
|
|
||||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
||||||
log.Error().Err(err).Msg("loading configuration")
|
|
||||||
}
|
|
||||||
|
|
||||||
isFirstRun, err := configService.IsFirstRun()
|
|
||||||
switch {
|
|
||||||
case err != nil:
|
|
||||||
log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not")
|
|
||||||
case isFirstRun:
|
|
||||||
log.Info().Msg("This seems to be your first run of Flamenco, this tool won't work.")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Construct the services.
|
|
||||||
persist := openDB(*configService)
|
|
||||||
defer persist.Close()
|
|
||||||
|
|
||||||
timeService := clock.New()
|
|
||||||
compiler, err := job_compilers.Load(timeService)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("error loading job compilers")
|
|
||||||
}
|
|
||||||
|
|
||||||
// The main context determines the lifetime of the application. All
|
|
||||||
// long-running goroutines need to keep an eye on this, and stop their work
|
|
||||||
// once it closes.
|
|
||||||
mainCtx, mainCtxCancel := context.WithCancel(context.Background())
|
|
||||||
defer mainCtxCancel()
|
|
||||||
|
|
||||||
installSignalHandler(mainCtxCancel)
|
|
||||||
|
|
||||||
recompile(mainCtx, cliArgs.jobUUID, persist, compiler)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recompile regenerates the job's tasks.
|
|
||||||
func recompile(ctx context.Context, jobUUID string, db *persistence.DB, compiler *job_compilers.Service) {
|
|
||||||
dbJob, err := db.FetchJob(ctx, jobUUID)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("could not get job from database")
|
|
||||||
}
|
|
||||||
logger := log.With().Str("job", jobUUID).Logger()
|
|
||||||
logger.Info().Msg("found job")
|
|
||||||
|
|
||||||
dbTasks, err := db.FetchTasksOfJob(ctx, dbJob)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("could not query database for tasks")
|
|
||||||
}
|
|
||||||
if len(dbTasks) > 0 {
|
|
||||||
// This tool has only been tested with jobs that have had their tasks completely lost.
|
|
||||||
log.Fatal().
|
|
||||||
Int("numTasks", len(dbTasks)).
|
|
||||||
Msg("this job still has tasks, this is not a situation this tool should be used in")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recompile the job.
|
|
||||||
fakeSubmittedJob := constructSubmittedJob(dbJob)
|
|
||||||
authoredJob, err := compiler.Compile(ctx, fakeSubmittedJob)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal().Err(err).Msg("could not recompile job")
|
|
||||||
}
|
|
||||||
sanityCheck(logger, dbJob, authoredJob)
|
|
||||||
|
|
||||||
// Store the recompiled tasks.
|
|
||||||
if err := db.StoreAuthoredJobTaks(ctx, dbJob, authoredJob); err != nil {
|
|
||||||
logger.Fatal().Err(err).Msg("error storing recompiled tasks")
|
|
||||||
}
|
|
||||||
logger.Info().Msg("new tasks have been stored")
|
|
||||||
|
|
||||||
updateTaskStatuses(ctx, logger, db, dbJob)
|
|
||||||
|
|
||||||
logger.Info().Msg("job recompilation seems to have worked out")
|
|
||||||
}
|
|
||||||
|
|
||||||
func constructSubmittedJob(dbJob *persistence.Job) api.SubmittedJob {
|
|
||||||
fakeSubmittedJob := api.SubmittedJob{
|
|
||||||
Name: dbJob.Name,
|
|
||||||
Priority: dbJob.Priority,
|
|
||||||
SubmitterPlatform: "reconstrutor", // The platform shouldn't matter, as all paths have already been replaced.
|
|
||||||
Type: dbJob.JobType,
|
|
||||||
TypeEtag: nil,
|
|
||||||
|
|
||||||
Settings: &api.JobSettings{AdditionalProperties: make(map[string]interface{})},
|
|
||||||
Metadata: &api.JobMetadata{AdditionalProperties: make(map[string]string)},
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, value := range dbJob.Settings {
|
|
||||||
fakeSubmittedJob.Settings.AdditionalProperties[key] = value
|
|
||||||
}
|
|
||||||
for key, value := range dbJob.Metadata {
|
|
||||||
fakeSubmittedJob.Metadata.AdditionalProperties[key] = value
|
|
||||||
}
|
|
||||||
if dbJob.WorkerTag != nil {
|
|
||||||
fakeSubmittedJob.WorkerTag = &dbJob.WorkerTag.UUID
|
|
||||||
} else if dbJob.WorkerTagID != nil {
|
|
||||||
panic("WorkerTagID is set, but WorkerTag is not")
|
|
||||||
}
|
|
||||||
|
|
||||||
return fakeSubmittedJob
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check that the authored job is consistent with the original job.
|
|
||||||
func sanityCheck(logger zerolog.Logger, expect *persistence.Job, actual *job_compilers.AuthoredJob) {
|
|
||||||
if actual.Name != expect.Name {
|
|
||||||
logger.Fatal().
|
|
||||||
Str("expected", expect.Name).
|
|
||||||
Str("actual", actual.Name).
|
|
||||||
Msg("recompilation did not produce expected name")
|
|
||||||
}
|
|
||||||
if actual.JobType != expect.JobType {
|
|
||||||
logger.Fatal().
|
|
||||||
Str("expected", expect.JobType).
|
|
||||||
Str("actual", actual.JobType).
|
|
||||||
Msg("recompilation did not produce expected job type")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateTaskStatuses(ctx context.Context, logger zerolog.Logger, db *persistence.DB, dbJob *persistence.Job) {
|
|
||||||
logger = logger.With().Str("jobStatus", string(dbJob.Status)).Logger()
|
|
||||||
|
|
||||||
// Update the task statuses based on the job status. This is NOT using the
|
|
||||||
// state machine, as these tasks are not actually going from one state to the
|
|
||||||
// other. They are just being updated in the database.
|
|
||||||
taskStatusMap := map[api.JobStatus]api.TaskStatus{
|
|
||||||
api.JobStatusActive: api.TaskStatusQueued,
|
|
||||||
api.JobStatusCancelRequested: api.TaskStatusCanceled,
|
|
||||||
api.JobStatusCanceled: api.TaskStatusCanceled,
|
|
||||||
api.JobStatusCompleted: api.TaskStatusCompleted,
|
|
||||||
api.JobStatusFailed: api.TaskStatusCanceled,
|
|
||||||
api.JobStatusPaused: api.TaskStatusPaused,
|
|
||||||
api.JobStatusQueued: api.TaskStatusQueued,
|
|
||||||
api.JobStatusRequeueing: api.TaskStatusQueued,
|
|
||||||
api.JobStatusUnderConstruction: api.TaskStatusQueued,
|
|
||||||
}
|
|
||||||
newTaskStatus, ok := taskStatusMap[dbJob.Status]
|
|
||||||
if !ok {
|
|
||||||
logger.Warn().Msg("unknown job status, not touching task statuses")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logger = logger.With().Str("taskStatus", string(newTaskStatus)).Logger()
|
|
||||||
|
|
||||||
err := db.UpdateJobsTaskStatuses(ctx, dbJob, newTaskStatus, "reset task status after job reconstruction")
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatal().Msg("could not update task statuses")
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info().Msg("task statuses have been updated based on the job status")
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseCliArgs() {
|
|
||||||
var quiet, debug, trace bool
|
|
||||||
|
|
||||||
flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
|
|
||||||
flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
|
|
||||||
flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
|
|
||||||
flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
|
|
||||||
flag.StringVar(&cliArgs.jobUUID, "job", "", "Job UUID to regenerate")
|
|
||||||
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
var logLevel zerolog.Level
|
|
||||||
switch {
|
|
||||||
case trace:
|
|
||||||
logLevel = zerolog.TraceLevel
|
|
||||||
case debug:
|
|
||||||
logLevel = zerolog.DebugLevel
|
|
||||||
case quiet:
|
|
||||||
logLevel = zerolog.WarnLevel
|
|
||||||
default:
|
|
||||||
logLevel = zerolog.InfoLevel
|
|
||||||
}
|
|
||||||
zerolog.SetGlobalLevel(logLevel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// openDB opens the database or dies.
|
|
||||||
func openDB(configService config.Service) *persistence.DB {
|
|
||||||
dsn := configService.Get().DatabaseDSN
|
|
||||||
if dsn == "" {
|
|
||||||
log.Fatal().Msg("configure the database in flamenco-manager.yaml")
|
|
||||||
}
|
|
||||||
|
|
||||||
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer dbCtxCancel()
|
|
||||||
persist, err := persistence.OpenDB(dbCtx, dsn)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().
|
|
||||||
Err(err).
|
|
||||||
Str("dsn", dsn).
|
|
||||||
Msg("error opening database")
|
|
||||||
}
|
|
||||||
|
|
||||||
return persist
|
|
||||||
}
|
|
||||||
|
|
||||||
// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
|
|
||||||
func installSignalHandler(cancelFunc context.CancelFunc) {
|
|
||||||
signals := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(signals, os.Interrupt)
|
|
||||||
signal.Notify(signals, syscall.SIGTERM)
|
|
||||||
go func() {
|
|
||||||
for signum := range signals {
|
|
||||||
log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
|
|
||||||
cancelFunc()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
@ -1,176 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"flag"
|
|
||||||
"io/fs"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"runtime"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/mattn/go-colorable"
|
|
||||||
"github.com/rs/zerolog"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
|
|
||||||
"projects.blender.org/studio/flamenco/internal/appinfo"
|
|
||||||
"projects.blender.org/studio/flamenco/internal/manager/config"
|
|
||||||
"projects.blender.org/studio/flamenco/internal/manager/persistence"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
|
|
||||||
log.Logger = log.Output(output)
|
|
||||||
log.Info().
|
|
||||||
Str("version", appinfo.ApplicationVersion).
|
|
||||||
Str("git", appinfo.ApplicationGitHash).
|
|
||||||
Str("releaseCycle", appinfo.ReleaseCycle).
|
|
||||||
Str("os", runtime.GOOS).
|
|
||||||
Str("arch", runtime.GOARCH).
|
|
||||||
Msgf("starting %v shaman-checkout-id-setter", appinfo.ApplicationName)
|
|
||||||
|
|
||||||
log.Warn().Msg("Use with care, and at your own risk.")
|
|
||||||
log.Warn().Msg("This is an experimental program, and may ruin your entire Flamenco database.")
|
|
||||||
log.Warn().Msg("Press Enter to continue.")
|
|
||||||
_, _ = bufio.NewReader(os.Stdin).ReadBytes('\n')
|
|
||||||
|
|
||||||
parseCliArgs()
|
|
||||||
|
|
||||||
// Load configuration.
|
|
||||||
configService := config.NewService()
|
|
||||||
err := configService.Load()
|
|
||||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
||||||
log.Error().Err(err).Msg("loading configuration")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reject working on a brand new installation.
|
|
||||||
isFirstRun, err := configService.IsFirstRun()
|
|
||||||
switch {
|
|
||||||
case err != nil:
|
|
||||||
log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not")
|
|
||||||
case isFirstRun:
|
|
||||||
log.Fatal().Msg("this should be run on an already-used database")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the {jobs} variable.
|
|
||||||
vars := configService.ResolveVariables(config.VariableAudienceWorkers, config.VariablePlatform(runtime.GOOS))
|
|
||||||
jobsPath := vars["jobs"].Value
|
|
||||||
if jobsPath == "" {
|
|
||||||
log.Fatal().Msg("unable to resolve 'jobs' variable")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to the database.
|
|
||||||
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
||||||
defer ctxCancel()
|
|
||||||
persist := openDB(*configService)
|
|
||||||
defer persist.Close()
|
|
||||||
|
|
||||||
// Get all jobs from the database.
|
|
||||||
jobs, err := persist.FetchJobs(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("unable to fetch jobs")
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Int("numJobs", len(jobs)).Msg("processing all jobs")
|
|
||||||
numJobsUpdated := 0
|
|
||||||
for _, job := range jobs {
|
|
||||||
logger := log.With().Uint("id", job.ID).Str("uuid", job.UUID).Logger()
|
|
||||||
|
|
||||||
if job.Storage.ShamanCheckoutID != "" {
|
|
||||||
logger.Info().Str("checkoutID", job.Storage.ShamanCheckoutID).Msg("job already has a Shaman checkout ID")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Trace().Msg("processing job")
|
|
||||||
|
|
||||||
// Find the 'blendfile' setting.
|
|
||||||
blendfile, ok := job.Settings["blendfile"].(string)
|
|
||||||
if !ok {
|
|
||||||
logger.Info().Msg("skipping job, it has no `blendfile` setting")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if it starts with `{jobs}`, otherwise it's not submitted via Shaman.
|
|
||||||
relpath, found := strings.CutPrefix(blendfile, "{jobs}"+string(os.PathSeparator))
|
|
||||||
if !found {
|
|
||||||
logger.Info().Str("blendfile", blendfile).Msg("skipping job, its blendfile setting doesn't start with `{jobs}/`")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// See if there is a `pack-info.txt` file next to the blend file. This is
|
|
||||||
// another indication that we have the right directory.
|
|
||||||
packInfoPath := filepath.Join(jobsPath, filepath.Dir(relpath), "pack-info.txt")
|
|
||||||
_, err := os.Stat(packInfoPath)
|
|
||||||
switch {
|
|
||||||
case errors.Is(err, os.ErrNotExist):
|
|
||||||
logger.Warn().Str("packInfo", packInfoPath).Msg("skipping job, pack-info.txt not found where expected")
|
|
||||||
continue
|
|
||||||
case err != nil:
|
|
||||||
logger.Fatal().Str("packInfo", packInfoPath).Msg("error accessing pack-info.txt")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract the checkout ID from the blend file path.
|
|
||||||
checkoutID := filepath.Dir(relpath)
|
|
||||||
logger = logger.With().Str("checkoutID", checkoutID).Logger()
|
|
||||||
|
|
||||||
// Store it on the job.
|
|
||||||
logger.Debug().Msg("updating job")
|
|
||||||
job.Storage.ShamanCheckoutID = checkoutID
|
|
||||||
if err := persist.SaveJobStorageInfo(ctx, job); err != nil {
|
|
||||||
logger.Error().Err(err).Msg("error saving job to the database")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
numJobsUpdated++
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info().Msgf("done, updated %d of %d jobs", numJobsUpdated, len(jobs))
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseCliArgs() {
|
|
||||||
var quiet, debug, trace bool
|
|
||||||
|
|
||||||
flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
|
|
||||||
flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
|
|
||||||
flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
|
|
||||||
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
var logLevel zerolog.Level
|
|
||||||
switch {
|
|
||||||
case trace:
|
|
||||||
logLevel = zerolog.TraceLevel
|
|
||||||
case debug:
|
|
||||||
logLevel = zerolog.DebugLevel
|
|
||||||
case quiet:
|
|
||||||
logLevel = zerolog.WarnLevel
|
|
||||||
default:
|
|
||||||
logLevel = zerolog.InfoLevel
|
|
||||||
}
|
|
||||||
zerolog.SetGlobalLevel(logLevel)
|
|
||||||
}
|
|
||||||
|
|
||||||
// openDB opens the database or dies.
|
|
||||||
func openDB(configService config.Service) *persistence.DB {
|
|
||||||
dsn := configService.Get().DatabaseDSN
|
|
||||||
if dsn == "" {
|
|
||||||
log.Fatal().Msg("configure the database in flamenco-manager.yaml")
|
|
||||||
}
|
|
||||||
|
|
||||||
dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer dbCtxCancel()
|
|
||||||
persist, err := persistence.OpenDB(dbCtx, dsn)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().
|
|
||||||
Err(err).
|
|
||||||
Str("dsn", dsn).
|
|
||||||
Msg("error opening database")
|
|
||||||
}
|
|
||||||
|
|
||||||
return persist
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user