diff --git a/.gitignore b/.gitignore
index f06739e3..f3dfd208 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@
 /flamenco-worker_race
 /shaman-checkout-id-setter
 /stresser
+/job-creator
 /addon-packer
 flamenco-manager.yaml
 flamenco-worker.yaml
diff --git a/Makefile b/Makefile
index 32ce915b..1440f839 100644
--- a/Makefile
+++ b/Makefile
@@ -69,6 +69,10 @@ flamenco-worker:
 stresser:
 	go build -v ${BUILD_FLAGS} ${PKG}/cmd/stresser
 
+.PHONY: job-creator
+job-creator:
+	go build -v ${BUILD_FLAGS} ${PKG}/cmd/job-creator
+
 addon-packer: cmd/addon-packer/addon-packer.go
 	go build -v ${BUILD_FLAGS} ${PKG}/cmd/addon-packer
 
diff --git a/cmd/job-creator/main.go b/cmd/job-creator/main.go
new file mode 100644
index 00000000..34699b99
--- /dev/null
+++ b/cmd/job-creator/main.go
@@ -0,0 +1,261 @@
+package main
+
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+import (
+	"context"
+	"errors"
+	"flag"
+	"io/fs"
+	"os"
+	"os/signal"
+	"runtime"
+	"syscall"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	"github.com/mattn/go-colorable"
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+
+	"git.blender.org/flamenco/internal/appinfo"
+	"git.blender.org/flamenco/internal/manager/config"
+	"git.blender.org/flamenco/internal/manager/job_compilers"
+	"git.blender.org/flamenco/internal/manager/persistence"
+	"git.blender.org/flamenco/pkg/api"
+)
+
+var cliArgs struct {
+	version bool
+	jobUUID string
+}
+
+func main() {
+	output := zerolog.ConsoleWriter{Out: colorable.NewColorableStdout(), TimeFormat: time.RFC3339}
+	log.Logger = log.Output(output)
+	log.Info().
+		Str("version", appinfo.ApplicationVersion).
+		Str("git", appinfo.ApplicationGitHash).
+		Str("releaseCycle", appinfo.ReleaseCycle).
+		Str("os", runtime.GOOS).
+		Str("arch", runtime.GOARCH).
+		Msgf("starting %v job compiler", appinfo.ApplicationName)
+
+	parseCliArgs()
+	if cliArgs.version {
+		return
+	}
+
+	if cliArgs.jobUUID == "" {
+		log.Fatal().Msg("give me a job UUID to regenerate tasks for")
+	}
+
+	// Load configuration.
+	configService := config.NewService()
+	err := configService.Load()
+	if err != nil && !errors.Is(err, fs.ErrNotExist) {
+		log.Error().Err(err).Msg("loading configuration")
+	}
+
+	isFirstRun, err := configService.IsFirstRun()
+	switch {
+	case err != nil:
+		log.Fatal().Err(err).Msg("unable to determine whether this is the first run of Flamenco or not")
+	case isFirstRun:
+		log.Info().Msg("This seems to be your first run of Flamenco, this tool won't work.")
+		return
+	}
+
+	// Construct the services.
+	persist := openDB(*configService)
+	defer persist.Close()
+
+	timeService := clock.New()
+	compiler, err := job_compilers.Load(timeService)
+	if err != nil {
+		log.Fatal().Err(err).Msg("error loading job compilers")
+	}
+
+	// The main context determines the lifetime of the application. All
+	// long-running goroutines need to keep an eye on this, and stop their work
+	// once it closes.
+	mainCtx, mainCtxCancel := context.WithCancel(context.Background())
+	defer mainCtxCancel()
+
+	installSignalHandler(mainCtxCancel)
+
+	recompile(mainCtx, cliArgs.jobUUID, persist, compiler)
+}
+
+// recompile regenerates the job's tasks.
+func recompile(ctx context.Context, jobUUID string, db *persistence.DB, compiler *job_compilers.Service) {
+	dbJob, err := db.FetchJob(ctx, jobUUID)
+	if err != nil {
+		log.Fatal().Err(err).Msg("could not get job from database")
+	}
+	logger := log.With().Str("job", jobUUID).Logger()
+	logger.Info().Msg("found job")
+
+	dbTasks, err := db.FetchTasksOfJob(ctx, dbJob)
+	if err != nil {
+		log.Fatal().Err(err).Msg("could not query database for tasks")
+	}
+	if len(dbTasks) > 0 {
+		// This tool has only been tested with jobs that have had their tasks completely lost.
+		log.Fatal().
+			Int("numTasks", len(dbTasks)).
+			Msg("this job still has tasks, this is not a situation this tool should be used in")
+	}
+
+	// Recompile the job.
+	fakeSubmittedJob := constructSubmittedJob(dbJob)
+	authoredJob, err := compiler.Compile(ctx, fakeSubmittedJob)
+	if err != nil {
+		logger.Fatal().Err(err).Msg("could not recompile job")
+	}
+	sanityCheck(logger, dbJob, authoredJob)
+
+	// Store the recompiled tasks.
+	if err := db.StoreAuthoredJobTaks(ctx, dbJob, authoredJob); err != nil {
+		logger.Fatal().Err(err).Msg("error storing recompiled tasks")
+	}
+	logger.Info().Msg("new tasks have been stored")
+
+	updateTaskStatuses(ctx, logger, db, dbJob)
+
+	logger.Info().Msg("job recompilation seems to have worked out")
+}
+
+func constructSubmittedJob(dbJob *persistence.Job) api.SubmittedJob {
+	fakeSubmittedJob := api.SubmittedJob{
+		Name:              dbJob.Name,
+		Priority:          dbJob.Priority,
+		SubmitterPlatform: "reconstrutor", // The platform shouldn't matter, as all paths have already been replaced.
+		Type:              dbJob.JobType,
+		TypeEtag:          nil,
+
+		Settings: &api.JobSettings{AdditionalProperties: make(map[string]interface{})},
+		Metadata: &api.JobMetadata{AdditionalProperties: make(map[string]string)},
+	}
+
+	for key, value := range dbJob.Settings {
+		fakeSubmittedJob.Settings.AdditionalProperties[key] = value
+	}
+	for key, value := range dbJob.Metadata {
+		fakeSubmittedJob.Metadata.AdditionalProperties[key] = value
+	}
+	if dbJob.WorkerTag != nil {
+		fakeSubmittedJob.WorkerTag = &dbJob.WorkerTag.UUID
+	} else if dbJob.WorkerTagID != nil {
+		panic("WorkerTagID is set, but WorkerTag is not")
+	}
+
+	return fakeSubmittedJob
+}
+
+// Check that the authored job is consistent with the original job.
+func sanityCheck(logger zerolog.Logger, expect *persistence.Job, actual *job_compilers.AuthoredJob) {
+	if actual.Name != expect.Name {
+		logger.Fatal().
+			Str("expected", expect.Name).
+			Str("actual", actual.Name).
+			Msg("recompilation did not produce expected name")
+	}
+	if actual.JobType != expect.JobType {
+		logger.Fatal().
+			Str("expected", expect.JobType).
+			Str("actual", actual.JobType).
+			Msg("recompilation did not produce expected job type")
+	}
+}
+
+func updateTaskStatuses(ctx context.Context, logger zerolog.Logger, db *persistence.DB, dbJob *persistence.Job) {
+	logger = logger.With().Str("jobStatus", string(dbJob.Status)).Logger()
+
+	// Update the task statuses based on the job status. This is NOT using the
+	// state machine, as these tasks are not actually going from one state to the
+	// other. They are just being updated in the database.
+	taskStatusMap := map[api.JobStatus]api.TaskStatus{
+		api.JobStatusActive:            api.TaskStatusQueued,
+		api.JobStatusCancelRequested:   api.TaskStatusCanceled,
+		api.JobStatusCanceled:          api.TaskStatusCanceled,
+		api.JobStatusCompleted:         api.TaskStatusCompleted,
+		api.JobStatusFailed:            api.TaskStatusCanceled,
+		api.JobStatusPaused:            api.TaskStatusPaused,
+		api.JobStatusQueued:            api.TaskStatusQueued,
+		api.JobStatusRequeueing:        api.TaskStatusQueued,
+		api.JobStatusUnderConstruction: api.TaskStatusQueued,
+	}
+	newTaskStatus, ok := taskStatusMap[dbJob.Status]
+	if !ok {
+		logger.Warn().Msg("unknown job status, not touching task statuses")
+		return
+	}
+
+	logger = logger.With().Str("taskStatus", string(newTaskStatus)).Logger()
+
+	err := db.UpdateJobsTaskStatuses(ctx, dbJob, newTaskStatus, "reset task status after job reconstruction")
+	if err != nil {
+		logger.Fatal().Msg("could not update task statuses")
+	}
+
+	logger.Info().Msg("task statuses have been updated based on the job status")
+}
+
+func parseCliArgs() {
+	var quiet, debug, trace bool
+
+	flag.BoolVar(&cliArgs.version, "version", false, "Shows the application version, then exits.")
+	flag.BoolVar(&quiet, "quiet", false, "Only log warning-level and worse.")
+	flag.BoolVar(&debug, "debug", false, "Enable debug-level logging.")
+	flag.BoolVar(&trace, "trace", false, "Enable trace-level logging.")
+	flag.StringVar(&cliArgs.jobUUID, "job", "", "Job UUID to regenerate")
+
+	flag.Parse()
+
+	var logLevel zerolog.Level
+	switch {
+	case trace:
+		logLevel = zerolog.TraceLevel
+	case debug:
+		logLevel = zerolog.DebugLevel
+	case quiet:
+		logLevel = zerolog.WarnLevel
+	default:
+		logLevel = zerolog.InfoLevel
+	}
+	zerolog.SetGlobalLevel(logLevel)
+}
+
+// openDB opens the database or dies.
+func openDB(configService config.Service) *persistence.DB {
+	dsn := configService.Get().DatabaseDSN
+	if dsn == "" {
+		log.Fatal().Msg("configure the database in flamenco-manager.yaml")
+	}
+
+	dbCtx, dbCtxCancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer dbCtxCancel()
+	persist, err := persistence.OpenDB(dbCtx, dsn)
+	if err != nil {
+		log.Fatal().
+			Err(err).
+			Str("dsn", dsn).
+			Msg("error opening database")
+	}
+
+	return persist
+}
+
+// installSignalHandler spawns a goroutine that handles incoming POSIX signals.
+func installSignalHandler(cancelFunc context.CancelFunc) {
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+	signal.Notify(signals, syscall.SIGTERM)
+	go func() {
+		for signum := range signals {
+			log.Info().Str("signal", signum.String()).Msg("signal received, shutting down")
+			cancelFunc()
+		}
+	}()
+}
diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go
index 8f6329d9..745d5d9b 100644
--- a/internal/manager/persistence/jobs.go
+++ b/internal/manager/persistence/jobs.go
@@ -162,72 +162,93 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au
 			return jobError(err, "storing job")
 		}
 
