From 830c3fe794c5530f9590075376e48a2a3f97b2f6 Mon Sep 17 00:00:00 2001 From: Eveline Anderson Date: Mon, 10 Jul 2023 11:06:53 +0200 Subject: [PATCH] Rename worker 'clusters' to 'tags' As it was decided that the name "tags" would be better for the clarity of the feature, all files and code named "cluster" or "worker cluster" have been removed and replaced with "tag" and "worker tag". This is only a name change, no other features were touched. This addresses part of #104204. Reviewed-on: https://projects.blender.org/studio/flamenco/pulls/104223 As a note to anyone who already ran a pre-release version of Flamenco and configured some worker clusters, with the help of an SQLite client you can migrate the clusters to tags. First build Flamenco Manager and start it, to create the new database schema. Then run these SQL queries via an sqlite commandline client: ```sql insert into worker_tags (id, created_at, updated_at, uuid, name, description) select id, created_at, updated_at, uuid, name, description from worker_clusters; insert into worker_tag_membership (worker_tag_id, worker_id) select worker_cluster_id, worker_id from worker_cluster_membership; ``` --- CHANGELOG.md | 2 +- addon/flamenco/__init__.py | 8 +- addon/flamenco/gui.py | 6 +- addon/flamenco/job_submission.py | 6 +- addon/flamenco/operators.py | 22 +-- addon/flamenco/preferences.py | 12 +- .../{worker_clusters.py => worker_tags.py} | 30 ++-- internal/manager/api_impl/interfaces.go | 14 +- internal/manager/api_impl/jobs.go | 4 +- internal/manager/api_impl/jobs_test.go | 22 +-- .../api_impl/mocks/api_impl_mock.gen.go | 92 +++++----- internal/manager/api_impl/worker_mgt.go | 166 +++++++++--------- internal/manager/api_impl/worker_mgt_test.go | 54 +++--- internal/manager/job_compilers/author.go | 4 +- .../manager/job_compilers/job_compilers.go | 4 +- .../job_compilers/job_compilers_test.go | 28 +-- internal/manager/persistence/db_migration.go | 2 +- internal/manager/persistence/errors.go | 18 +- 
internal/manager/persistence/jobs.go | 16 +- .../manager/persistence/jobs_blocklist.go | 12 +- .../persistence/jobs_blocklist_test.go | 50 +++--- internal/manager/persistence/jobs_query.go | 2 +- internal/manager/persistence/jobs_test.go | 2 +- .../manager/persistence/task_scheduler.go | 28 +-- .../persistence/task_scheduler_test.go | 56 +++--- internal/manager/persistence/test_support.go | 14 +- .../manager/persistence/worker_cluster.go | 112 ------------ .../persistence/worker_cluster_test.go | 165 ----------------- internal/manager/persistence/worker_tag.go | 112 ++++++++++++ .../manager/persistence/worker_tag_test.go | 165 +++++++++++++++++ internal/manager/persistence/workers.go | 4 +- internal/manager/persistence/workers_test.go | 10 +- internal/manager/webupdates/worker_updates.go | 2 +- web/app/src/components/jobs/JobDetails.vue | 32 ++-- .../src/components/workers/WorkerDetails.vue | 100 ++++++----- web/app/src/stores/workers.js | 33 ++-- 36 files changed, 708 insertions(+), 701 deletions(-) rename addon/flamenco/{worker_clusters.py => worker_tags.py} (62%) delete mode 100644 internal/manager/persistence/worker_cluster.go delete mode 100644 internal/manager/persistence/worker_cluster_test.go create mode 100644 internal/manager/persistence/worker_tag.go create mode 100644 internal/manager/persistence/worker_tag_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 241b1d80..c9d8bc6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ bugs in actually-released versions. - Improve speed of queueing up >100 simultaneous job deletions. - Improve logging of job deletion. -- Add Worker Cluster support. Workers can be members of any number of clusters. Workers will only work on jobs that are assigned to that cluster. Jobs that do not have a cluster will be available to all workers, regardless of their cluster assignment. As a result, clusterless workers will only work on clusterless jobs. +- Add Worker Tag support. 
Workers can be members of any number of tags. Workers will only work on jobs that are assigned to that tag. Jobs that do not have a tag will be available to all workers, regardless of their tag assignment. As a result, tagless workers will only work on tagless jobs. - Fix limitation where a job could have no more than 1000 tasks ([#104201](https://projects.blender.org/studio/flamenco/issues/104201)) - Add support for finding the top-level 'project' directory. When submitting files to Flamenco, the add-on will try to retain the directory structure of your Blender project as precisely as possible. This new feature allows the add-on to find the top-level directory of your project by finding a `.blender_project`, `.git`, or `.subversion` directory. This can be configured in the add-on preferences. - Worker status is remembered when they sign off, so that workers when they come back online do so to the same state ([#99549](https://projects.blender.org/studio/flamenco/issues/99549)). diff --git a/addon/flamenco/__init__.py b/addon/flamenco/__init__.py index 10de1bc9..7dca8fc3 100644 --- a/addon/flamenco/__init__.py +++ b/addon/flamenco/__init__.py @@ -26,7 +26,7 @@ if __is_first_load: comms, preferences, projects, - worker_clusters, + worker_tags, ) else: import importlib @@ -37,7 +37,7 @@ else: comms = importlib.reload(comms) preferences = importlib.reload(preferences) projects = importlib.reload(projects) - worker_clusters = importlib.reload(worker_clusters) + worker_tags = importlib.reload(worker_tags) import bpy @@ -155,7 +155,7 @@ def register() -> None: ) preferences.register() - worker_clusters.register() + worker_tags.register() operators.register() gui.register() job_types.register() @@ -173,5 +173,5 @@ def unregister() -> None: job_types.unregister() gui.unregister() operators.unregister() - worker_clusters.unregister() + worker_tags.unregister() preferences.unregister() diff --git a/addon/flamenco/gui.py b/addon/flamenco/gui.py index 9ff7d427..d9697f75 100644 
--- a/addon/flamenco/gui.py +++ b/addon/flamenco/gui.py @@ -43,10 +43,10 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel): col.prop(context.scene, "flamenco_job_name", text="Job Name") col.prop(context.scene, "flamenco_job_priority", text="Priority") - # Worker cluster: + # Worker tag: row = col.row(align=True) - row.prop(context.scene, "flamenco_worker_cluster", text="Cluster") - row.operator("flamenco.fetch_worker_clusters", text="", icon="FILE_REFRESH") + row.prop(context.scene, "flamenco_worker_tag", text="Tag") + row.operator("flamenco.fetch_worker_tags", text="", icon="FILE_REFRESH") layout.separator() diff --git a/addon/flamenco/job_submission.py b/addon/flamenco/job_submission.py index 95344474..cb40a5c2 100644 --- a/addon/flamenco/job_submission.py +++ b/addon/flamenco/job_submission.py @@ -54,9 +54,9 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]: type_etag=propgroup.job_type.etag, ) - worker_cluster: str = getattr(scene, "flamenco_worker_cluster", "") - if worker_cluster and worker_cluster != "-": - job.worker_cluster = worker_cluster + worker_tag: str = getattr(scene, "flamenco_worker_tag", "") + if worker_tag and worker_tag != "-": + job.worker_tag = worker_tag return job diff --git a/addon/flamenco/operators.py b/addon/flamenco/operators.py index 5ee0a0e3..0b6c883b 100644 --- a/addon/flamenco/operators.py +++ b/addon/flamenco/operators.py @@ -10,7 +10,7 @@ from urllib3.exceptions import HTTPError, MaxRetryError import bpy -from . import job_types, job_submission, preferences, worker_clusters +from . 
import job_types, job_submission, preferences, worker_tags from .job_types_propgroup import JobTypePropertyGroup from .bat.submodules import bpathlib @@ -83,10 +83,10 @@ class FLAMENCO_OT_fetch_job_types(FlamencoOpMixin, bpy.types.Operator): return {"FINISHED"} -class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): - bl_idname = "flamenco.fetch_worker_clusters" - bl_label = "Fetch Worker Clusters" - bl_description = "Query Flamenco Manager to obtain the available worker clusters" +class FLAMENCO_OT_fetch_worker_tags(FlamencoOpMixin, bpy.types.Operator): + bl_idname = "flamenco.fetch_worker_tags" + bl_label = "Fetch Worker Tags" + bl_description = "Query Flamenco Manager to obtain the available worker tags" def execute(self, context: bpy.types.Context) -> set[str]: api_client = self.get_api_client(context) @@ -94,10 +94,10 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): from flamenco.manager import ApiException scene = context.scene - old_cluster = getattr(scene, "flamenco_worker_cluster", "") + old_tag = getattr(scene, "flamenco_worker_tag", "") try: - worker_clusters.refresh(context, api_client) + worker_tags.refresh(context, api_client) except ApiException as ex: self.report({"ERROR"}, "Error getting job types: %s" % ex) return {"CANCELLED"} @@ -107,9 +107,9 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): self.report({"ERROR"}, "Unable to reach Manager") return {"CANCELLED"} - if old_cluster: - # TODO: handle cases where the old cluster no longer exists. - scene.flamenco_worker_cluster = old_cluster + if old_tag: + # TODO: handle cases where the old tag no longer exists. 
+ scene.flamenco_worker_tag = old_tag return {"FINISHED"} @@ -669,7 +669,7 @@ class FLAMENCO3_OT_explore_file_path(bpy.types.Operator): classes = ( FLAMENCO_OT_fetch_job_types, - FLAMENCO_OT_fetch_worker_clusters, + FLAMENCO_OT_fetch_worker_tags, FLAMENCO_OT_ping_manager, FLAMENCO_OT_eval_setting, FLAMENCO_OT_submit_job, diff --git a/addon/flamenco/preferences.py b/addon/flamenco/preferences.py index 31bc231c..2b5daa27 100644 --- a/addon/flamenco/preferences.py +++ b/addon/flamenco/preferences.py @@ -43,7 +43,7 @@ _project_finder_enum_items = [ ] -class WorkerCluster(bpy.types.PropertyGroup): +class WorkerTag(bpy.types.PropertyGroup): id: bpy.props.StringProperty(name="id") # type: ignore name: bpy.props.StringProperty(name="Name") # type: ignore description: bpy.props.StringProperty(name="Description") # type: ignore @@ -93,10 +93,10 @@ class FlamencoPreferences(bpy.types.AddonPreferences): get=lambda prefs: prefs.job_storage, ) - worker_clusters: bpy.props.CollectionProperty( # type: ignore - type=WorkerCluster, - name="Worker Clusters", - description="Cache for the worker clusters available on the configured Manager", + worker_tags: bpy.props.CollectionProperty( # type: ignore + type=WorkerTag, + name="Worker Tags", + description="Cache for the worker tags available on the configured Manager", options={"HIDDEN"}, ) @@ -169,7 +169,7 @@ def manager_url(context: bpy.types.Context) -> str: classes = ( - WorkerCluster, + WorkerTag, FlamencoPreferences, ) _register, _unregister = bpy.utils.register_classes_factory(classes) diff --git a/addon/flamenco/worker_clusters.py b/addon/flamenco/worker_tags.py similarity index 62% rename from addon/flamenco/worker_clusters.py rename to addon/flamenco/worker_tags.py index 2604f488..2ed75cbb 100644 --- a/addon/flamenco/worker_clusters.py +++ b/addon/flamenco/worker_tags.py @@ -16,25 +16,25 @@ _enum_items: list[Union[tuple[str, str, str], tuple[str, str, str, int, int]]] = def refresh(context: bpy.types.Context, api_client: 
_ApiClient) -> None: - """Fetch the available worker clusters from the Manager.""" + """Fetch the available worker tags from the Manager.""" from flamenco.manager import ApiClient from flamenco.manager.api import worker_mgt_api - from flamenco.manager.model.worker_cluster_list import WorkerClusterList + from flamenco.manager.model.worker_tag_list import WorkerTagList assert isinstance(api_client, ApiClient) api = worker_mgt_api.WorkerMgtApi(api_client) - response: WorkerClusterList = api.fetch_worker_clusters() + response: WorkerTagList = api.fetch_worker_tags() # Store on the preferences, so a cached version persists until the next refresh. prefs = preferences.get(context) - prefs.worker_clusters.clear() + prefs.worker_tags.clear() - for cluster in response.clusters: - rna_cluster = prefs.worker_clusters.add() - rna_cluster.id = cluster.id - rna_cluster.name = cluster.name - rna_cluster.description = getattr(cluster, "description", "") + for tag in response.tags: + rna_tag = prefs.worker_tags.add() + rna_tag.id = tag.id + rna_tag.name = tag.name + rna_tag.description = getattr(tag, "description", "") # Preferences have changed, so make sure that Blender saves them (assuming # auto-save here). 
@@ -46,25 +46,25 @@ def _get_enum_items(self, context): prefs = preferences.get(context) _enum_items = [ - ("-", "All", "No specific cluster assigned, any worker can handle this job"), + ("-", "All", "No specific tag assigned, any worker can handle this job"), ] _enum_items.extend( - (cluster.id, cluster.name, cluster.description) - for cluster in prefs.worker_clusters + (tag.id, tag.name, tag.description) + for tag in prefs.worker_tags ) return _enum_items def register() -> None: - bpy.types.Scene.flamenco_worker_cluster = bpy.props.EnumProperty( - name="Worker Cluster", + bpy.types.Scene.flamenco_worker_tag = bpy.props.EnumProperty( + name="Worker Tag", items=_get_enum_items, description="The set of Workers that can handle tasks of this job", ) def unregister() -> None: - to_del = ((bpy.types.Scene, "flamenco_worker_cluster"),) + to_del = ((bpy.types.Scene, "flamenco_worker_tag"),) for ob, attr in to_del: try: delattr(ob, attr) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 17fd7c91..7b0c5c31 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -65,13 +65,13 @@ type PersistenceService interface { RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error ClearJobBlocklist(ctx context.Context, job *persistence.Job) error - // Worker cluster management. - WorkerSetClusters(ctx context.Context, worker *persistence.Worker, clusterUUIDs []string) error - CreateWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error - FetchWorkerCluster(ctx context.Context, uuid string) (*persistence.WorkerCluster, error) - FetchWorkerClusters(ctx context.Context) ([]*persistence.WorkerCluster, error) - DeleteWorkerCluster(ctx context.Context, uuid string) error - SaveWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error + // Worker tag management. 
+ WorkerSetTags(ctx context.Context, worker *persistence.Worker, tagUUIDs []string) error + CreateWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error + FetchWorkerTag(ctx context.Context, uuid string) (*persistence.WorkerTag, error) + FetchWorkerTags(ctx context.Context) ([]*persistence.WorkerTag, error) + DeleteWorkerTag(ctx context.Context, uuid string) error + SaveWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index fcd84a7e..aca62069 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -618,8 +618,8 @@ func jobDBtoAPI(dbJob *persistence.Job) api.Job { if dbJob.DeleteRequestedAt.Valid { apiJob.DeleteRequestedAt = &dbJob.DeleteRequestedAt.Time } - if dbJob.WorkerCluster != nil { - apiJob.WorkerCluster = &dbJob.WorkerCluster.UUID + if dbJob.WorkerTag != nil { + apiJob.WorkerTag = &dbJob.WorkerTag.UUID } return apiJob diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 5c64babf..fa666fe8 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -320,19 +320,19 @@ func TestSubmitJobWithShamanCheckoutID(t *testing.T) { assert.NoError(t, err) } -func TestSubmitJobWithWorkerCluster(t *testing.T) { +func TestSubmitJobWithWorkerTag(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() mf := newMockedFlamenco(mockCtrl) worker := testWorker() - workerClusterUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd" - cluster := persistence.WorkerCluster{ + workerTagUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd" + tag := persistence.WorkerTag{ Model: persistence.Model{ID: 47}, - UUID: workerClusterUUID, - Name: "first 
cluster", - Description: "my first cluster", + UUID: workerTagUUID, + Name: "first tag", + Description: "my first tag", } submittedJob := api.SubmittedJob{ @@ -340,7 +340,7 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { Type: "test", Priority: 50, SubmitterPlatform: worker.Platform, - WorkerCluster: &workerClusterUUID, + WorkerTag: &workerTagUUID, } mf.expectConvertTwoWayVariables(t, @@ -351,8 +351,8 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { // Expect the job compiler to be called. authoredJob := job_compilers.AuthoredJob{ - JobID: "afc47568-bd9d-4368-8016-e91d945db36d", - WorkerClusterUUID: workerClusterUUID, + JobID: "afc47568-bd9d-4368-8016-e91d945db36d", + WorkerTagUUID: workerTagUUID, Name: submittedJob.Name, JobType: submittedJob.Type, @@ -382,8 +382,8 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { Settings: persistence.StringInterfaceMap{}, Metadata: persistence.StringStringMap{}, - WorkerClusterID: &cluster.ID, - WorkerCluster: &cluster, + WorkerTagID: &tag.ID, + WorkerTag: &tag, } mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 31a8aafa..6817cfce 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -141,18 +141,18 @@ func (mr *MockPersistenceServiceMockRecorder) CreateWorker(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorker", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorker), arg0, arg1) } -// CreateWorkerCluster mocks base method. -func (m *MockPersistenceService) CreateWorkerCluster(arg0 context.Context, arg1 *persistence.WorkerCluster) error { +// CreateWorkerTag mocks base method. 
+func (m *MockPersistenceService) CreateWorkerTag(arg0 context.Context, arg1 *persistence.WorkerTag) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkerCluster", arg0, arg1) + ret := m.ctrl.Call(m, "CreateWorkerTag", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// CreateWorkerCluster indicates an expected call of CreateWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) CreateWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// CreateWorkerTag indicates an expected call of CreateWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) CreateWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorkerTag), arg0, arg1) } // DeleteWorker mocks base method. @@ -169,18 +169,18 @@ func (mr *MockPersistenceServiceMockRecorder) DeleteWorker(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorker", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorker), arg0, arg1) } -// DeleteWorkerCluster mocks base method. -func (m *MockPersistenceService) DeleteWorkerCluster(arg0 context.Context, arg1 string) error { +// DeleteWorkerTag mocks base method. +func (m *MockPersistenceService) DeleteWorkerTag(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteWorkerCluster", arg0, arg1) + ret := m.ctrl.Call(m, "DeleteWorkerTag", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// DeleteWorkerCluster indicates an expected call of DeleteWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) DeleteWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// DeleteWorkerTag indicates an expected call of DeleteWorkerTag. 
+func (mr *MockPersistenceServiceMockRecorder) DeleteWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorkerTag), arg0, arg1) } // FetchJob mocks base method. @@ -258,34 +258,34 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } -// FetchWorkerCluster mocks base method. -func (m *MockPersistenceService) FetchWorkerCluster(arg0 context.Context, arg1 string) (*persistence.WorkerCluster, error) { +// FetchWorkerTag mocks base method. +func (m *MockPersistenceService) FetchWorkerTag(arg0 context.Context, arg1 string) (*persistence.WorkerTag, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWorkerCluster", arg0, arg1) - ret0, _ := ret[0].(*persistence.WorkerCluster) + ret := m.ctrl.Call(m, "FetchWorkerTag", arg0, arg1) + ret0, _ := ret[0].(*persistence.WorkerTag) ret1, _ := ret[1].(error) return ret0, ret1 } -// FetchWorkerCluster indicates an expected call of FetchWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) FetchWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// FetchWorkerTag indicates an expected call of FetchWorkerTag. 
+func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTag), arg0, arg1) } -// FetchWorkerClusters mocks base method. -func (m *MockPersistenceService) FetchWorkerClusters(arg0 context.Context) ([]*persistence.WorkerCluster, error) { +// FetchWorkerTags mocks base method. +func (m *MockPersistenceService) FetchWorkerTags(arg0 context.Context) ([]*persistence.WorkerTag, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWorkerClusters", arg0) - ret0, _ := ret[0].([]*persistence.WorkerCluster) + ret := m.ctrl.Call(m, "FetchWorkerTags", arg0) + ret0, _ := ret[0].([]*persistence.WorkerTag) ret1, _ := ret[1].(error) return ret0, ret1 } -// FetchWorkerClusters indicates an expected call of FetchWorkerClusters. -func (mr *MockPersistenceServiceMockRecorder) FetchWorkerClusters(arg0 interface{}) *gomock.Call { +// FetchWorkerTags indicates an expected call of FetchWorkerTags. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTags(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerClusters", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerClusters), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTags", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTags), arg0) } // FetchWorkerTask mocks base method. @@ -433,20 +433,6 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorker", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorker), arg0, arg1) } -// SaveWorkerCluster mocks base method. 
-func (m *MockPersistenceService) SaveWorkerCluster(arg0 context.Context, arg1 *persistence.WorkerCluster) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SaveWorkerCluster", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SaveWorkerCluster indicates an expected call of SaveWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) SaveWorkerCluster(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerCluster), arg0, arg1) -} - // SaveWorkerStatus mocks base method. func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() @@ -461,6 +447,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorkerStatus(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerStatus), arg0, arg1) } +// SaveWorkerTag mocks base method. +func (m *MockPersistenceService) SaveWorkerTag(arg0 context.Context, arg1 *persistence.WorkerTag) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveWorkerTag", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveWorkerTag indicates an expected call of SaveWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) SaveWorkerTag(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerTag), arg0, arg1) +} + // ScheduleTask mocks base method. 
func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() @@ -532,18 +532,18 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1) } -// WorkerSetClusters mocks base method. -func (m *MockPersistenceService) WorkerSetClusters(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { +// WorkerSetTags mocks base method. +func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WorkerSetClusters", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "WorkerSetTags", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// WorkerSetClusters indicates an expected call of WorkerSetClusters. -func (mr *MockPersistenceServiceMockRecorder) WorkerSetClusters(arg0, arg1, arg2 interface{}) *gomock.Call { +// WorkerSetTags indicates an expected call of WorkerSetTags. +func (mr *MockPersistenceServiceMockRecorder) WorkerSetTags(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSetClusters", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSetClusters), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSetTags", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSetTags), arg0, arg1, arg2) } // WorkersLeftToRun mocks base method. 
diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index c1f8ab30..509f525f 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -182,7 +182,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { +func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) logger = logger.With().Str("worker", workerUUID).Logger() @@ -192,7 +192,7 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { } // Decode the request body. - var change api.WorkerClusterChangeRequest + var change api.WorkerTagChangeRequest if err := e.Bind(&change); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") @@ -210,13 +210,13 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { } logger = logger.With(). - Strs("clusters", change.ClusterIds). + Strs("tags", change.TagIds). Logger() - logger.Info().Msg("worker cluster change requested") + logger.Info().Msg("worker tag change requested") - // Store the new cluster assignment. - if err := f.persist.WorkerSetClusters(ctx, dbWorker, change.ClusterIds); err != nil { - logger.Error().Err(err).Msg("saving worker after cluster change request") + // Store the new tag assignment. 
+ if err := f.persist.WorkerSetTags(ctx, dbWorker, change.TagIds); err != nil { + logger.Error().Err(err).Msg("saving worker after tag change request") return sendAPIError(e, http.StatusInternalServerError, "error saving worker: %v", err) } @@ -227,155 +227,155 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) DeleteWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - err := f.persist.DeleteWorkerCluster(ctx, clusterUUID) + err := f.persist.DeleteWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("deleting worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error deleting worker cluster: %v", err) + logger.Error().Err(err).Msg("deleting worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error deleting worker tag: %v", err) } - // TODO: SocketIO broadcast of cluster deletion. + // TODO: SocketIO broadcast of tag deletion. 
- logger.Info().Msg("worker cluster deleted") + logger.Info().Msg("worker tag deleted") return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) FetchWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) FetchWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - cluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID) + tag, err := f.persist.FetchWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("fetching worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err) + logger.Error().Err(err).Msg("fetching worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err) } - return e.JSON(http.StatusOK, workerClusterDBtoAPI(*cluster)) + return e.JSON(http.StatusOK, workerTagDBtoAPI(*tag)) } -func (f *Flamenco) UpdateWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, 
http.StatusBadRequest, "not a valid UUID") } // Decode the request body. - var update api.UpdateWorkerClusterJSONBody + var update api.UpdateWorkerTagJSONBody if err := e.Bind(&update); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") } - dbCluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID) + dbTag, err := f.persist.FetchWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("fetching worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err) + logger.Error().Err(err).Msg("fetching worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err) } - // Update the cluster. - dbCluster.Name = update.Name + // Update the tag. + dbTag.Name = update.Name if update.Description == nil { - dbCluster.Description = "" + dbTag.Description = "" } else { - dbCluster.Description = *update.Description + dbTag.Description = *update.Description } - if err := f.persist.SaveWorkerCluster(ctx, dbCluster); err != nil { - logger.Error().Err(err).Msg("saving worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster") + if err := f.persist.SaveWorkerTag(ctx, dbTag); err != nil { + logger.Error().Err(err).Msg("saving worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag") } - // TODO: SocketIO broadcast of cluster update. + // TODO: SocketIO broadcast of tag update. 
return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) FetchWorkerClusters(e echo.Context) error { +func (f *Flamenco) FetchWorkerTags(e echo.Context) error { ctx := e.Request().Context() logger := requestLogger(e) - dbClusters, err := f.persist.FetchWorkerClusters(ctx) + dbTags, err := f.persist.FetchWorkerTags(ctx) if err != nil { - logger.Error().Err(err).Msg("fetching worker clusters") - return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster") + logger.Error().Err(err).Msg("fetching worker tags") + return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag") } - apiClusters := []api.WorkerCluster{} - for _, dbCluster := range dbClusters { - apiCluster := workerClusterDBtoAPI(*dbCluster) - apiClusters = append(apiClusters, apiCluster) + apiTags := []api.WorkerTag{} + for _, dbTag := range dbTags { + apiTag := workerTagDBtoAPI(*dbTag) + apiTags = append(apiTags, apiTag) } - clusterList := api.WorkerClusterList{ - Clusters: &apiClusters, + tagList := api.WorkerTagList{ + Tags: &apiTags, } - return e.JSON(http.StatusOK, &clusterList) + return e.JSON(http.StatusOK, &tagList) } -func (f *Flamenco) CreateWorkerCluster(e echo.Context) error { +func (f *Flamenco) CreateWorkerTag(e echo.Context) error { ctx := e.Request().Context() logger := requestLogger(e) // Decode the request body. - var apiCluster api.CreateWorkerClusterJSONBody - if err := e.Bind(&apiCluster); err != nil { + var apiTag api.CreateWorkerTagJSONBody + if err := e.Bind(&apiTag); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") } // Convert to persistence layer model. 
- var clusterUUID string - if apiCluster.Id != nil && *apiCluster.Id != "" { - clusterUUID = *apiCluster.Id + var tagUUID string + if apiTag.Id != nil && *apiTag.Id != "" { + tagUUID = *apiTag.Id } else { - clusterUUID = uuid.New() + tagUUID = uuid.New() } - dbCluster := persistence.WorkerCluster{ - UUID: clusterUUID, - Name: apiCluster.Name, + dbTag := persistence.WorkerTag{ + UUID: tagUUID, + Name: apiTag.Name, } - if apiCluster.Description != nil { - dbCluster.Description = *apiCluster.Description + if apiTag.Description != nil { + dbTag.Description = *apiTag.Description } // Store in the database. - if err := f.persist.CreateWorkerCluster(ctx, &dbCluster); err != nil { - logger.Error().Err(err).Msg("creating worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error creating worker cluster") + if err := f.persist.CreateWorkerTag(ctx, &dbTag); err != nil { + logger.Error().Err(err).Msg("creating worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error creating worker tag") } - // TODO: SocketIO broadcast of cluster creation. + // TODO: SocketIO broadcast of tag creation. - return e.JSON(http.StatusOK, workerClusterDBtoAPI(dbCluster)) + return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag)) } func workerSummary(w persistence.Worker) api.WorkerSummary { @@ -407,26 +407,26 @@ func workerDBtoAPI(w persistence.Worker) api.Worker { SupportedTaskTypes: w.TaskTypes(), } - if len(w.Clusters) > 0 { - clusters := []api.WorkerCluster{} - for i := range w.Clusters { - clusters = append(clusters, workerClusterDBtoAPI(*w.Clusters[i])) + if len(w.Tags) > 0 { + tags := []api.WorkerTag{} + for i := range w.Tags { + tags = append(tags, workerTagDBtoAPI(*w.Tags[i])) } - apiWorker.Clusters = &clusters + apiWorker.Tags = &tags } return apiWorker } -func workerClusterDBtoAPI(wc persistence.WorkerCluster) api.WorkerCluster { +func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag { uuid := wc.UUID // Take a copy for safety. 
- apiCluster := api.WorkerCluster{ + apiTag := api.WorkerTag{ Id: &uuid, Name: wc.Name, } if len(wc.Description) > 0 { - apiCluster.Description = &wc.Description + apiTag.Description = &wc.Description } - return apiCluster + return apiTag } diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index f914121d..352125c3 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -262,58 +262,58 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) { assertResponseNoContent(t, echo) } -func TestWorkerClusterCRUDHappyFlow(t *testing.T) { +func TestWorkerTagCRUDHappyFlow(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() mf := newMockedFlamenco(mockCtrl) - // Create a cluster. + // Create a tag. UUID := "18d9234e-5135-458f-a1ba-a350c3d4e837" - apiCluster := api.WorkerCluster{ + apiTag := api.WorkerTag{ Id: &UUID, Name: "ʻO nā manu ʻino", Description: ptr("Ke aloha"), } - expectDBCluster := persistence.WorkerCluster{ + expectDBTag := persistence.WorkerTag{ UUID: UUID, - Name: apiCluster.Name, - Description: *apiCluster.Description, + Name: apiTag.Name, + Description: *apiTag.Description, } - mf.persistence.EXPECT().CreateWorkerCluster(gomock.Any(), &expectDBCluster) - // TODO: expect SocketIO broadcast of the cluster creation. - echo := mf.prepareMockedJSONRequest(apiCluster) - require.NoError(t, mf.flamenco.CreateWorkerCluster(echo)) - assertResponseJSON(t, echo, http.StatusOK, &apiCluster) + mf.persistence.EXPECT().CreateWorkerTag(gomock.Any(), &expectDBTag) + // TODO: expect SocketIO broadcast of the tag creation. 
+ echo := mf.prepareMockedJSONRequest(apiTag) + require.NoError(t, mf.flamenco.CreateWorkerTag(echo)) + assertResponseJSON(t, echo, http.StatusOK, &apiTag) - // Fetch the cluster - mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil) + // Fetch the tag + mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) echo = mf.prepareMockedRequest(nil) - require.NoError(t, mf.flamenco.FetchWorkerCluster(echo, UUID)) - assertResponseJSON(t, echo, http.StatusOK, &apiCluster) + require.NoError(t, mf.flamenco.FetchWorkerTag(echo, UUID)) + assertResponseJSON(t, echo, http.StatusOK, &apiTag) // Update & save. newUUID := "60442762-83d3-4fc3-bf75-6ab5799cdbaa" - newAPICluster := api.WorkerCluster{ + newAPITag := api.WorkerTag{ Id: &newUUID, // Intentionally change the UUID. This should just be ignored. Name: "updated name", } - expectNewDBCluster := persistence.WorkerCluster{ + expectNewDBTag := persistence.WorkerTag{ UUID: UUID, - Name: newAPICluster.Name, + Name: newAPITag.Name, Description: "", } - // TODO: expect SocketIO broadcast of the cluster update. - mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil) - mf.persistence.EXPECT().SaveWorkerCluster(gomock.Any(), &expectNewDBCluster) - echo = mf.prepareMockedJSONRequest(newAPICluster) - require.NoError(t, mf.flamenco.UpdateWorkerCluster(echo, UUID)) + // TODO: expect SocketIO broadcast of the tag update. + mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) + mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag) + echo = mf.prepareMockedJSONRequest(newAPITag) + require.NoError(t, mf.flamenco.UpdateWorkerTag(echo, UUID)) assertResponseNoContent(t, echo) // Delete. - mf.persistence.EXPECT().DeleteWorkerCluster(gomock.Any(), UUID) - // TODO: expect SocketIO broadcast of the cluster deletion. 
- echo = mf.prepareMockedJSONRequest(newAPICluster) - require.NoError(t, mf.flamenco.DeleteWorkerCluster(echo, UUID)) + mf.persistence.EXPECT().DeleteWorkerTag(gomock.Any(), UUID) + // TODO: expect SocketIO broadcast of the tag deletion. + echo = mf.prepareMockedJSONRequest(newAPITag) + require.NoError(t, mf.flamenco.DeleteWorkerTag(echo, UUID)) assertResponseNoContent(t, echo) } diff --git a/internal/manager/job_compilers/author.go b/internal/manager/job_compilers/author.go index 7a457feb..c1d5dd3a 100644 --- a/internal/manager/job_compilers/author.go +++ b/internal/manager/job_compilers/author.go @@ -20,8 +20,8 @@ type Author struct { } type AuthoredJob struct { - JobID string - WorkerClusterUUID string + JobID string + WorkerTagUUID string Name string JobType string diff --git a/internal/manager/job_compilers/job_compilers.go b/internal/manager/job_compilers/job_compilers.go index 3b4cae4c..41228d0f 100644 --- a/internal/manager/job_compilers/job_compilers.go +++ b/internal/manager/job_compilers/job_compilers.go @@ -127,8 +127,8 @@ func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJo aj.Storage.ShamanCheckoutID = *sj.Storage.ShamanCheckoutId } - if sj.WorkerCluster != nil { - aj.WorkerClusterUUID = *sj.WorkerCluster + if sj.WorkerTag != nil { + aj.WorkerTagUUID = *sj.WorkerTag } compiler, err := vm.getCompileJob() diff --git a/internal/manager/job_compilers/job_compilers_test.go b/internal/manager/job_compilers/job_compilers_test.go index ad746b51..3fcccfaa 100644 --- a/internal/manager/job_compilers/job_compilers_test.go +++ b/internal/manager/job_compilers/job_compilers_test.go @@ -45,12 +45,12 @@ func exampleSubmittedJob() api.SubmittedJob { "user.name": "Sybren Stüvel", }} sj := api.SubmittedJob{ - Name: "3Д рендеринг", - Priority: 50, - Type: "simple-blender-render", - Settings: &settings, - Metadata: &metadata, - WorkerCluster: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"), + Name: "3Д рендеринг", + Priority: 50, + Type: 
"simple-blender-render", + Settings: &settings, + Metadata: &metadata, + WorkerTag: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"), } return sj } @@ -80,7 +80,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) { // Properties should be copied as-is. assert.Equal(t, sj.Name, aj.Name) - assert.Equal(t, *sj.WorkerCluster, aj.WorkerClusterUUID) + assert.Equal(t, *sj.WorkerTag, aj.WorkerTagUUID) assert.Equal(t, sj.Type, aj.JobType) assert.Equal(t, sj.Priority, aj.Priority) assert.EqualValues(t, sj.Settings.AdditionalProperties, aj.Settings) @@ -139,7 +139,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) { assert.Equal(t, expectDeps, tVideo.Dependencies) } -func TestJobWithoutCluster(t *testing.T) { +func TestJobWithoutTag(t *testing.T) { c := mockedClock(t) s, err := Load(c) @@ -151,20 +151,20 @@ func TestJobWithoutCluster(t *testing.T) { sj := exampleSubmittedJob() - // Try with nil WorkerCluster. + // Try with nil WorkerTag. { - sj.WorkerCluster = nil + sj.WorkerTag = nil aj, err := s.Compile(ctx, sj) require.NoError(t, err) - assert.Zero(t, aj.WorkerClusterUUID) + assert.Zero(t, aj.WorkerTagUUID) } - // Try with empty WorkerCluster. + // Try with empty WorkerTag. 
{ - sj.WorkerCluster = ptr("") + sj.WorkerTag = ptr("") aj, err := s.Compile(ctx, sj) require.NoError(t, err) - assert.Zero(t, aj.WorkerClusterUUID) + assert.Zero(t, aj.WorkerTagUUID) } } diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index b60d9230..6c8bc570 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -16,7 +16,7 @@ func (db *DB) migrate() error { &Task{}, &TaskFailure{}, &Worker{}, - &WorkerCluster{}, + &WorkerTag{}, ) if err != nil { return fmt.Errorf("failed to automigrate database: %v", err) diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index 1316f481..0b7ebd6f 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -9,10 +9,10 @@ import ( ) var ( - ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} - ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} - ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} - ErrWorkerClusterNotFound = PersistenceError{Message: "worker cluster not found", Err: gorm.ErrRecordNotFound} + ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} + ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} + ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} + ErrWorkerTagNotFound = PersistenceError{Message: "worker tag not found", Err: gorm.ErrRecordNotFound} ) type PersistenceError struct { @@ -40,8 +40,8 @@ func workerError(errorToWrap error, message string, msgArgs ...interface{}) erro return wrapError(translateGormWorkerError(errorToWrap), message, msgArgs...) 
} -func workerClusterError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerClusterError(errorToWrap), message, msgArgs...) +func workerTagError(errorToWrap error, message string, msgArgs ...interface{}) error { + return wrapError(translateGormWorkerTagError(errorToWrap), message, msgArgs...) } func wrapError(errorToWrap error, message string, format ...interface{}) error { @@ -86,11 +86,11 @@ func translateGormWorkerError(gormError error) error { return gormError } -// translateGormWorkerClusterError translates a Gorm error to a persistence layer error. +// translateGormWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerClusterError(gormError error) error { +func translateGormWorkerTagError(gormError error) error { if errors.Is(gormError, gorm.ErrRecordNotFound) { - return ErrWorkerClusterNotFound + return ErrWorkerTagNotFound } return gormError } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 66b9e0fa..8f6329d9 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -36,8 +36,8 @@ type Job struct { Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"` - WorkerClusterID *uint - WorkerCluster *WorkerCluster `gorm:"foreignkey:WorkerClusterID;references:ID;constraint:OnDelete:SET NULL"` + WorkerTagID *uint + WorkerTag *WorkerTag `gorm:"foreignkey:WorkerTagID;references:ID;constraint:OnDelete:SET NULL"` } type StringInterfaceMap map[string]interface{} @@ -148,14 +148,14 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au }, } - // Find and assign the worker cluster. - if authoredJob.WorkerClusterUUID != "" { - dbCluster, err := fetchWorkerCluster(tx, authoredJob.WorkerClusterUUID) + // Find and assign the worker tag. 
+ if authoredJob.WorkerTagUUID != "" { + dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID) if err != nil { return err } - dbJob.WorkerClusterID = &dbCluster.ID - dbJob.WorkerCluster = dbCluster + dbJob.WorkerTagID = &dbTag.ID + dbJob.WorkerTag = dbTag } if err := tx.Create(&dbJob).Error; err != nil { @@ -233,7 +233,7 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { dbJob := Job{} findResult := db.gormDB.WithContext(ctx). Limit(1). - Preload("WorkerCluster"). + Preload("WorkerTag"). Find(&dbJob, "uuid = ?", jobUUID) if findResult.Error != nil { return nil, jobError(findResult.Error, "fetching job") diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index c12b09b7..da041ecf 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -108,16 +108,16 @@ func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) ( Select("uuid"). Where("id not in (?)", blockedWorkers) - if job.WorkerClusterID == nil { + if job.WorkerTagID == nil { // Count all workers, so no extra restrictions are necessary. } else { - // Only count workers in the job's cluster. - jobCluster := db.gormDB. - Table("worker_cluster_membership"). + // Only count workers in the job's tag. + jobTag := db.gormDB. + Table("worker_tag_membership"). Select("worker_id"). - Where("worker_cluster_id = ?", *job.WorkerClusterID) + Where("worker_tag_id = ?", *job.WorkerTagID) query = query. - Where("id in (?)", jobCluster) + Where("id in (?)", jobTag) } // Find the workers NOT blocked. 
diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index 97e3503d..d82a4092 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -126,14 +126,14 @@ func TestWorkersLeftToRun(t *testing.T) { worker1 := createWorker(ctx, t, db) worker2 := createWorkerFrom(ctx, t, db, *worker1) - // Create one worker cluster. It will not be used by this job, but one of the + // Create one worker tag. It will not be used by this job, but one of the // workers will be assigned to it. It can get this job's tasks, though. - // Because the job is clusterless, it can be run by all. - cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) + // Because the job is tagless, it can be run by all. + tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) uuidMap := func(workers ...*Worker) map[string]bool { @@ -172,43 +172,43 @@ func TestWorkersLeftToRun(t *testing.T) { } } -func TestWorkersLeftToRunWithClusters(t *testing.T) { +func TestWorkersLeftToRunWithTags(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create clusters. 
- cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - cluster2 := WorkerCluster{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Cluster 2"} - cluster3 := WorkerCluster{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Cluster 3"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster3)) + // Create tags. + tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + tag2 := WorkerTag{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Tag 2"} + tag3 := WorkerTag{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Tag 3"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag3)) - // Create a job in cluster1. + // Create a job in tag1. authoredJob := createTestAuthoredJobWithTasks() - authoredJob.WorkerClusterUUID = cluster1.UUID + authoredJob.WorkerTagUUID = tag1.UUID job := persistAuthoredJob(t, ctx, db, authoredJob) - // Clusters 1 + 3 + // Tags 1 + 3 workerC13 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c13c1313-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1, &cluster3} + w.Tags = []*WorkerTag{&tag1, &tag3} }) - // Cluster 1 + // Tag 1 workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) - // Cluster 2 worker, this one should never appear. + // Tag 2 worker, this one should never appear. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c2c2c2c2-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster2} + w.Tags = []*WorkerTag{&tag2} }) - // No clusters, so should be able to run only clusterless jobs. Which is none + // No tags, so should be able to run only tagless jobs. 
Which is none // in this test. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "00000000-0000-1111-2222-333333333333" - w.Clusters = nil + w.Tags = nil }) uuidMap := func(workers ...*Worker) map[string]bool { @@ -219,7 +219,7 @@ func TestWorkersLeftToRunWithTags(t *testing.T) { return theMap } - // All Cluster 1 workers, no blocklist. + // All Tag 1 workers, no blocklist. left, err := db.WorkersLeftToRun(ctx, job, "blender") require.NoError(t, err) assert.Equal(t, uuidMap(workerC13, workerC1), left) @@ -230,7 +230,7 @@ func TestWorkersLeftToRunWithTags(t *testing.T) { require.NoError(t, err) assert.Equal(t, uuidMap(workerC13), left) - // All clustered workers blocked. + // All tagged workers blocked. _ = db.AddWorkerToJobBlocklist(ctx, job, workerC13, "blender") left, err = db.WorkersLeftToRun(ctx, job, "blender") assert.NoError(t, err) diff --git a/internal/manager/persistence/jobs_query.go b/internal/manager/persistence/jobs_query.go index 63773b4a..77f295b8 100644 --- a/internal/manager/persistence/jobs_query.go +++ b/internal/manager/persistence/jobs_query.go @@ -64,7 +64,7 @@ func (db *DB) QueryJobs(ctx context.Context, apiQ api.JobsQuery) ([]*Job, error) } } - q.Preload("Cluster") + q.Preload("Tag") result := []*Job{} tx := q.Scan(&result) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 6fd36f66..f9623a4c 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -757,7 +757,7 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W Software: "3.0", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management", - Clusters: nil, + Tags: nil, } for _, updater := range updaters { diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 08f135f9..47560e5d 100644 --- a/internal/manager/persistence/task_scheduler.go +++ 
b/internal/manager/persistence/task_scheduler.go @@ -26,7 +26,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { logger := log.With().Str("worker", w.UUID).Logger() logger.Trace().Msg("finding task for worker") - hasWorkerClusters, err := db.HasWorkerClusters(ctx) + hasWorkerTags, err := db.HasWorkerTags(ctx) if err != nil { return nil, err } @@ -37,7 +37,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { var task *Task txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var err error - task, err = findTaskForWorker(tx, w, hasWorkerClusters) + task, err = findTaskForWorker(tx, w, hasWorkerTags) if err != nil { if isDatabaseBusyError(err) { logger.Trace().Err(err).Msg("database busy while finding task for worker") @@ -84,7 +84,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { return task, nil } -func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, error) { +func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerTags bool) (*Task, error) { task := Task{} // If a task is alreay active & assigned to this worker, return just that. @@ -129,21 +129,21 @@ func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, Where("TF.worker_id is NULL"). // Not failed before Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted - if checkWorkerClusters { - // The system has one or more clusters, so limit the available jobs to those - // that have no cluster, or overlap with the Worker's clusters. - if len(w.Clusters) == 0 { - // Clusterless workers only get clusterless jobs. + if checkWorkerTags { + // The system has one or more tags, so limit the available jobs to those + // that have no tag, or overlap with the Worker's tags. + if len(w.Tags) == 0 { + // Tagless workers only get tagless jobs. findTaskQuery = findTaskQuery. 
- Where("jobs.worker_cluster_id is NULL") + Where("jobs.worker_tag_id is NULL") } else { - // Clustered workers get clusterless jobs AND jobs of their own clusters. - clusterIDs := []uint{} - for _, cluster := range w.Clusters { - clusterIDs = append(clusterIDs, cluster.ID) + // Tagged workers get tagless jobs AND jobs of their own tags. + tagIDs := []uint{} + for _, tag := range w.Tags { + tagIDs = append(tagIDs, tag.ID) } findTaskQuery = findTaskQuery. - Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs) + Where("jobs.worker_tag_id is NULL or worker_tag_id in ?", tagIDs) } } diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 90fe0b8f..0005bdfc 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -291,87 +291,87 @@ func TestPreviouslyFailed(t *testing.T) { assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") } -func TestWorkerClusterJobWithCluster(t *testing.T) { +func TestWorkerTagJobWithTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create worker clusters: - cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - cluster2 := WorkerCluster{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Cluster 2"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2)) + // Create worker tags: + tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + tag2 := WorkerTag{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Tag 2"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) - // Create a worker in cluster1: + // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { - w.Clusters = []*WorkerCluster{&cluster1} + 
w.Tags = []*WorkerTag{&tag1} }) - // Create a worker without cluster: + // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Clusters = nil + w.Tags = nil }) - { // Test job with different cluster: + { // Test job with different tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask) - job.WorkerClusterUUID = cluster2.UUID + job.WorkerTagUUID = tag2.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - assert.Nil(t, task, "job with different cluster should not be scheduled") + assert.Nil(t, task, "job with different tag should not be scheduled") } - { // Test job with matching cluster: + { // Test job with matching tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask) - job.WorkerClusterUUID = cluster1.UUID + job.WorkerTagUUID = tag1.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - require.NotNil(t, task, "job with matching cluster should be scheduled") + require.NotNil(t, task, "job with matching tag should be scheduled") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) - assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster") + assert.Nil(t, task, "job with tag should not be scheduled for worker without tag") } } -func TestWorkerClusterJobWithoutCluster(t *testing.T) { +func TestWorkerTagJobWithoutTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create worker cluster: - cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) + // Create 
worker tag: + tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) - // Create a worker in cluster1: + // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) - // Create a worker without cluster: + // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Clusters = nil + w.Tags = nil }) - // Test cluster-less job: + // Test tag-less job: authTask := authorTestTask("the task", "blender") job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster") + require.NotNil(t, task, "job without tag should always be scheduled to worker in some tag") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) - require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster") + require.NotNil(t, task, "job without tag should always be scheduled to worker without tag") assert.Equal(t, authTask.UUID, task.UUID) } diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index 80832819..2b1351c4 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -96,8 +96,8 @@ type WorkerTestFixture struct { ctx context.Context done func() - worker *Worker - cluster *WorkerCluster + worker *Worker + tag *WorkerTag } func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTestFixture { @@ -113,21 +113,21 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe 
SupportedTaskTypes: "blender,ffmpeg,file-management", } - wc := WorkerCluster{ + wc := WorkerTag{ UUID: uuid.New(), Name: "arbejdsklynge", - Description: "Worker cluster in Danish", + Description: "Worker tag in Danish", } require.NoError(t, db.CreateWorker(ctx, &w)) - require.NoError(t, db.CreateWorkerCluster(ctx, &wc)) + require.NoError(t, db.CreateWorkerTag(ctx, &wc)) return WorkerTestFixture{ db: db, ctx: ctx, done: cancel, - worker: &w, - cluster: &wc, + worker: &w, + tag: &wc, } } diff --git a/internal/manager/persistence/worker_cluster.go b/internal/manager/persistence/worker_cluster.go deleted file mode 100644 index 60974e6d..00000000 --- a/internal/manager/persistence/worker_cluster.go +++ /dev/null @@ -1,112 +0,0 @@ -package persistence - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "context" - "fmt" - - "gorm.io/gorm" -) - -type WorkerCluster struct { - Model - - UUID string `gorm:"type:char(36);default:'';unique;index"` - Name string `gorm:"type:varchar(64);default:'';unique"` - Description string `gorm:"type:varchar(255);default:''"` - - Workers []*Worker `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"` -} - -func (db *DB) CreateWorkerCluster(ctx context.Context, wc *WorkerCluster) error { - if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil { - return fmt.Errorf("creating new worker cluster: %w", err) - } - return nil -} - -// HasWorkerClusters returns whether there are any clusters defined at all. -func (db *DB) HasWorkerClusters(ctx context.Context) (bool, error) { - var count int64 - tx := db.gormDB.WithContext(ctx). - Model(&WorkerCluster{}). 
- Count(&count) - if err := tx.Error; err != nil { - return false, workerClusterError(err, "counting worker clusters") - } - return count > 0, nil -} - -func (db *DB) FetchWorkerCluster(ctx context.Context, uuid string) (*WorkerCluster, error) { - tx := db.gormDB.WithContext(ctx) - return fetchWorkerCluster(tx, uuid) -} - -// fetchWorkerCluster fetches the worker cluster using the given database instance. -func fetchWorkerCluster(gormDB *gorm.DB, uuid string) (*WorkerCluster, error) { - w := WorkerCluster{} - tx := gormDB.First(&w, "uuid = ?", uuid) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching worker cluster") - } - return &w, nil -} - -func (db *DB) SaveWorkerCluster(ctx context.Context, cluster *WorkerCluster) error { - if err := db.gormDB.WithContext(ctx).Save(cluster).Error; err != nil { - return workerClusterError(err, "saving worker cluster") - } - return nil -} - -// DeleteWorkerCluster deletes the given cluster, after unassigning all workers from it. -func (db *DB) DeleteWorkerCluster(ctx context.Context, uuid string) error { - tx := db.gormDB.WithContext(ctx). - Where("uuid = ?", uuid). - Delete(&WorkerCluster{}) - if tx.Error != nil { - return workerClusterError(tx.Error, "deleting worker cluster") - } - if tx.RowsAffected == 0 { - return ErrWorkerClusterNotFound - } - return nil -} - -func (db *DB) FetchWorkerClusters(ctx context.Context) ([]*WorkerCluster, error) { - clusters := make([]*WorkerCluster, 0) - tx := db.gormDB.WithContext(ctx).Model(&WorkerCluster{}).Scan(&clusters) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching all worker clusters") - } - return clusters, nil -} - -func (db *DB) fetchWorkerClustersWithUUID(ctx context.Context, clusterUUIDs []string) ([]*WorkerCluster, error) { - clusters := make([]*WorkerCluster, 0) - tx := db.gormDB.WithContext(ctx). - Model(&WorkerCluster{}). - Where("uuid in ?", clusterUUIDs). 
- Scan(&clusters) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching all worker clusters") - } - return clusters, nil -} - -func (db *DB) WorkerSetClusters(ctx context.Context, worker *Worker, clusterUUIDs []string) error { - clusters, err := db.fetchWorkerClustersWithUUID(ctx, clusterUUIDs) - if err != nil { - return workerClusterError(err, "fetching worker clusters") - } - - err = db.gormDB.WithContext(ctx). - Model(worker). - Association("Clusters"). - Replace(clusters) - if err != nil { - return workerClusterError(err, "updating worker clusters") - } - return nil -} diff --git a/internal/manager/persistence/worker_cluster_test.go b/internal/manager/persistence/worker_cluster_test.go deleted file mode 100644 index ed054c6b..00000000 --- a/internal/manager/persistence/worker_cluster_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package persistence - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "testing" - "time" - - "git.blender.org/flamenco/internal/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCreateFetchCluster(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Test fetching non-existent cluster - fetchedCluster, err := f.db.FetchWorkerCluster(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f") - assert.ErrorIs(t, err, ErrWorkerClusterNotFound) - assert.Nil(t, fetchedCluster) - - // New cluster creation is already done in the workerTestFixtures() call. - assert.NotNil(t, f.cluster) - - fetchedCluster, err = f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) - require.NoError(t, err) - assert.NotNil(t, fetchedCluster) - - // Test contents of fetched cluster. 
- assert.Equal(t, f.cluster.UUID, fetchedCluster.UUID) - assert.Equal(t, f.cluster.Name, fetchedCluster.Name) - assert.Equal(t, f.cluster.Description, fetchedCluster.Description) - assert.Zero(t, fetchedCluster.Workers) -} - -func TestFetchDeleteClusters(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Single cluster was created by fixture. - has, err := f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.True(t, has, "expecting HasWorkerClusters to return true") - - secondCluster := WorkerCluster{ - UUID: uuid.New(), - Name: "arbeiderscluster", - Description: "Worker cluster in Dutch", - } - - require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster)) - - allClusters, err := f.db.FetchWorkerClusters(f.ctx) - require.NoError(t, err) - - require.Len(t, allClusters, 2) - var allClusterIDs [2]string - for idx := range allClusters { - allClusterIDs[idx] = allClusters[idx].UUID - } - assert.Contains(t, allClusterIDs, f.cluster.UUID) - assert.Contains(t, allClusterIDs, secondCluster.UUID) - - has, err = f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.True(t, has, "expecting HasWorkerClusters to return true") - - // Test deleting the 2nd cluster. - require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, secondCluster.UUID)) - - allClusters, err = f.db.FetchWorkerClusters(f.ctx) - require.NoError(t, err) - require.Len(t, allClusters, 1) - assert.Equal(t, f.cluster.UUID, allClusters[0].UUID) - - // Test deleting the 1st cluster. 
- require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID)) - has, err = f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.False(t, has, "expecting HasWorkerClusters to return false") -} - -func TestAssignUnassignWorkerClusters(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - assertClusters := func(msgLabel string, clusterUUIDs ...string) { - w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) - require.NoError(t, err) - - // Catch doubly-reported clusters, as the maps below would hide those cases. - assert.Len(t, w.Clusters, len(clusterUUIDs), msgLabel) - - expectClusters := make(map[string]bool) - for _, cid := range clusterUUIDs { - expectClusters[cid] = true - } - - actualClusters := make(map[string]bool) - for _, c := range w.Clusters { - actualClusters[c.UUID] = true - } - - assert.Equal(t, expectClusters, actualClusters, msgLabel) - } - - secondCluster := WorkerCluster{ - UUID: uuid.New(), - Name: "arbeiderscluster", - Description: "Worker cluster in Dutch", - } - - require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster)) - - // By default the Worker should not be part of a cluster. - assertClusters("default cluster assignment") - - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) - assertClusters("setting one cluster", f.cluster.UUID) - - // Double assignments should also just work. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, f.cluster.UUID})) - assertClusters("setting twice the same cluster", f.cluster.UUID) - - // Multiple cluster memberships. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, secondCluster.UUID})) - assertClusters("setting two different clusters", f.cluster.UUID, secondCluster.UUID) - - // Remove memberships. 
- require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{secondCluster.UUID})) - assertClusters("unassigning from first cluster", secondCluster.UUID) - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{})) - assertClusters("unassigning from second cluster") -} - -func TestSaveWorkerCluster(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - f.cluster.Name = "übercluster" - f.cluster.Description = "ʻO kēlā hui ma laila" - require.NoError(t, f.db.SaveWorkerCluster(f.ctx, f.cluster)) - - fetched, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) - require.NoError(t, err) - assert.Equal(t, f.cluster.Name, fetched.Name) - assert.Equal(t, f.cluster.Description, fetched.Description) -} - -func TestDeleteWorkerClusterWithWorkersAssigned(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Assign the worker. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) - - // Delete the cluster. - require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID)) - - // Check the Worker has been unassigned from the cluster. 
- w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) - require.NoError(t, err) - assert.Empty(t, w.Clusters) -} diff --git a/internal/manager/persistence/worker_tag.go b/internal/manager/persistence/worker_tag.go new file mode 100644 index 00000000..6e0c1506 --- /dev/null +++ b/internal/manager/persistence/worker_tag.go @@ -0,0 +1,112 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + + "gorm.io/gorm" +) + +type WorkerTag struct { + Model + + UUID string `gorm:"type:char(36);default:'';unique;index"` + Name string `gorm:"type:varchar(64);default:'';unique"` + Description string `gorm:"type:varchar(255);default:''"` + + Workers []*Worker `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"` +} + +func (db *DB) CreateWorkerTag(ctx context.Context, wc *WorkerTag) error { + if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil { + return fmt.Errorf("creating new worker tag: %w", err) + } + return nil +} + +// HasWorkerTags returns whether there are any tags defined at all. +func (db *DB) HasWorkerTags(ctx context.Context) (bool, error) { + var count int64 + tx := db.gormDB.WithContext(ctx). + Model(&WorkerTag{}). + Count(&count) + if err := tx.Error; err != nil { + return false, workerTagError(err, "counting worker tags") + } + return count > 0, nil +} + +func (db *DB) FetchWorkerTag(ctx context.Context, uuid string) (*WorkerTag, error) { + tx := db.gormDB.WithContext(ctx) + return fetchWorkerTag(tx, uuid) +} + +// fetchWorkerTag fetches the worker tag using the given database instance. 
+func fetchWorkerTag(gormDB *gorm.DB, uuid string) (*WorkerTag, error) { + w := WorkerTag{} + tx := gormDB.First(&w, "uuid = ?", uuid) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching worker tag") + } + return &w, nil +} + +func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error { + if err := db.gormDB.WithContext(ctx).Save(tag).Error; err != nil { + return workerTagError(err, "saving worker tag") + } + return nil +} + +// DeleteWorkerTag deletes the given tag, after unassigning all workers from it. +func (db *DB) DeleteWorkerTag(ctx context.Context, uuid string) error { + tx := db.gormDB.WithContext(ctx). + Where("uuid = ?", uuid). + Delete(&WorkerTag{}) + if tx.Error != nil { + return workerTagError(tx.Error, "deleting worker tag") + } + if tx.RowsAffected == 0 { + return ErrWorkerTagNotFound + } + return nil +} + +func (db *DB) FetchWorkerTags(ctx context.Context) ([]*WorkerTag, error) { + tags := make([]*WorkerTag, 0) + tx := db.gormDB.WithContext(ctx).Model(&WorkerTag{}).Scan(&tags) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching all worker tags") + } + return tags, nil +} + +func (db *DB) fetchWorkerTagsWithUUID(ctx context.Context, tagUUIDs []string) ([]*WorkerTag, error) { + tags := make([]*WorkerTag, 0) + tx := db.gormDB.WithContext(ctx). + Model(&WorkerTag{}). + Where("uuid in ?", tagUUIDs). + Scan(&tags) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching all worker tags") + } + return tags, nil +} + +func (db *DB) WorkerSetTags(ctx context.Context, worker *Worker, tagUUIDs []string) error { + tags, err := db.fetchWorkerTagsWithUUID(ctx, tagUUIDs) + if err != nil { + return workerTagError(err, "fetching worker tags") + } + + err = db.gormDB.WithContext(ctx). + Model(worker). + Association("Tags"). 
+ Replace(tags) + if err != nil { + return workerTagError(err, "updating worker tags") + } + return nil +} diff --git a/internal/manager/persistence/worker_tag_test.go b/internal/manager/persistence/worker_tag_test.go new file mode 100644 index 00000000..372332fd --- /dev/null +++ b/internal/manager/persistence/worker_tag_test.go @@ -0,0 +1,165 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + "time" + + "git.blender.org/flamenco/internal/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateFetchTag(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Test fetching non-existent tag + fetchedTag, err := f.db.FetchWorkerTag(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f") + assert.ErrorIs(t, err, ErrWorkerTagNotFound) + assert.Nil(t, fetchedTag) + + // New tag creation is already done in the workerTestFixtures() call. + assert.NotNil(t, f.tag) + + fetchedTag, err = f.db.FetchWorkerTag(f.ctx, f.tag.UUID) + require.NoError(t, err) + assert.NotNil(t, fetchedTag) + + // Test contents of fetched tag. + assert.Equal(t, f.tag.UUID, fetchedTag.UUID) + assert.Equal(t, f.tag.Name, fetchedTag.Name) + assert.Equal(t, f.tag.Description, fetchedTag.Description) + assert.Zero(t, fetchedTag.Workers) +} + +func TestFetchDeleteTags(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Single tag was created by fixture. 
+ has, err := f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerTags to return true") + + secondTag := WorkerTag{ + UUID: uuid.New(), + Name: "arbeiderstag", + Description: "Worker tag in Dutch", + } + + require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag)) + + allTags, err := f.db.FetchWorkerTags(f.ctx) + require.NoError(t, err) + + require.Len(t, allTags, 2) + var allTagIDs [2]string + for idx := range allTags { + allTagIDs[idx] = allTags[idx].UUID + } + assert.Contains(t, allTagIDs, f.tag.UUID) + assert.Contains(t, allTagIDs, secondTag.UUID) + + has, err = f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerTags to return true") + + // Test deleting the 2nd tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, secondTag.UUID)) + + allTags, err = f.db.FetchWorkerTags(f.ctx) + require.NoError(t, err) + require.Len(t, allTags, 1) + assert.Equal(t, f.tag.UUID, allTags[0].UUID) + + // Test deleting the 1st tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) + has, err = f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.False(t, has, "expecting HasWorkerTags to return false") +} + +func TestAssignUnassignWorkerTags(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + assertTags := func(msgLabel string, tagUUIDs ...string) { + w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + require.NoError(t, err) + + // Catch doubly-reported tags, as the maps below would hide those cases. 
+ assert.Len(t, w.Tags, len(tagUUIDs), msgLabel) + + expectTags := make(map[string]bool) + for _, cid := range tagUUIDs { + expectTags[cid] = true + } + + actualTags := make(map[string]bool) + for _, c := range w.Tags { + actualTags[c.UUID] = true + } + + assert.Equal(t, expectTags, actualTags, msgLabel) + } + + secondTag := WorkerTag{ + UUID: uuid.New(), + Name: "arbeiderstag", + Description: "Worker tag in Dutch", + } + + require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag)) + + // By default the Worker should not be part of a tag. + assertTags("default tag assignment") + + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) + assertTags("setting one tag", f.tag.UUID) + + // Double assignments should also just work. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, f.tag.UUID})) + assertTags("setting twice the same tag", f.tag.UUID) + + // Multiple tag memberships. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, secondTag.UUID})) + assertTags("setting two different tags", f.tag.UUID, secondTag.UUID) + + // Remove memberships. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{secondTag.UUID})) + assertTags("unassigning from first tag", secondTag.UUID) + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{})) + assertTags("unassigning from second tag") +} + +func TestSaveWorkerTag(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + f.tag.Name = "übertag" + f.tag.Description = "ʻO kēlā hui ma laila" + require.NoError(t, f.db.SaveWorkerTag(f.ctx, f.tag)) + + fetched, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID) + require.NoError(t, err) + assert.Equal(t, f.tag.Name, fetched.Name) + assert.Equal(t, f.tag.Description, fetched.Description) +} + +func TestDeleteWorkerTagWithWorkersAssigned(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Assign the worker. 
+ require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) + + // Delete the tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) + + // Check the Worker has been unassigned from the tag. + w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + require.NoError(t, err) + assert.Empty(t, w.Tags) +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index b71996ab..4fe25431 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -31,7 +31,7 @@ type Worker struct { SupportedTaskTypes string `gorm:"type:varchar(255);default:''"` // comma-separated list of task types. - Clusters []*WorkerCluster `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"` + Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"` } func (w *Worker) Identifier() string { @@ -73,7 +73,7 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { w := Worker{} tx := db.gormDB.WithContext(ctx). - Preload("Clusters"). + Preload("Tags"). First(&w, "uuid = ?", uuid) if tx.Error != nil { return nil, workerError(tx.Error, "fetching worker") diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index 38417f01..a92bf705 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -319,18 +319,18 @@ func TestDeleteWorker(t *testing.T) { } } -func TestDeleteWorkerWithClusterAssigned(t *testing.T) { +func TestDeleteWorkerWithTagAssigned(t *testing.T) { f := workerTestFixtures(t, 1*time.Second) defer f.done() // Assign the worker. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) // Delete the Worker. 
require.NoError(t, f.db.DeleteWorker(f.ctx, f.worker.UUID)) - // Check the Worker has been unassigned from the cluster. - cluster, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) + // Check the Worker has been unassigned from the tag. + tag, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID) require.NoError(t, err) - assert.Empty(t, cluster.Workers) + assert.Empty(t, tag.Workers) } diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/webupdates/worker_updates.go index a088cdb3..a60f40d8 100644 --- a/internal/manager/webupdates/worker_updates.go +++ b/internal/manager/webupdates/worker_updates.go @@ -32,7 +32,7 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { workerUpdate.LastSeen = &worker.LastSeenAt } - // TODO: add cluster IDs. + // TODO: add tag IDs. return workerUpdate } diff --git a/web/app/src/components/jobs/JobDetails.vue b/web/app/src/components/jobs/JobDetails.vue index 6198b7ee..55ca5d08 100644 --- a/web/app/src/components/jobs/JobDetails.vue +++ b/web/app/src/components/jobs/JobDetails.vue @@ -32,12 +32,17 @@
ID
{{ jobData.id }}
-