diff --git a/addon/flamenco/bat/interface.py b/addon/flamenco/bat/interface.py index f716b258..4a26804a 100644 --- a/addon/flamenco/bat/interface.py +++ b/addon/flamenco/bat/interface.py @@ -2,7 +2,7 @@ """BAT packing interface for Flamenco.""" from dataclasses import dataclass -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Optional, Any import logging import queue @@ -48,6 +48,9 @@ class MsgProgress(Message): @dataclass class MsgDone(Message): output_path: Path + """Path of the submitted blend file, relative to the Shaman checkout root.""" + actual_checkout_path: PurePosixPath + """Shaman checkout path, i.e. the root of the job files, relative to the Shaman checkout root.""" missing_files: list[Path] @@ -163,7 +166,11 @@ class PackThread(threading.Thread): log.debug("done") self._set_bat_status("DONE") - msg = MsgDone(self.packer.output_path, self.packer.missing_files) + msg = MsgDone( + self.packer.output_path, + self.packer.actual_checkout_path, + self.packer.missing_files, + ) self.queue.put(msg) def _set_bat_status(self, status: str) -> None: diff --git a/addon/flamenco/bat/shaman.py b/addon/flamenco/bat/shaman.py index f80301b1..dba0aa60 100644 --- a/addon/flamenco/bat/shaman.py +++ b/addon/flamenco/bat/shaman.py @@ -69,13 +69,22 @@ class Packer(submodules.pack.Packer): # type: ignore def output_path(self) -> PurePath: """The path of the packed blend file in the target directory.""" assert self._output_path is not None - assert self.shaman_transferrer is not None - checkout_root = PurePosixPath(self.shaman_transferrer.checkout_path) rel_output = self._output_path.relative_to(self._target_path) - out_path: PurePath = checkout_root / rel_output + out_path: PurePath = self.actual_checkout_path / rel_output return out_path + @property + def actual_checkout_path(self) -> PurePosixPath: + """The actual Shaman checkout path. + + Only valid after packing is complete. Shaman ensures that the checkout + is unique, and thus the actual path can be different than the requested + one. + """ + assert self.shaman_transferrer is not None + return PurePosixPath(self.shaman_transferrer.checkout_path) + def execute(self): try: super().execute() diff --git a/addon/flamenco/job_submission.py b/addon/flamenco/job_submission.py index e1a6185b..1e85f26e 100644 --- a/addon/flamenco/job_submission.py +++ b/addon/flamenco/job_submission.py @@ -80,6 +80,15 @@ def set_blend_file( job.settings[BLENDFILE_SETTING_KEY] = str(blendfile) +def set_shaman_checkout_id(job: _SubmittedJob, checkout_id: PurePosixPath) -> None: + from flamenco.manager.models import JobStorageInfo + + # The job.storage attribute doesn't even exist if it's not set. + if getattr(job, "storage", None) is None: + job.storage = JobStorageInfo() + job.storage.shaman_checkout_id = checkout_id.as_posix() + + def submit_job(job: _SubmittedJob, api_client: _ApiClient) -> _Job: """Send the given job to Flamenco Manager.""" from flamenco.manager import ApiClient diff --git a/addon/flamenco/operators.py b/addon/flamenco/operators.py index 4b6127bb..82a243f6 100644 --- a/addon/flamenco/operators.py +++ b/addon/flamenco/operators.py @@ -130,6 +130,8 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): bl_options = {"REGISTER"} # No UNDO. blendfile_on_farm: Optional[PurePosixPath] = None + actual_shaman_checkout_path: Optional[PurePosixPath] = None + job_name: bpy.props.StringProperty(name="Job Name") # type: ignore job: Optional[_SubmittedJob] = None temp_blendfile: Optional[Path] = None @@ -377,6 +379,19 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): return PurePosixPath(pack_target_file.as_posix()) + def _shaman_checkout_path(self) -> PurePosixPath: + """Construct the Shaman checkout path, aka Shaman Checkout ID. + + Note that this may not be the actually used checkout ID, as that will be + made unique to this job by Flamenco Manager. That will be stored in + self.actual_shaman_checkout_path after the Shaman checkout is actually + done. + """ + assert self.job is not None + + # TODO: get project name from preferences/GUI and insert that here too. + return PurePosixPath(f"{self.job.name}") + def _bat_pack_shaman(self, context: bpy.types.Context, blendfile: Path) -> None: """Use the Manager's Shaman API to submit the BAT pack. @@ -390,9 +405,6 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): assert self.job is not None self.log.info("Sending BAT pack to Shaman") - # TODO: get project name from preferences/GUI and insert that here too. - checkout_root = PurePosixPath(f"{self.job.name}") - self.packthread = bat_interface.copy( base_blendfile=blendfile, project=blendfile.parent, # TODO: get from preferences/GUI. @@ -402,7 +414,7 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): packer_class=bat_shaman.Packer, packer_kwargs=dict( api_client=self.get_api_client(context), - checkout_path=checkout_root, + checkout_path=self._shaman_checkout_path(), ), ) @@ -422,6 +434,7 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): # resolve to the job storage directory. self.blendfile_on_farm = PurePosixPath("{jobs}") / msg.output_path + self.actual_shaman_checkout_path = msg.actual_checkout_path self._submit_job(context) return self._quit(context) @@ -449,6 +462,10 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): # The blend file is contained in the job storage path, no need to # copy anything. self.blendfile_on_farm = bpathlib.make_absolute(blendfile) + + # No Shaman is involved when using the file directly. + self.actual_shaman_checkout_path = None + self._submit_job(context) def _prepare_job_for_submission(self, context: bpy.types.Context) -> bool: @@ -466,9 +483,16 @@ class FLAMENCO_OT_submit_job(FlamencoOpMixin, bpy.types.Operator): propgroup.eval_hidden_settings_of_job(context, self.job) job_submission.set_blend_file( - propgroup.job_type, self.job, self.blendfile_on_farm + propgroup.job_type, + self.job, + # self.blendfile_on_farm is None when we're just checking the job. + self.blendfile_on_farm or "dummy-for-job-check.blend", ) + if self.actual_shaman_checkout_path: + job_submission.set_shaman_checkout_id( + self.job, self.actual_shaman_checkout_path + ) return True diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index ca4a7479..13192fe9 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -556,6 +556,12 @@ func jobDBtoAPI(dbJob *persistence.Job) api.Job { apiJob.Settings = &api.JobSettings{AdditionalProperties: dbJob.Settings} apiJob.Metadata = &api.JobMetadata{AdditionalProperties: dbJob.Metadata} + if dbJob.Storage.ShamanCheckoutID != "" { + apiJob.Storage = &api.JobStorageInfo{ + ShamanCheckoutId: &dbJob.Storage.ShamanCheckoutID, + } + } + return apiJob } diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index a6d5b4d4..98d08588 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -242,6 +242,83 @@ func TestSubmitJobWithEtag(t *testing.T) { } } +func TestSubmitJobWithShamanCheckoutID(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + submittedJob := api.SubmittedJob{ + Name: "поднео посао", + Type: "test", + Priority: 50, + SubmitterPlatform: worker.Platform, + Storage: &api.JobStorageInfo{ + ShamanCheckoutId: ptr("Весы/Синтел"), + }, + } + + mf.expectConvertTwoWayVariables(t, + config.VariableAudienceWorkers, + config.VariablePlatform(worker.Platform), + map[string]string{}, + ) + + // Expect the job compiler to be called. + authoredJob := job_compilers.AuthoredJob{ + JobID: "afc47568-bd9d-4368-8016-e91d945db36d", + Name: submittedJob.Name, + JobType: submittedJob.Type, + Priority: submittedJob.Priority, + Status: api.JobStatusUnderConstruction, + Created: mf.clock.Now(), + + Storage: job_compilers.JobStorageInfo{ + ShamanCheckoutID: "Весы/Синтел", + }, + } + mf.jobCompiler.EXPECT().Compile(gomock.Any(), submittedJob).Return(&authoredJob, nil) + + // Expect the job to be saved with 'queued' status: + queuedJob := authoredJob + queuedJob.Status = api.JobStatusQueued + + mf.persistence.EXPECT().StoreAuthoredJob(gomock.Any(), queuedJob).Return(nil) + + // Expect the job to be fetched from the database again: + dbJob := persistence.Job{ + UUID: queuedJob.JobID, + Name: queuedJob.Name, + JobType: queuedJob.JobType, + Priority: queuedJob.Priority, + Status: queuedJob.Status, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + Storage: persistence.JobStorageInfo{ + ShamanCheckoutID: "Весы/Синтел", + }, + } + mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil) + + // Expect the new job to be broadcast. + jobUpdate := api.SocketIOJobUpdate{ + Id: dbJob.UUID, + Name: &dbJob.Name, + Priority: dbJob.Priority, + Status: dbJob.Status, + Type: dbJob.JobType, + Updated: dbJob.UpdatedAt, + } + mf.broadcaster.EXPECT().BroadcastNewJob(jobUpdate) + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(submittedJob) + requestWorkerStore(echoCtx, &worker) + err := mf.flamenco.SubmitJob(echoCtx) + assert.NoError(t, err) +} + func TestGetJobTypeHappy(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/job_compilers/author.go b/internal/manager/job_compilers/author.go index 87e0b7bf..95ec1ff3 100644 --- a/internal/manager/job_compilers/author.go +++ b/internal/manager/job_compilers/author.go @@ -30,6 +30,7 @@ type AuthoredJob struct { Settings JobSettings Metadata JobMetadata + Storage JobStorageInfo Tasks []AuthoredTask } @@ -37,6 +38,10 @@ type AuthoredJob struct { type JobSettings map[string]interface{} type JobMetadata map[string]string +type JobStorageInfo struct { + ShamanCheckoutID string +} + type AuthoredTask struct { // Tasks already get their UUID in the authoring stage. This makes it simpler // to store the dependencies, as the code doesn't have to worry about value diff --git a/internal/manager/job_compilers/job_compilers.go b/internal/manager/job_compilers/job_compilers.go index c36804fb..9d9f3452 100644 --- a/internal/manager/job_compilers/job_compilers.go +++ b/internal/manager/job_compilers/job_compilers.go @@ -123,6 +123,10 @@ func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJo } } + if sj.Storage != nil && sj.Storage.ShamanCheckoutId != nil { + aj.Storage.ShamanCheckoutID = *sj.Storage.ShamanCheckoutId + } + compiler, err := vm.getCompileJob() if err != nil { return nil, err diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index 64f12b0a..998c265a 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -10,6 +10,7 @@ func (db *DB) migrate() error { err := db.gormDB.AutoMigrate( &Job{}, &JobBlock{}, + &JobStorageInfo{}, &LastRendered{}, &SleepSchedule{}, &Task{}, diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index e40bdf76..ab28335e 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -30,11 +30,21 @@ type Job struct { Settings StringInterfaceMap `gorm:"type:jsonb"` Metadata StringStringMap `gorm:"type:jsonb"` + + Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"` } type StringInterfaceMap map[string]interface{} type StringStringMap map[string]string +// JobStorageInfo contains info about where the job files are stored. It is +// intended to be used when removing a job, which may include the removal of its +// files. +type JobStorageInfo struct { + // ShamanCheckoutID is only set when the job was actually using Shaman storage. + ShamanCheckoutID string `gorm:"type:varchar(255);default:''"` +} + type Task struct { Model UUID string `gorm:"type:char(36);default:'';unique;index"` @@ -122,6 +132,9 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au Priority: authoredJob.Priority, Settings: StringInterfaceMap(authoredJob.Settings), Metadata: StringStringMap(authoredJob.Metadata), + Storage: JobStorageInfo{ + ShamanCheckoutID: authoredJob.Storage.ShamanCheckoutID, + }, } if err := tx.Create(&dbJob).Error; err != nil { diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 0666d37e..6d78678d 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "git.blender.org/flamenco/internal/manager/job_compilers" @@ -37,6 +38,7 @@ func TestStoreAuthoredJob(t *testing.T) { assert.Equal(t, api.JobStatusUnderConstruction, fetchedJob.Status) assert.EqualValues(t, map[string]interface{}(job.Settings), fetchedJob.Settings) assert.EqualValues(t, map[string]string(job.Metadata), fetchedJob.Metadata) + assert.Equal(t, "", fetchedJob.Storage.ShamanCheckoutID) // Fetch tasks of job. var dbJob Job @@ -56,6 +58,23 @@ func TestStoreAuthoredJob(t *testing.T) { assert.Equal(t, api.TaskStatusQueued, tasks[2].Status) } +func TestStoreAuthoredJobWithShamanCheckoutID(t *testing.T) { + ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) + defer cancel() + + job := createTestAuthoredJobWithTasks() + job.Storage.ShamanCheckoutID = "één/twee" + + err := db.StoreAuthoredJob(ctx, job) + require.NoError(t, err) + + fetchedJob, err := db.FetchJob(ctx, job.JobID) + require.NoError(t, err) + require.NotNil(t, fetchedJob) + + assert.Equal(t, job.Storage.ShamanCheckoutID, fetchedJob.Storage.ShamanCheckoutID) +} + func TestDeleteJob(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, 1*time.Second) defer cancel() diff --git a/pkg/shaman/checkout/checkout.go b/pkg/shaman/checkout/checkout.go index 2144ef0c..07b49ca5 100644 --- a/pkg/shaman/checkout/checkout.go +++ b/pkg/shaman/checkout/checkout.go @@ -20,16 +20,20 @@ var ( validCheckoutRegexp = regexp.MustCompile(`^[^/?*:;{}\\][^?*:;{}\\]*$`) ) +// Checkout symlinks the requested files into the checkout directory. +// Returns the actually-used checkout directory, relative to the configured checkout root. func (m *Manager) Checkout(ctx context.Context, checkout api.ShamanCheckout) (string, error) { - logger := (*zerolog.Ctx(ctx)).With(). - Str("checkoutPath", checkout.CheckoutPath).Logger() - logger.Debug().Msg("shaman: user requested checkout creation") + logger := (*zerolog.Ctx(ctx)) + logger.Debug(). + Str("checkoutPath", checkout.CheckoutPath). + Msg("shaman: user requested checkout creation") // Actually create the checkout. resolvedCheckoutInfo, err := m.PrepareCheckout(checkout.CheckoutPath) if err != nil { return "", err } + logger = logger.With().Str("checkoutPath", resolvedCheckoutInfo.RelativePath).Logger() // The checkout directory was created, so if anything fails now, it should be erased. var checkoutOK bool diff --git a/pkg/shaman/checkout/checkout_test.go b/pkg/shaman/checkout/checkout_test.go index 7f4358f4..259b6e1b 100644 --- a/pkg/shaman/checkout/checkout_test.go +++ b/pkg/shaman/checkout/checkout_test.go @@ -24,7 +24,7 @@ func TestCheckout(t *testing.T) { {Sha: "590c148428d5c35fab3ebad2f3365bb469ab9c531b60831f3e826c472027a0b9", Size: 3367, Path: "subdir/replacer.py"}, {Sha: "80b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", Size: 7488, Path: "feed.py"}, {Sha: "914853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", Size: 486, Path: "httpstuff.py"}, - {Sha: "d6fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", Size: 7217, Path: "filesystemstuff.py"}, + {Sha: "d6fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", Size: 7217, Path: "много ликова.py"}, }, } @@ -38,7 +38,7 @@ func TestCheckout(t *testing.T) { assert.FileExists(t, filepath.Join(coPath, "subdir", "replacer.py")) assert.FileExists(t, filepath.Join(coPath, "feed.py")) assert.FileExists(t, filepath.Join(coPath, "httpstuff.py")) - assert.FileExists(t, filepath.Join(coPath, "filesystemstuff.py")) + assert.FileExists(t, filepath.Join(coPath, "много ликова.py")) storePath := manager.fileStore.StoragePath() assertLinksTo(t, filepath.Join(coPath, "subdir", "replacer.py"), @@ -47,7 +47,7 @@ func TestCheckout(t *testing.T) { filepath.Join(storePath, "80", "b749c27b2fef7255e7e7b3c2029b03b31299c75ff1f1c72732081c70a713a3", "7488.blob")) assertLinksTo(t, filepath.Join(coPath, "httpstuff.py"), filepath.Join(storePath, "91", "4853599dd2c351ab7b82b219aae6e527e51518a667f0ff32244b0c94c75688", "486.blob")) - assertLinksTo(t, filepath.Join(coPath, "filesystemstuff.py"), + assertLinksTo(t, filepath.Join(coPath, "много ликова.py"), filepath.Join(storePath, "d6", "fc7289b5196cc96748ea72f882a22c39b8833b457fe854ef4c03a01f5db0d3", "7217.blob")) } diff --git a/pkg/shaman/checkout/manager.go b/pkg/shaman/checkout/manager.go index 664cb91a..d562b68d 100644 --- a/pkg/shaman/checkout/manager.go +++ b/pkg/shaman/checkout/manager.go @@ -54,7 +54,7 @@ type ResolvedCheckoutInfo struct { // The absolute path on our filesystem. absolutePath string // The path relative to the Manager.checkoutBasePath. This is what was - // received from the client. + // received from the client, updated to be unique. RelativePath string } @@ -104,14 +104,14 @@ func (m *Manager) pathForCheckout(requestedCheckoutPath string) (ResolvedCheckou // PrepareCheckout creates the root directory for a specific checkout. // Returns the path relative to the checkout root directory. -func (m *Manager) PrepareCheckout(checkoutPath string) (ResolvedCheckoutInfo, error) { +func (m *Manager) PrepareCheckout(requestedCheckoutPath string) (ResolvedCheckoutInfo, error) { // This function checks the filesystem and tries to ensure uniqueness, so it's // important that it doesn't run simultaneously in parallel threads. m.checkoutUniquenessMutex.Lock() defer m.checkoutUniquenessMutex.Unlock() var lastErr error - attemptCheckoutPath := checkoutPath + attemptCheckoutPath := requestedCheckoutPath // Just try 10 different random suffixes. If that still doesn't work, fail. for try := 0; try < 10; try++ { @@ -122,7 +122,7 @@ func (m *Manager) PrepareCheckout(checkoutPath string) (ResolvedCheckoutInfo, er logger := log.With(). Str("absolutePath", checkoutPaths.absolutePath). - Str("checkoutPath", checkoutPath). + Str("checkoutPath", requestedCheckoutPath). Logger() if stat, err := os.Stat(checkoutPaths.absolutePath); !errors.Is(err, fs.ErrNotExist) { @@ -130,13 +130,13 @@ func (m *Manager) PrepareCheckout(checkoutPath string) (ResolvedCheckoutInfo, er // No error stat'ing this path, indicating it's an existing checkout. lastErr = ErrCheckoutAlreadyExists if stat.IsDir() { - logger.Debug().Msg("shaman: checkout path exists") + logger.Debug().Msg("shaman: checkout path exists, going to add a random suffix") } else { logger.Warn().Msg("shaman: checkout path exists but is not a directory") } // Retry with (another) random suffix. - attemptCheckoutPath = fmt.Sprintf("%s-%s", checkoutPath, randomisedToken()) + attemptCheckoutPath = fmt.Sprintf("%s-%s", requestedCheckoutPath, randomisedToken()) continue } // If it's any other error, it's really a problem on our side. Don't retry. @@ -150,7 +150,10 @@ func (m *Manager) PrepareCheckout(checkoutPath string) (ResolvedCheckoutInfo, er continue } - logger.Info().Str("relPath", checkoutPaths.RelativePath).Msg("shaman: created checkout directory") + log.Debug(). + Str("requestedPath", requestedCheckoutPath). + Str("actualPath", checkoutPaths.RelativePath). + Msg("shaman: created checkout directory") return checkoutPaths, nil } diff --git a/pkg/shaman/checkout/manager_test.go b/pkg/shaman/checkout/manager_test.go index c66f5f24..7c0d71b9 100644 --- a/pkg/shaman/checkout/manager_test.go +++ b/pkg/shaman/checkout/manager_test.go @@ -26,12 +26,14 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "testing" "time" "git.blender.org/flamenco/pkg/shaman/config" "git.blender.org/flamenco/pkg/shaman/filestore" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func createTestManager() (*Manager, func()) { @@ -77,3 +79,21 @@ func TestSymlinkToCheckout(t *testing.T) { assert.True(t, stat.Mode()&os.ModeType == os.ModeSymlink, "%v should be a symlink", symlinkPath) } + +func TestPrepareCheckout(t *testing.T) { + manager, cleanup := createTestManager() + defer cleanup() + + requestedCheckoutPath := "some-path/that is/unique/at first" + + // On first call, this path should be unique. + resolved, err := manager.PrepareCheckout(requestedCheckoutPath) + require.NoError(t, err) + assert.Equal(t, requestedCheckoutPath, resolved.RelativePath) + + // At the second call, it already exists and thus should be altered with a random suffix. + resolved, err = manager.PrepareCheckout(requestedCheckoutPath) + require.NoError(t, err) + assert.NotEqual(t, requestedCheckoutPath, resolved.RelativePath) + assert.True(t, strings.HasPrefix(resolved.RelativePath, requestedCheckoutPath+"-")) +}