-		uuidToTask := make(map[string]*Task)
-		for _, authoredTask := range authoredJob.Tasks {
-			var commands []Command
-			for _, authoredCommand := range authoredTask.Commands {
-				commands = append(commands, Command{
-					Name:       authoredCommand.Name,
-					Parameters: StringInterfaceMap(authoredCommand.Parameters),
-				})
-			}
-
-			dbTask := Task{
-				Name:     authoredTask.Name,
-				Type:     authoredTask.Type,
-				UUID:     authoredTask.UUID,
-				Job:      &dbJob,
-				Priority: authoredTask.Priority,
-				Status:   api.TaskStatusQueued,
-				Commands: commands,
-				// dependencies are stored below.
-			}
-			if err := tx.Create(&dbTask).Error; err != nil {
-				return taskError(err, "storing task: %v", err)
-			}
-
-			uuidToTask[authoredTask.UUID] = &dbTask
-		}
-
-		// Store the dependencies between tasks.
-		for _, authoredTask := range authoredJob.Tasks {
-			if len(authoredTask.Dependencies) == 0 {
-				continue
-			}
-
-			dbTask, ok := uuidToTask[authoredTask.UUID]
-			if !ok {
-				return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
-			}
-
-			deps := make([]*Task, len(authoredTask.Dependencies))
-			for i, t := range authoredTask.Dependencies {
-				depTask, ok := uuidToTask[t.UUID]
-				if !ok {
-					return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
-				}
-				deps[i] = depTask
-			}
-			dependenciesbatchsize := 1000
-			for j := 0; j < len(deps); j += dependenciesbatchsize {
-				end := j + dependenciesbatchsize
-				if end > len(deps) {
-					end = len(deps)
-				}
-				currentDeps := deps[j:end]
-				dbTask.Dependencies = currentDeps
-				tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
-				subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
-				if subQuery.Error != nil {
-					return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
-				}
-			}
-		}
-
-		return nil
+		return db.storeAuthoredJobTaks(ctx, tx, &dbJob, &authoredJob)
 	})
 }
 
