diff --git a/cmd/job-creator/main.go b/cmd/job-creator/main.go deleted file mode 100644 index 6203569c..00000000 --- a/cmd/job-creator/main.go +++ /dev/null @@ -1,261 +0,0 @@ -package main - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "context" - "errors" - "flag" - "io/fs" - "os" - "os/signal" - "runtime" - "syscall" - "time" - - "github.com/benbjohnson/clock" - "github.com/mattn/go-colorable" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "projects.blender.org/studio/flamenco/internal/appinfo" - "projects.blender.org/studio/flamenco/internal/manager/config" - "projects.blender.org/studio/flamenco/internal/manager/job_compilers" - "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/pkg/api" -) - -var cliArgs struct { - version bool - jobUUID string -} - -func main() { - output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} - log.Logger = log.Output(output) - log.Info(). - Str("version", appinfo.ApplicationVersion). - Str("git", appinfo.ApplicationGitHash). - Str("releaseCycle", appinfo.ReleaseCycle). - Str("os", runtime.GOOS). - Str("arch", runtime.GOARCH). - Msgf("starting %v job compiler", appinfo.ApplicationName) - - parseCliArgs() - if cliArgs.version { - return - } - - if cliArgs.jobUUID == "" { - log.Fatal().Msg("give me a job UUID to regenerate tasks for") - } - - // Load configuration. - configService := config.NewService() - err := configService.Load() - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Error().Err(err).Msg("loading configuration") - } - - isFirstRun, err := configService.IsFirstRun() - switch { - case err != nil: - log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not") - case isFirstRun: - log.Info().Msg("This seems to be your first run of Flamenco, this tool won't work.") - return - } - - // Construct the services. - persist := openDB(*configService) - defer persist.Close() - - timeService := clock.New() - compiler, err := job_compilers.Load(timeService) - if err != nil { - log.Fatal().Err(err).Msg("error loading job compilers") - } - - // The main context determines the lifetime of the application. All - // long-running goroutines need to keep an eye on this, and stop their work - // once it closes. - mainCtx, mainCtxCancel := context.WithCancel(context.Background()) - defer mainCtxCancel() - - installSignalHandler(mainCtxCancel) - - recompile(mainCtx, cliArgs.jobUUID, persist, compiler) -} - -// recompile regenerates the job's tasks. -func recompile(ctx context.Context, jobUUID string, db *persistence.DB, compiler *job_compilers.Service) { - dbJob, err := db.FetchJob(ctx, jobUUID) - if err != nil { - log.Fatal().Err(err).Msg("could not get job from database") - } - logger := log.With().Str("job", jobUUID).Logger() - logger.Info().Msg("found job") - - dbTasks, err := db.FetchTasksOfJob(ctx, dbJob) - if err != nil { - log.Fatal().Err(err).Msg("could not query database for tasks") - } - if len(dbTasks) > 0 { - // This tool has only been tested with jobs that have had their tasks completely lost. - log.Fatal(). - Int("numTasks", len(dbTasks)). - Msg("this job still has tasks, this is not a situation this tool should be used in") - } - - // Recompile the job. - fakeSubmittedJob := constructSubmittedJob(dbJob) - authoredJob, err := compiler.Compile(ctx, fakeSubmittedJob) - if err != nil { - logger.Fatal().Err(err).Msg("could not recompile job") - } - sanityCheck(logger, dbJob, authoredJob) - - // Store the recompiled tasks. - if err := db.StoreAuthoredJobTaks(ctx, dbJob, authoredJob); err != nil { - logger.Fatal().Err(err).Msg("error storing recompiled tasks") - } - logger.Info().Msg("new tasks have been stored") - - updateTaskStatuses(ctx, logger, db, dbJob) - - logger.Info().Msg("job recompilation seems to have worked out") -} - -func constructSubmittedJob(dbJob *persistence.Job) api.SubmittedJob { - fakeSubmittedJob := api.SubmittedJob{ - Name: dbJob.Name, - Priority: dbJob.Priority, - SubmitterPlatform: "reconstrutor", // The platform shouldn't matter, as all paths have already been replaced. - Type: dbJob.JobType, - TypeEtag: nil, - - Settings: &api.JobSettings{AdditionalProperties: make(map[string]interface{})}, - Metadata: &api.JobMetadata{AdditionalProperties: make(map[string]string)}, - } - - for key, value := range dbJob.Settings { - fakeSubmittedJob.Settings.AdditionalProperties[key] = value - } - for key, value := range dbJob.Metadata { - fakeSubmittedJob.Metadata.AdditionalProperties[key] = value - } - if dbJob.WorkerTag != nil { - fakeSubmittedJob.WorkerTag = &dbJob.WorkerTag.UUID - } else if dbJob.WorkerTagID != nil { - panic("WorkerTagID is set, but WorkerTag is not") - } - - return fakeSubmittedJob -} - -// Check that the authored job is consistent with the original job. -func sanityCheck(logger zerolog.Logger, expect *persistence.Job, actual *job_compilers.AuthoredJob) { - if actual.Name != expect.Name { - logger.Fatal(). - Str("expected", expect.Name). - Str("actual", actual.Name). - Msg("recompilation did not produce expected name") - } - if actual.JobType != expect.JobType { - logger.Fatal(). - Str("expected", expect.JobType). - Str("actual", actual.JobType). - Msg("recompilation did not produce expected job type") - } -} - -func updateTaskStatuses(ctx context.Context, logger zerolog.Logger, db *persistence.DB, dbJob *persistence.Job) { - logger = logger.With().Str("jobStatus", string(dbJob.Status)).Logger() - - // Update the task statuses based on the job status. This is NOT using the - // state machine, as these tasks are not actually going from one state to the - // other. They are just being updated in the database. - taskStatusMap := map[api.JobStatus]api.TaskStatus{ - api.JobStatusActive: api.TaskStatusQueued, - api.JobStatusCancelRequested: api.TaskStatusCanceled, - api.JobStatusCanceled: api.TaskStatusCanceled, - api.JobStatusCompleted: api.TaskStatusCompleted, - api.JobStatusFailed: api.TaskStatusCanceled, - api.JobStatusPaused: api.TaskStatusPaused, - api.JobStatusQueued: api.TaskStatusQueued, - api.JobStatusRequeueing: api.TaskStatusQueued, - api.JobStatusUnderConstruction: api.TaskStatusQueued, - } - newTaskStatus, ok := taskStatusMap[dbJob.Status] - if !ok { - logger.Warn().Msg("unknown job status, not touching task statuses") - return - } - - logger = logger.With().Str("taskStatus", string(newTaskStatus)).Logger() - - err := db.UpdateJobsTaskStatuses(ctx, dbJob, newTaskStatus, "reset task status after job reconstruction") - if err != nil { - logger.Fatal().Msg("could not update task statuses") - } - - logger.Info().Msg("task statuses have been updated based on the job status") -} - -func parseCliArgs() { - var quiet, debug, trace bool - - flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.") - flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.") - flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.") - flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.") - flag.StringVar(&cliArgs.jobUUID, "job", "", "Job UUID to regenerate") - - flag.Parse() - - var logLevel zerolog.Level - switch { - case trace: - logLevel = zerolog.TraceLevel - case debug: - logLevel = zerolog.DebugLevel - case quiet: - logLevel = zerolog.WarnLevel - default: - logLevel = zerolog.InfoLevel - } - zerolog.SetGlobalLevel(logLevel) -} - -// openDB opens the database or dies. -func openDB(configService config.Service) *persistence.DB { - dsn := configService.Get().DatabaseDSN - if dsn == "" { - log.Fatal().Msg("configure the database in flamenco-manager.yaml") - } - - dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer dbCtxCancel() - persist, err := persistence.OpenDB(dbCtx, dsn) - if err != nil { - log.Fatal(). - Err(err). - Str("dsn", dsn). - Msg("error opening database") - } - - return persist -} - -// installSignalHandler spawns a goroutine that handles incoming POSIX signals. -func installSignalHandler(cancelFunc context.CancelFunc) { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - signal.Notify(signals, syscall.SIGTERM) - go func() { - for signum := range signals { - log.Info().Str("signal", signum.String()).Msg("signal received, shutting down") - cancelFunc() - } - }() -} diff --git a/cmd/shaman-checkout-id-setter/main.go b/cmd/shaman-checkout-id-setter/main.go deleted file mode 100644 index f3a5745f..00000000 --- a/cmd/shaman-checkout-id-setter/main.go +++ /dev/null @@ -1,176 +0,0 @@ -package main - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "bufio" - "context" - "errors" - "flag" - "io/fs" - "os" - "path/filepath" - "runtime" - "strings" - "time" - - "github.com/mattn/go-colorable" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - - "projects.blender.org/studio/flamenco/internal/appinfo" - "projects.blender.org/studio/flamenco/internal/manager/config" - "projects.blender.org/studio/flamenco/internal/manager/persistence" -) - -func main() { - output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339} - log.Logger = log.Output(output) - log.Info(). - Str("version", appinfo.ApplicationVersion). - Str("git", appinfo.ApplicationGitHash). - Str("releaseCycle", appinfo.ReleaseCycle). - Str("os", runtime.GOOS). - Str("arch", runtime.GOARCH). - Msgf("starting %v shaman-checkout-id-setter", appinfo.ApplicationName) - - log.Warn().Msg("Use with care, and at your own risk.") - log.Warn().Msg("This is an experimental program, and may ruin your entire Flamenco database.") - log.Warn().Msg("Press Enter to continue.") - _, _ = bufio.NewReader(os.Stdin).ReadBytes('\n') - - parseCliArgs() - - // Load configuration. - configService := config.NewService() - err := configService.Load() - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Error().Err(err).Msg("loading configuration") - } - - // Reject working on a brand new installation. - isFirstRun, err := configService.IsFirstRun() - switch { - case err != nil: - log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not") - case isFirstRun: - log.Fatal().Msg("this should be run on an already-used database") - } - - // Find the {jobs} variable. - vars := configService.ResolveVariables(config.VariableAudienceWorkers, config.VariablePlatform(runtime.GOOS)) - jobsPath := vars["jobs"].Value - if jobsPath == "" { - log.Fatal().Msg("unable to resolve 'jobs' variable") - } - - // Connect to the database. - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer ctxCancel() - persist := openDB(*configService) - defer persist.Close() - - // Get all jobs from the database. - jobs, err := persist.FetchJobs(ctx) - if err != nil { - log.Fatal().Err(err).Msg("unable to fetch jobs") - } - - log.Info().Int("numJobs", len(jobs)).Msg("processing all jobs") - numJobsUpdated := 0 - for _, job := range jobs { - logger := log.With().Uint("id", job.ID).Str("uuid", job.UUID).Logger() - - if job.Storage.ShamanCheckoutID != "" { - logger.Info().Str("checkoutID", job.Storage.ShamanCheckoutID).Msg("job already has a Shaman checkout ID") - continue - } - - logger.Trace().Msg("processing job") - - // Find the 'blendfile' setting. - blendfile, ok := job.Settings["blendfile"].(string) - if !ok { - logger.Info().Msg("skipping job, it has no `blendfile` setting") - continue - } - - // See if it starts with `{jobs}`, otherwise it's not submitted via Shaman. - relpath, found := strings.CutPrefix(blendfile, "{jobs}"+string(os.PathSeparator)) - if !found { - logger.Info().Str("blendfile", blendfile).Msg("skipping job, its blendfile setting doesn't start with `{jobs}/`") - continue - } - - // See if there is a `pack-info.txt` file next to the blend file. This is - // another indication that we have the right directory. - packInfoPath := filepath.Join(jobsPath, filepath.Dir(relpath), "pack-info.txt") - _, err := os.Stat(packInfoPath) - switch { - case errors.Is(err, os.ErrNotExist): - logger.Warn().Str("packInfo", packInfoPath).Msg("skipping job, pack-info.txt not found where expected") - continue - case err != nil: - logger.Fatal().Str("packInfo", packInfoPath).Msg("error accessing pack-info.txt") - } - - // Extract the checkout ID from the blend file path. - checkoutID := filepath.Dir(relpath) - logger = logger.With().Str("checkoutID", checkoutID).Logger() - - // Store it on the job. - logger.Debug().Msg("updating job") - job.Storage.ShamanCheckoutID = checkoutID - if err := persist.SaveJobStorageInfo(ctx, job); err != nil { - logger.Error().Err(err).Msg("error saving job to the database") - continue - } - - numJobsUpdated++ - } - - log.Info().Msgf("done, updated %d of %d jobs", numJobsUpdated, len(jobs)) -} - -func parseCliArgs() { - var quiet, debug, trace bool - - flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.") - flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.") - flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.") - - flag.Parse() - - var logLevel zerolog.Level - switch { - case trace: - logLevel = zerolog.TraceLevel - case debug: - logLevel = zerolog.DebugLevel - case quiet: - logLevel = zerolog.WarnLevel - default: - logLevel = zerolog.InfoLevel - } - zerolog.SetGlobalLevel(logLevel) -} - -// openDB opens the database or dies. -func openDB(configService config.Service) *persistence.DB { - dsn := configService.Get().DatabaseDSN - if dsn == "" { - log.Fatal().Msg("configure the database in flamenco-manager.yaml") - } - - dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer dbCtxCancel() - persist, err := persistence.OpenDB(dbCtx, dsn) - if err != nil { - log.Fatal(). - Err(err). - Str("dsn", dsn). - Msg("error opening database") - } - - return persist -}