+// StoreAuthoredJobTaks is a low-level function that is only used for recreating an existing job's tasks.
+// It stores `authoredJob`'s tasks, but attaches them to the already-persisted `job`.
+func (db *DB) StoreAuthoredJobTaks(
+	ctx context.Context,
+	job *Job,
+	authoredJob *job_compilers.AuthoredJob,
+) error {
+	tx := db.gormDB.WithContext(ctx)
+	return db.storeAuthoredJobTaks(ctx, tx, job, authoredJob)
+}
+
+func (db *DB) storeAuthoredJobTaks(
+	ctx context.Context,
+	tx *gorm.DB,
+	dbJob *Job,
+	authoredJob *job_compilers.AuthoredJob,
+) error {
+
+	uuidToTask := make(map[string]*Task)
+	for _, authoredTask := range authoredJob.Tasks {
+		var commands []Command
+		for _, authoredCommand := range authoredTask.Commands {
+			commands = append(commands, Command{
+				Name:       authoredCommand.Name,
+				Parameters: StringInterfaceMap(authoredCommand.Parameters),
+			})
+		}
+
+		dbTask := Task{
+			Name:     authoredTask.Name,
+			Type:     authoredTask.Type,
+			UUID:     authoredTask.UUID,
+			Job:      dbJob,
+			Priority: authoredTask.Priority,
+			Status:   api.TaskStatusQueued,
+			Commands: commands,
+			// dependencies are stored below.
+		}
+		if err := tx.Create(&dbTask).Error; err != nil {
+			return taskError(err, "storing task: %v", err)
+		}
+
+		uuidToTask[authoredTask.UUID] = &dbTask
+	}
+
+	// Store the dependencies between tasks.
+	for _, authoredTask := range authoredJob.Tasks {
+		if len(authoredTask.Dependencies) == 0 {
+			continue
+		}
+
+		dbTask, ok := uuidToTask[authoredTask.UUID]
+		if !ok {
+			return taskError(nil, "unable to find task %q in the database, even though it was just authored", authoredTask.UUID)
+		}
+
+		deps := make([]*Task, len(authoredTask.Dependencies))
+		for i, t := range authoredTask.Dependencies {
+			depTask, ok := uuidToTask[t.UUID]
+			if !ok {
+				return taskError(nil, "finding task with UUID %q; a task depends on a task that is not part of this job", t.UUID)
+			}
+			deps[i] = depTask
+		}
+		dependenciesbatchsize := 1000
+		for j := 0; j < len(deps); j += dependenciesbatchsize {
+			end := j + dependenciesbatchsize
+			if end > len(deps) {
+				end = len(deps)
+			}
+			currentDeps := deps[j:end]
+			dbTask.Dependencies = currentDeps
+			tx.Model(&dbTask).Where("UUID = ?", dbTask.UUID)
+			subQuery := tx.Model(dbTask).Updates(Task{Dependencies: currentDeps})
+			if subQuery.Error != nil {
+				return taskError(subQuery.Error, "error with storing dependencies of task %q issue exists in dependencies %d to %d", authoredTask.UUID, j, end)
+			}
+		}
+	}
+
+	return nil
+}
+
 // FetchJob fetches a single job, without fetching its tasks.
 func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) {
 	dbJob := Job{}