diff --git a/CHANGELOG.md b/CHANGELOG.md index 241b1d80..c9d8bc6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ bugs in actually-released versions. - Improve speed of queueing up >100 simultaneous job deletions. - Improve logging of job deletion. -- Add Worker Cluster support. Workers can be members of any number of clusters. Workers will only work on jobs that are assigned to that cluster. Jobs that do not have a cluster will be available to all workers, regardless of their cluster assignment. As a result, clusterless workers will only work on clusterless jobs. +- Add Worker Tag support. Workers can be members of any number of tags. Workers will only work on jobs that are assigned to that tag. Jobs that do not have a tag will be available to all workers, regardless of their tag assignment. As a result, tagless workers will only work on tagless jobs. - Fix limitation where a job could have no more than 1000 tasks ([#104201](https://projects.blender.org/studio/flamenco/issues/104201)) - Add support for finding the top-level 'project' directory. When submitting files to Flamenco, the add-on will try to retain the directory structure of your Blender project as precisely as possible. This new feature allows the add-on to find the top-level directory of your project by finding a `.blender_project`, `.git`, or `.subversion` directory. This can be configured in the add-on preferences. - Worker status is remembered when they sign off, so that workers when they come back online do so to the same state ([#99549](https://projects.blender.org/studio/flamenco/issues/99549)). diff --git a/addon/flamenco/__init__.py b/addon/flamenco/__init__.py index 10de1bc9..7dca8fc3 100644 --- a/addon/flamenco/__init__.py +++ b/addon/flamenco/__init__.py @@ -26,7 +26,7 @@ if __is_first_load: comms, preferences, projects, - worker_clusters, + worker_tags, ) else: import importlib @@ -37,7 +37,7 @@ else: comms = importlib.reload(comms) preferences = importlib.reload(preferences) projects = importlib.reload(projects) - worker_clusters = importlib.reload(worker_clusters) + worker_tags = importlib.reload(worker_tags) import bpy @@ -155,7 +155,7 @@ def register() -> None: ) preferences.register() - worker_clusters.register() + worker_tags.register() operators.register() gui.register() job_types.register() @@ -173,5 +173,5 @@ def unregister() -> None: job_types.unregister() gui.unregister() operators.unregister() - worker_clusters.unregister() + worker_tags.unregister() preferences.unregister() diff --git a/addon/flamenco/gui.py b/addon/flamenco/gui.py index 9ff7d427..d9697f75 100644 --- a/addon/flamenco/gui.py +++ b/addon/flamenco/gui.py @@ -43,10 +43,10 @@ class FLAMENCO_PT_job_submission(bpy.types.Panel): col.prop(context.scene, "flamenco_job_name", text="Job Name") col.prop(context.scene, "flamenco_job_priority", text="Priority") - # Worker cluster: + # Worker tag: row = col.row(align=True) - row.prop(context.scene, "flamenco_worker_cluster", text="Cluster") - row.operator("flamenco.fetch_worker_clusters", text="", icon="FILE_REFRESH") + row.prop(context.scene, "flamenco_worker_tag", text="Tag") + row.operator("flamenco.fetch_worker_tags", text="", icon="FILE_REFRESH") layout.separator() diff --git a/addon/flamenco/job_submission.py b/addon/flamenco/job_submission.py index 95344474..cb40a5c2 100644 --- a/addon/flamenco/job_submission.py +++ b/addon/flamenco/job_submission.py @@ -54,9 +54,9 @@ def job_for_scene(scene: bpy.types.Scene) -> Optional[_SubmittedJob]: type_etag=propgroup.job_type.etag, ) - worker_cluster: str = getattr(scene, "flamenco_worker_cluster", "") - if worker_cluster and worker_cluster != "-": - job.worker_cluster = worker_cluster + worker_tag: str = getattr(scene, "flamenco_worker_tag", "") + if worker_tag and worker_tag != "-": + job.worker_tag = worker_tag return job diff --git a/addon/flamenco/operators.py b/addon/flamenco/operators.py index 5ee0a0e3..0b6c883b 100644 --- a/addon/flamenco/operators.py +++ b/addon/flamenco/operators.py @@ -10,7 +10,7 @@ from urllib3.exceptions import HTTPError, MaxRetryError import bpy -from . import job_types, job_submission, preferences, worker_clusters +from . import job_types, job_submission, preferences, worker_tags from .job_types_propgroup import JobTypePropertyGroup from .bat.submodules import bpathlib @@ -83,10 +83,10 @@ class FLAMENCO_OT_fetch_job_types(FlamencoOpMixin, bpy.types.Operator): return {"FINISHED"} -class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): - bl_idname = "flamenco.fetch_worker_clusters" - bl_label = "Fetch Worker Clusters" - bl_description = "Query Flamenco Manager to obtain the available worker clusters" +class FLAMENCO_OT_fetch_worker_tags(FlamencoOpMixin, bpy.types.Operator): + bl_idname = "flamenco.fetch_worker_tags" + bl_label = "Fetch Worker Tags" + bl_description = "Query Flamenco Manager to obtain the available worker tags" def execute(self, context: bpy.types.Context) -> set[str]: api_client = self.get_api_client(context) @@ -94,10 +94,10 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): from flamenco.manager import ApiException scene = context.scene - old_cluster = getattr(scene, "flamenco_worker_cluster", "") + old_tag = getattr(scene, "flamenco_worker_tag", "") try: - worker_clusters.refresh(context, api_client) + worker_tags.refresh(context, api_client) except ApiException as ex: self.report({"ERROR"}, "Error getting job types: %s" % ex) return {"CANCELLED"} @@ -107,9 +107,9 @@ class FLAMENCO_OT_fetch_worker_clusters(FlamencoOpMixin, bpy.types.Operator): self.report({"ERROR"}, "Unable to reach Manager") return {"CANCELLED"} - if old_cluster: - # TODO: handle cases where the old cluster no longer exists. - scene.flamenco_worker_cluster = old_cluster + if old_tag: + # TODO: handle cases where the old tag no longer exists. + scene.flamenco_worker_tag = old_tag return {"FINISHED"} @@ -669,7 +669,7 @@ class FLAMENCO3_OT_explore_file_path(bpy.types.Operator): classes = ( FLAMENCO_OT_fetch_job_types, - FLAMENCO_OT_fetch_worker_clusters, + FLAMENCO_OT_fetch_worker_tags, FLAMENCO_OT_ping_manager, FLAMENCO_OT_eval_setting, FLAMENCO_OT_submit_job, diff --git a/addon/flamenco/preferences.py b/addon/flamenco/preferences.py index 31bc231c..2b5daa27 100644 --- a/addon/flamenco/preferences.py +++ b/addon/flamenco/preferences.py @@ -43,7 +43,7 @@ _project_finder_enum_items = [ ] -class WorkerCluster(bpy.types.PropertyGroup): +class WorkerTag(bpy.types.PropertyGroup): id: bpy.props.StringProperty(name="id") # type: ignore name: bpy.props.StringProperty(name="Name") # type: ignore description: bpy.props.StringProperty(name="Description") # type: ignore @@ -93,10 +93,10 @@ class FlamencoPreferences(bpy.types.AddonPreferences): get=lambda prefs: prefs.job_storage, ) - worker_clusters: bpy.props.CollectionProperty( # type: ignore - type=WorkerCluster, - name="Worker Clusters", - description="Cache for the worker clusters available on the configured Manager", + worker_tags: bpy.props.CollectionProperty( # type: ignore + type=WorkerTag, + name="Worker Tags", + description="Cache for the worker tags available on the configured Manager", options={"HIDDEN"}, ) @@ -169,7 +169,7 @@ def manager_url(context: bpy.types.Context) -> str: classes = ( - WorkerCluster, + WorkerTag, FlamencoPreferences, ) _register, _unregister = bpy.utils.register_classes_factory(classes) diff --git a/addon/flamenco/worker_clusters.py b/addon/flamenco/worker_tags.py similarity index 62% rename from addon/flamenco/worker_clusters.py rename to addon/flamenco/worker_tags.py index 2604f488..2ed75cbb 100644 --- a/addon/flamenco/worker_clusters.py +++ b/addon/flamenco/worker_tags.py @@ -16,25 +16,25 @@ _enum_items: list[Union[tuple[str, str, str], tuple[str, str, str, int, int]]] = def refresh(context: bpy.types.Context, api_client: _ApiClient) -> None: - """Fetch the available worker clusters from the Manager.""" + """Fetch the available worker tags from the Manager.""" from flamenco.manager import ApiClient from flamenco.manager.api import worker_mgt_api - from flamenco.manager.model.worker_cluster_list import WorkerClusterList + from flamenco.manager.model.worker_tag_list import WorkerTagList assert isinstance(api_client, ApiClient) api = worker_mgt_api.WorkerMgtApi(api_client) - response: WorkerClusterList = api.fetch_worker_clusters() + response: WorkerTagList = api.fetch_worker_tags() # Store on the preferences, so a cached version persists until the next refresh. prefs = preferences.get(context) - prefs.worker_clusters.clear() + prefs.worker_tags.clear() - for cluster in response.clusters: - rna_cluster = prefs.worker_clusters.add() - rna_cluster.id = cluster.id - rna_cluster.name = cluster.name - rna_cluster.description = getattr(cluster, "description", "") + for tag in response.tags: + rna_tag = prefs.worker_tags.add() + rna_tag.id = tag.id + rna_tag.name = tag.name + rna_tag.description = getattr(tag, "description", "") # Preferences have changed, so make sure that Blender saves them (assuming # auto-save here). @@ -46,25 +46,25 @@ def _get_enum_items(self, context): prefs = preferences.get(context) _enum_items = [ - ("-", "All", "No specific cluster assigned, any worker can handle this job"), + ("-", "All", "No specific tag assigned, any worker can handle this job"), ] _enum_items.extend( - (cluster.id, cluster.name, cluster.description) - for cluster in prefs.worker_clusters + (tag.id, tag.name, tag.description) + for tag in prefs.worker_tags ) return _enum_items def register() -> None: - bpy.types.Scene.flamenco_worker_cluster = bpy.props.EnumProperty( - name="Worker Cluster", + bpy.types.Scene.flamenco_worker_tag = bpy.props.EnumProperty( + name="Worker Tag", items=_get_enum_items, description="The set of Workers that can handle tasks of this job", ) def unregister() -> None: - to_del = ((bpy.types.Scene, "flamenco_worker_cluster"),) + to_del = ((bpy.types.Scene, "flamenco_worker_tag"),) for ob, attr in to_del: try: delattr(ob, attr) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 17fd7c91..7b0c5c31 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -65,13 +65,13 @@ type PersistenceService interface { RemoveFromJobBlocklist(ctx context.Context, jobUUID, workerUUID, taskType string) error ClearJobBlocklist(ctx context.Context, job *persistence.Job) error - // Worker cluster management. - WorkerSetClusters(ctx context.Context, worker *persistence.Worker, clusterUUIDs []string) error - CreateWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error - FetchWorkerCluster(ctx context.Context, uuid string) (*persistence.WorkerCluster, error) - FetchWorkerClusters(ctx context.Context) ([]*persistence.WorkerCluster, error) - DeleteWorkerCluster(ctx context.Context, uuid string) error - SaveWorkerCluster(ctx context.Context, cluster *persistence.WorkerCluster) error + // Worker tag management. + WorkerSetTags(ctx context.Context, worker *persistence.Worker, tagUUIDs []string) error + CreateWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error + FetchWorkerTag(ctx context.Context, uuid string) (*persistence.WorkerTag, error) + FetchWorkerTags(ctx context.Context) ([]*persistence.WorkerTag, error) + DeleteWorkerTag(ctx context.Context, uuid string) error + SaveWorkerTag(ctx context.Context, tag *persistence.WorkerTag) error // WorkersLeftToRun returns a set of worker UUIDs that can run tasks of the given type on the given job. WorkersLeftToRun(ctx context.Context, job *persistence.Job, taskType string) (map[string]bool, error) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index fcd84a7e..aca62069 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -618,8 +618,8 @@ func jobDBtoAPI(dbJob *persistence.Job) api.Job { if dbJob.DeleteRequestedAt.Valid { apiJob.DeleteRequestedAt = &dbJob.DeleteRequestedAt.Time } - if dbJob.WorkerCluster != nil { - apiJob.WorkerCluster = &dbJob.WorkerCluster.UUID + if dbJob.WorkerTag != nil { + apiJob.WorkerTag = &dbJob.WorkerTag.UUID } return apiJob diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 5c64babf..fa666fe8 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -320,19 +320,19 @@ func TestSubmitJobWithShamanCheckoutID(t *testing.T) { assert.NoError(t, err) } -func TestSubmitJobWithWorkerCluster(t *testing.T) { +func TestSubmitJobWithWorkerTag(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() mf := newMockedFlamenco(mockCtrl) worker := testWorker() - workerClusterUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd" - cluster := persistence.WorkerCluster{ + workerTagUUID := "04435762-9dc8-4f13-80b7-643a6fa5b6fd" + tag := persistence.WorkerTag{ Model: persistence.Model{ID: 47}, - UUID: workerClusterUUID, - Name: "first cluster", - Description: "my first cluster", + UUID: workerTagUUID, + Name: "first tag", + Description: "my first tag", } submittedJob := api.SubmittedJob{ @@ -340,7 +340,7 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { Type: "test", Priority: 50, SubmitterPlatform: worker.Platform, - WorkerCluster: &workerClusterUUID, + WorkerTag: &workerTagUUID, } mf.expectConvertTwoWayVariables(t, @@ -351,8 +351,8 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { // Expect the job compiler to be called. authoredJob := job_compilers.AuthoredJob{ - JobID: "afc47568-bd9d-4368-8016-e91d945db36d", - WorkerClusterUUID: workerClusterUUID, + JobID: "afc47568-bd9d-4368-8016-e91d945db36d", + WorkerTagUUID: workerTagUUID, Name: submittedJob.Name, JobType: submittedJob.Type, @@ -382,8 +382,8 @@ func TestSubmitJobWithWorkerCluster(t *testing.T) { Settings: persistence.StringInterfaceMap{}, Metadata: persistence.StringStringMap{}, - WorkerClusterID: &cluster.ID, - WorkerCluster: &cluster, + WorkerTagID: &tag.ID, + WorkerTag: &tag, } mf.persistence.EXPECT().FetchJob(gomock.Any(), queuedJob.JobID).Return(&dbJob, nil) diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 31a8aafa..6817cfce 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -141,18 +141,18 @@ func (mr *MockPersistenceServiceMockRecorder) CreateWorker(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorker", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorker), arg0, arg1) } -// CreateWorkerCluster mocks base method. -func (m *MockPersistenceService) CreateWorkerCluster(arg0 context.Context, arg1 *persistence.WorkerCluster) error { +// CreateWorkerTag mocks base method. +func (m *MockPersistenceService) CreateWorkerTag(arg0 context.Context, arg1 *persistence.WorkerTag) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateWorkerCluster", arg0, arg1) + ret := m.ctrl.Call(m, "CreateWorkerTag", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// CreateWorkerCluster indicates an expected call of CreateWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) CreateWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// CreateWorkerTag indicates an expected call of CreateWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) CreateWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).CreateWorkerTag), arg0, arg1) } // DeleteWorker mocks base method. @@ -169,18 +169,18 @@ func (mr *MockPersistenceServiceMockRecorder) DeleteWorker(arg0, arg1 interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorker", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorker), arg0, arg1) } -// DeleteWorkerCluster mocks base method. -func (m *MockPersistenceService) DeleteWorkerCluster(arg0 context.Context, arg1 string) error { +// DeleteWorkerTag mocks base method. +func (m *MockPersistenceService) DeleteWorkerTag(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteWorkerCluster", arg0, arg1) + ret := m.ctrl.Call(m, "DeleteWorkerTag", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// DeleteWorkerCluster indicates an expected call of DeleteWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) DeleteWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// DeleteWorkerTag indicates an expected call of DeleteWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) DeleteWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).DeleteWorkerTag), arg0, arg1) } // FetchJob mocks base method. @@ -258,34 +258,34 @@ func (mr *MockPersistenceServiceMockRecorder) FetchWorker(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorker", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorker), arg0, arg1) } -// FetchWorkerCluster mocks base method. -func (m *MockPersistenceService) FetchWorkerCluster(arg0 context.Context, arg1 string) (*persistence.WorkerCluster, error) { +// FetchWorkerTag mocks base method. +func (m *MockPersistenceService) FetchWorkerTag(arg0 context.Context, arg1 string) (*persistence.WorkerTag, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWorkerCluster", arg0, arg1) - ret0, _ := ret[0].(*persistence.WorkerCluster) + ret := m.ctrl.Call(m, "FetchWorkerTag", arg0, arg1) + ret0, _ := ret[0].(*persistence.WorkerTag) ret1, _ := ret[1].(error) return ret0, ret1 } -// FetchWorkerCluster indicates an expected call of FetchWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) FetchWorkerCluster(arg0, arg1 interface{}) *gomock.Call { +// FetchWorkerTag indicates an expected call of FetchWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTag(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerCluster), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTag), arg0, arg1) } -// FetchWorkerClusters mocks base method. -func (m *MockPersistenceService) FetchWorkerClusters(arg0 context.Context) ([]*persistence.WorkerCluster, error) { +// FetchWorkerTags mocks base method. +func (m *MockPersistenceService) FetchWorkerTags(arg0 context.Context) ([]*persistence.WorkerTag, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWorkerClusters", arg0) - ret0, _ := ret[0].([]*persistence.WorkerCluster) + ret := m.ctrl.Call(m, "FetchWorkerTags", arg0) + ret0, _ := ret[0].([]*persistence.WorkerTag) ret1, _ := ret[1].(error) return ret0, ret1 } -// FetchWorkerClusters indicates an expected call of FetchWorkerClusters. -func (mr *MockPersistenceServiceMockRecorder) FetchWorkerClusters(arg0 interface{}) *gomock.Call { +// FetchWorkerTags indicates an expected call of FetchWorkerTags. +func (mr *MockPersistenceServiceMockRecorder) FetchWorkerTags(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerClusters", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerClusters), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerTags", reflect.TypeOf((*MockPersistenceService)(nil).FetchWorkerTags), arg0) } // FetchWorkerTask mocks base method. @@ -433,20 +433,6 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorker(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorker", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorker), arg0, arg1) } -// SaveWorkerCluster mocks base method. -func (m *MockPersistenceService) SaveWorkerCluster(arg0 context.Context, arg1 *persistence.WorkerCluster) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SaveWorkerCluster", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SaveWorkerCluster indicates an expected call of SaveWorkerCluster. -func (mr *MockPersistenceServiceMockRecorder) SaveWorkerCluster(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerCluster", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerCluster), arg0, arg1) -} - // SaveWorkerStatus mocks base method. func (m *MockPersistenceService) SaveWorkerStatus(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() @@ -461,6 +447,20 @@ func (mr *MockPersistenceServiceMockRecorder) SaveWorkerStatus(arg0, arg1 interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerStatus", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerStatus), arg0, arg1) } +// SaveWorkerTag mocks base method. +func (m *MockPersistenceService) SaveWorkerTag(arg0 context.Context, arg1 *persistence.WorkerTag) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveWorkerTag", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveWorkerTag indicates an expected call of SaveWorkerTag. +func (mr *MockPersistenceServiceMockRecorder) SaveWorkerTag(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveWorkerTag", reflect.TypeOf((*MockPersistenceService)(nil).SaveWorkerTag), arg0, arg1) +} + // ScheduleTask mocks base method. func (m *MockPersistenceService) ScheduleTask(arg0 context.Context, arg1 *persistence.Worker) (*persistence.Task, error) { m.ctrl.T.Helper() @@ -532,18 +532,18 @@ func (mr *MockPersistenceServiceMockRecorder) WorkerSeen(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSeen", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSeen), arg0, arg1) } -// WorkerSetClusters mocks base method. -func (m *MockPersistenceService) WorkerSetClusters(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { +// WorkerSetTags mocks base method. +func (m *MockPersistenceService) WorkerSetTags(arg0 context.Context, arg1 *persistence.Worker, arg2 []string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WorkerSetClusters", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "WorkerSetTags", arg0, arg1, arg2) ret0, _ := ret[0].(error) return ret0 } -// WorkerSetClusters indicates an expected call of WorkerSetClusters. -func (mr *MockPersistenceServiceMockRecorder) WorkerSetClusters(arg0, arg1, arg2 interface{}) *gomock.Call { +// WorkerSetTags indicates an expected call of WorkerSetTags. +func (mr *MockPersistenceServiceMockRecorder) WorkerSetTags(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSetClusters", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSetClusters), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WorkerSetTags", reflect.TypeOf((*MockPersistenceService)(nil).WorkerSetTags), arg0, arg1, arg2) } // WorkersLeftToRun mocks base method. diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index c1f8ab30..509f525f 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -182,7 +182,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { +func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) logger = logger.With().Str("worker", workerUUID).Logger() @@ -192,7 +192,7 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { } // Decode the request body. - var change api.WorkerClusterChangeRequest + var change api.WorkerTagChangeRequest if err := e.Bind(&change); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") @@ -210,13 +210,13 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { } logger = logger.With(). - Strs("clusters", change.ClusterIds). + Strs("tags", change.TagIds). Logger() - logger.Info().Msg("worker cluster change requested") + logger.Info().Msg("worker tag change requested") - // Store the new cluster assignment. - if err := f.persist.WorkerSetClusters(ctx, dbWorker, change.ClusterIds); err != nil { - logger.Error().Err(err).Msg("saving worker after cluster change request") + // Store the new tag assignment. + if err := f.persist.WorkerSetTags(ctx, dbWorker, change.TagIds); err != nil { + logger.Error().Err(err).Msg("saving worker after tag change request") return sendAPIError(e, http.StatusInternalServerError, "error saving worker: %v", err) } @@ -227,155 +227,155 @@ func (f *Flamenco) SetWorkerClusters(e echo.Context, workerUUID string) error { return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) DeleteWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - err := f.persist.DeleteWorkerCluster(ctx, clusterUUID) + err := f.persist.DeleteWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("deleting worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error deleting worker cluster: %v", err) + logger.Error().Err(err).Msg("deleting worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error deleting worker tag: %v", err) } - // TODO: SocketIO broadcast of cluster deletion. + // TODO: SocketIO broadcast of tag deletion. - logger.Info().Msg("worker cluster deleted") + logger.Info().Msg("worker tag deleted") return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) FetchWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) FetchWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } - cluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID) + tag, err := f.persist.FetchWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("fetching worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err) + logger.Error().Err(err).Msg("fetching worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err) } - return e.JSON(http.StatusOK, workerClusterDBtoAPI(*cluster)) + return e.JSON(http.StatusOK, workerTagDBtoAPI(*tag)) } -func (f *Flamenco) UpdateWorkerCluster(e echo.Context, clusterUUID string) error { +func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error { ctx := e.Request().Context() logger := requestLogger(e) - logger = logger.With().Str("cluster", clusterUUID).Logger() + logger = logger.With().Str("tag", tagUUID).Logger() - if !uuid.IsValid(clusterUUID) { + if !uuid.IsValid(tagUUID) { return sendAPIError(e, http.StatusBadRequest, "not a valid UUID") } // Decode the request body. - var update api.UpdateWorkerClusterJSONBody + var update api.UpdateWorkerTagJSONBody if err := e.Bind(&update); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") } - dbCluster, err := f.persist.FetchWorkerCluster(ctx, clusterUUID) + dbTag, err := f.persist.FetchWorkerTag(ctx, tagUUID) switch { - case errors.Is(err, persistence.ErrWorkerClusterNotFound): - logger.Debug().Msg("non-existent worker cluster requested") - return sendAPIError(e, http.StatusNotFound, "worker cluster %q not found", clusterUUID) + case errors.Is(err, persistence.ErrWorkerTagNotFound): + logger.Debug().Msg("non-existent worker tag requested") + return sendAPIError(e, http.StatusNotFound, "worker tag %q not found", tagUUID) case err != nil: - logger.Error().Err(err).Msg("fetching worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error fetching worker cluster: %v", err) + logger.Error().Err(err).Msg("fetching worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error fetching worker tag: %v", err) } - // Update the cluster. - dbCluster.Name = update.Name + // Update the tag. + dbTag.Name = update.Name if update.Description == nil { - dbCluster.Description = "" + dbTag.Description = "" } else { - dbCluster.Description = *update.Description + dbTag.Description = *update.Description } - if err := f.persist.SaveWorkerCluster(ctx, dbCluster); err != nil { - logger.Error().Err(err).Msg("saving worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster") + if err := f.persist.SaveWorkerTag(ctx, dbTag); err != nil { + logger.Error().Err(err).Msg("saving worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag") } - // TODO: SocketIO broadcast of cluster update. + // TODO: SocketIO broadcast of tag update. return e.NoContent(http.StatusNoContent) } -func (f *Flamenco) FetchWorkerClusters(e echo.Context) error { +func (f *Flamenco) FetchWorkerTags(e echo.Context) error { ctx := e.Request().Context() logger := requestLogger(e) - dbClusters, err := f.persist.FetchWorkerClusters(ctx) + dbTags, err := f.persist.FetchWorkerTags(ctx) if err != nil { - logger.Error().Err(err).Msg("fetching worker clusters") - return sendAPIError(e, http.StatusInternalServerError, "error saving worker cluster") + logger.Error().Err(err).Msg("fetching worker tags") + return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag") } - apiClusters := []api.WorkerCluster{} - for _, dbCluster := range dbClusters { - apiCluster := workerClusterDBtoAPI(*dbCluster) - apiClusters = append(apiClusters, apiCluster) + apiTags := []api.WorkerTag{} + for _, dbTag := range dbTags { + apiTag := workerTagDBtoAPI(*dbTag) + apiTags = append(apiTags, apiTag) } - clusterList := api.WorkerClusterList{ - Clusters: &apiClusters, + tagList := api.WorkerTagList{ + Tags: &apiTags, } - return e.JSON(http.StatusOK, &clusterList) + return e.JSON(http.StatusOK, &tagList) } -func (f *Flamenco) CreateWorkerCluster(e echo.Context) error { +func (f *Flamenco) CreateWorkerTag(e echo.Context) error { ctx := e.Request().Context() logger := requestLogger(e) // Decode the request body. - var apiCluster api.CreateWorkerClusterJSONBody - if err := e.Bind(&apiCluster); err != nil { + var apiTag api.CreateWorkerTagJSONBody + if err := e.Bind(&apiTag); err != nil { logger.Warn().Err(err).Msg("bad request received") return sendAPIError(e, http.StatusBadRequest, "invalid format") } // Convert to persistence layer model. - var clusterUUID string - if apiCluster.Id != nil && *apiCluster.Id != "" { - clusterUUID = *apiCluster.Id + var tagUUID string + if apiTag.Id != nil && *apiTag.Id != "" { + tagUUID = *apiTag.Id } else { - clusterUUID = uuid.New() + tagUUID = uuid.New() } - dbCluster := persistence.WorkerCluster{ - UUID: clusterUUID, - Name: apiCluster.Name, + dbTag := persistence.WorkerTag{ + UUID: tagUUID, + Name: apiTag.Name, } - if apiCluster.Description != nil { - dbCluster.Description = *apiCluster.Description + if apiTag.Description != nil { + dbTag.Description = *apiTag.Description } // Store in the database. - if err := f.persist.CreateWorkerCluster(ctx, &dbCluster); err != nil { - logger.Error().Err(err).Msg("creating worker cluster") - return sendAPIError(e, http.StatusInternalServerError, "error creating worker cluster") + if err := f.persist.CreateWorkerTag(ctx, &dbTag); err != nil { + logger.Error().Err(err).Msg("creating worker tag") + return sendAPIError(e, http.StatusInternalServerError, "error creating worker tag") } - // TODO: SocketIO broadcast of cluster creation. + // TODO: SocketIO broadcast of tag creation. - return e.JSON(http.StatusOK, workerClusterDBtoAPI(dbCluster)) + return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag)) } func workerSummary(w persistence.Worker) api.WorkerSummary { @@ -407,26 +407,26 @@ func workerDBtoAPI(w persistence.Worker) api.Worker { SupportedTaskTypes: w.TaskTypes(), } - if len(w.Clusters) > 0 { - clusters := []api.WorkerCluster{} - for i := range w.Clusters { - clusters = append(clusters, workerClusterDBtoAPI(*w.Clusters[i])) + if len(w.Tags) > 0 { + tags := []api.WorkerTag{} + for i := range w.Tags { + tags = append(tags, workerTagDBtoAPI(*w.Tags[i])) } - apiWorker.Clusters = &clusters + apiWorker.Tags = &tags } return apiWorker } -func workerClusterDBtoAPI(wc persistence.WorkerCluster) api.WorkerCluster { +func workerTagDBtoAPI(wc persistence.WorkerTag) api.WorkerTag { uuid := wc.UUID // Take a copy for safety. - apiCluster := api.WorkerCluster{ + apiTag := api.WorkerTag{ Id: &uuid, Name: wc.Name, } if len(wc.Description) > 0 { - apiCluster.Description = &wc.Description + apiTag.Description = &wc.Description } - return apiCluster + return apiTag } diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index f914121d..352125c3 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -262,58 +262,58 @@ func TestRequestWorkerStatusChangeRevert(t *testing.T) { assertResponseNoContent(t, echo) } -func TestWorkerClusterCRUDHappyFlow(t *testing.T) { +func TestWorkerTagCRUDHappyFlow(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() mf := newMockedFlamenco(mockCtrl) - // Create a cluster. + // Create a tag. UUID := "18d9234e-5135-458f-a1ba-a350c3d4e837" - apiCluster := api.WorkerCluster{ + apiTag := api.WorkerTag{ Id: &UUID, Name: "ʻO nā manu ʻino", Description: ptr("Ke aloha"), } - expectDBCluster := persistence.WorkerCluster{ + expectDBTag := persistence.WorkerTag{ UUID: UUID, - Name: apiCluster.Name, - Description: *apiCluster.Description, + Name: apiTag.Name, + Description: *apiTag.Description, } - mf.persistence.EXPECT().CreateWorkerCluster(gomock.Any(), &expectDBCluster) - // TODO: expect SocketIO broadcast of the cluster creation. - echo := mf.prepareMockedJSONRequest(apiCluster) - require.NoError(t, mf.flamenco.CreateWorkerCluster(echo)) - assertResponseJSON(t, echo, http.StatusOK, &apiCluster) + mf.persistence.EXPECT().CreateWorkerTag(gomock.Any(), &expectDBTag) + // TODO: expect SocketIO broadcast of the tag creation. + echo := mf.prepareMockedJSONRequest(apiTag) + require.NoError(t, mf.flamenco.CreateWorkerTag(echo)) + assertResponseJSON(t, echo, http.StatusOK, &apiTag) - // Fetch the cluster - mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil) + // Fetch the tag + mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) echo = mf.prepareMockedRequest(nil) - require.NoError(t, mf.flamenco.FetchWorkerCluster(echo, UUID)) - assertResponseJSON(t, echo, http.StatusOK, &apiCluster) + require.NoError(t, mf.flamenco.FetchWorkerTag(echo, UUID)) + assertResponseJSON(t, echo, http.StatusOK, &apiTag) // Update & save. newUUID := "60442762-83d3-4fc3-bf75-6ab5799cdbaa" - newAPICluster := api.WorkerCluster{ + newAPITag := api.WorkerTag{ Id: &newUUID, // Intentionally change the UUID. This should just be ignored. Name: "updated name", } - expectNewDBCluster := persistence.WorkerCluster{ + expectNewDBTag := persistence.WorkerTag{ UUID: UUID, - Name: newAPICluster.Name, + Name: newAPITag.Name, Description: "", } - // TODO: expect SocketIO broadcast of the cluster update. - mf.persistence.EXPECT().FetchWorkerCluster(gomock.Any(), UUID).Return(&expectDBCluster, nil) - mf.persistence.EXPECT().SaveWorkerCluster(gomock.Any(), &expectNewDBCluster) - echo = mf.prepareMockedJSONRequest(newAPICluster) - require.NoError(t, mf.flamenco.UpdateWorkerCluster(echo, UUID)) + // TODO: expect SocketIO broadcast of the tag update. + mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) + mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag) + echo = mf.prepareMockedJSONRequest(newAPITag) + require.NoError(t, mf.flamenco.UpdateWorkerTag(echo, UUID)) assertResponseNoContent(t, echo) // Delete. - mf.persistence.EXPECT().DeleteWorkerCluster(gomock.Any(), UUID) - // TODO: expect SocketIO broadcast of the cluster deletion. - echo = mf.prepareMockedJSONRequest(newAPICluster) - require.NoError(t, mf.flamenco.DeleteWorkerCluster(echo, UUID)) + mf.persistence.EXPECT().DeleteWorkerTag(gomock.Any(), UUID) + // TODO: expect SocketIO broadcast of the tag deletion. + echo = mf.prepareMockedJSONRequest(newAPITag) + require.NoError(t, mf.flamenco.DeleteWorkerTag(echo, UUID)) assertResponseNoContent(t, echo) } diff --git a/internal/manager/job_compilers/author.go b/internal/manager/job_compilers/author.go index 7a457feb..c1d5dd3a 100644 --- a/internal/manager/job_compilers/author.go +++ b/internal/manager/job_compilers/author.go @@ -20,8 +20,8 @@ type Author struct { } type AuthoredJob struct { - JobID string - WorkerClusterUUID string + JobID string + WorkerTagUUID string Name string JobType string diff --git a/internal/manager/job_compilers/job_compilers.go b/internal/manager/job_compilers/job_compilers.go index 3b4cae4c..41228d0f 100644 --- a/internal/manager/job_compilers/job_compilers.go +++ b/internal/manager/job_compilers/job_compilers.go @@ -127,8 +127,8 @@ func (s *Service) Compile(ctx context.Context, sj api.SubmittedJob) (*AuthoredJo aj.Storage.ShamanCheckoutID = *sj.Storage.ShamanCheckoutId } - if sj.WorkerCluster != nil { - aj.WorkerClusterUUID = *sj.WorkerCluster + if sj.WorkerTag != nil { + aj.WorkerTagUUID = *sj.WorkerTag } compiler, err := vm.getCompileJob() diff --git a/internal/manager/job_compilers/job_compilers_test.go b/internal/manager/job_compilers/job_compilers_test.go index ad746b51..3fcccfaa 100644 --- a/internal/manager/job_compilers/job_compilers_test.go +++ b/internal/manager/job_compilers/job_compilers_test.go @@ -45,12 +45,12 @@ func exampleSubmittedJob() api.SubmittedJob { "user.name": "Sybren Stüvel", }} sj := api.SubmittedJob{ - Name: "3Д рендеринг", - Priority: 50, - Type: "simple-blender-render", - Settings: &settings, - Metadata: &metadata, - WorkerCluster: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"), + Name: "3Д рендеринг", + Priority: 50, + Type: "simple-blender-render", + Settings: &settings, + Metadata: &metadata, + WorkerTag: ptr("acce9983-e663-4210-b3cc-f7bfa629cb21"), } return sj } @@ -80,7 +80,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) { // Properties should be copied as-is. assert.Equal(t, sj.Name, aj.Name) - assert.Equal(t, *sj.WorkerCluster, aj.WorkerClusterUUID) + assert.Equal(t, *sj.WorkerTag, aj.WorkerTagUUID) assert.Equal(t, sj.Type, aj.JobType) assert.Equal(t, sj.Priority, aj.Priority) assert.EqualValues(t, sj.Settings.AdditionalProperties, aj.Settings) @@ -139,7 +139,7 @@ func TestSimpleBlenderRenderHappy(t *testing.T) { assert.Equal(t, expectDeps, tVideo.Dependencies) } -func TestJobWithoutCluster(t *testing.T) { +func TestJobWithoutTag(t *testing.T) { c := mockedClock(t) s, err := Load(c) @@ -151,20 +151,20 @@ func TestJobWithoutCluster(t *testing.T) { sj := exampleSubmittedJob() - // Try with nil WorkerCluster. + // Try with nil WorkerTag. { - sj.WorkerCluster = nil + sj.WorkerTag = nil aj, err := s.Compile(ctx, sj) require.NoError(t, err) - assert.Zero(t, aj.WorkerClusterUUID) + assert.Zero(t, aj.WorkerTagUUID) } - // Try with empty WorkerCluster. + // Try with empty WorkerTag. { - sj.WorkerCluster = ptr("") + sj.WorkerTag = ptr("") aj, err := s.Compile(ctx, sj) require.NoError(t, err) - assert.Zero(t, aj.WorkerClusterUUID) + assert.Zero(t, aj.WorkerTagUUID) } } diff --git a/internal/manager/persistence/db_migration.go b/internal/manager/persistence/db_migration.go index b60d9230..6c8bc570 100644 --- a/internal/manager/persistence/db_migration.go +++ b/internal/manager/persistence/db_migration.go @@ -16,7 +16,7 @@ func (db *DB) migrate() error { &Task{}, &TaskFailure{}, &Worker{}, - &WorkerCluster{}, + &WorkerTag{}, ) if err != nil { return fmt.Errorf("failed to automigrate database: %v", err) diff --git a/internal/manager/persistence/errors.go b/internal/manager/persistence/errors.go index 1316f481..0b7ebd6f 100644 --- a/internal/manager/persistence/errors.go +++ b/internal/manager/persistence/errors.go @@ -9,10 +9,10 @@ import ( ) var ( - ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} - ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} - ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} - ErrWorkerClusterNotFound = PersistenceError{Message: "worker cluster not found", Err: gorm.ErrRecordNotFound} + ErrJobNotFound = PersistenceError{Message: "job not found", Err: gorm.ErrRecordNotFound} + ErrTaskNotFound = PersistenceError{Message: "task not found", Err: gorm.ErrRecordNotFound} + ErrWorkerNotFound = PersistenceError{Message: "worker not found", Err: gorm.ErrRecordNotFound} + ErrWorkerTagNotFound = PersistenceError{Message: "worker tag not found", Err: gorm.ErrRecordNotFound} ) type PersistenceError struct { @@ -40,8 +40,8 @@ func workerError(errorToWrap error, message string, msgArgs ...interface{}) erro return wrapError(translateGormWorkerError(errorToWrap), message, msgArgs...) } -func workerClusterError(errorToWrap error, message string, msgArgs ...interface{}) error { - return wrapError(translateGormWorkerClusterError(errorToWrap), message, msgArgs...) +func workerTagError(errorToWrap error, message string, msgArgs ...interface{}) error { + return wrapError(translateGormWorkerTagError(errorToWrap), message, msgArgs...) } func wrapError(errorToWrap error, message string, format ...interface{}) error { @@ -86,11 +86,11 @@ func translateGormWorkerError(gormError error) error { return gormError } -// translateGormWorkerClusterError translates a Gorm error to a persistence layer error. +// translateGormWorkerTagError translates a Gorm error to a persistence layer error. // This helps to keep Gorm as "implementation detail" of the persistence layer. -func translateGormWorkerClusterError(gormError error) error { +func translateGormWorkerTagError(gormError error) error { if errors.Is(gormError, gorm.ErrRecordNotFound) { - return ErrWorkerClusterNotFound + return ErrWorkerTagNotFound } return gormError } diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 66b9e0fa..8f6329d9 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -36,8 +36,8 @@ type Job struct { Storage JobStorageInfo `gorm:"embedded;embeddedPrefix:storage_"` - WorkerClusterID *uint - WorkerCluster *WorkerCluster `gorm:"foreignkey:WorkerClusterID;references:ID;constraint:OnDelete:SET NULL"` + WorkerTagID *uint + WorkerTag *WorkerTag `gorm:"foreignkey:WorkerTagID;references:ID;constraint:OnDelete:SET NULL"` } type StringInterfaceMap map[string]interface{} @@ -148,14 +148,14 @@ func (db *DB) StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.Au }, } - // Find and assign the worker cluster. - if authoredJob.WorkerClusterUUID != "" { - dbCluster, err := fetchWorkerCluster(tx, authoredJob.WorkerClusterUUID) + // Find and assign the worker tag. + if authoredJob.WorkerTagUUID != "" { + dbTag, err := fetchWorkerTag(tx, authoredJob.WorkerTagUUID) if err != nil { return err } - dbJob.WorkerClusterID = &dbCluster.ID - dbJob.WorkerCluster = dbCluster + dbJob.WorkerTagID = &dbTag.ID + dbJob.WorkerTag = dbTag } if err := tx.Create(&dbJob).Error; err != nil { @@ -233,7 +233,7 @@ func (db *DB) FetchJob(ctx context.Context, jobUUID string) (*Job, error) { dbJob := Job{} findResult := db.gormDB.WithContext(ctx). Limit(1). - Preload("WorkerCluster"). + Preload("WorkerTag"). Find(&dbJob, "uuid = ?", jobUUID) if findResult.Error != nil { return nil, jobError(findResult.Error, "fetching job") diff --git a/internal/manager/persistence/jobs_blocklist.go b/internal/manager/persistence/jobs_blocklist.go index c12b09b7..da041ecf 100644 --- a/internal/manager/persistence/jobs_blocklist.go +++ b/internal/manager/persistence/jobs_blocklist.go @@ -108,16 +108,16 @@ func (db *DB) WorkersLeftToRun(ctx context.Context, job *Job, taskType string) ( Select("uuid"). Where("id not in (?)", blockedWorkers) - if job.WorkerClusterID == nil { + if job.WorkerTagID == nil { // Count all workers, so no extra restrictions are necessary. } else { - // Only count workers in the job's cluster. - jobCluster := db.gormDB. - Table("worker_cluster_membership"). + // Only count workers in the job's tag. + jobTag := db.gormDB. + Table("worker_tag_membership"). Select("worker_id"). - Where("worker_cluster_id = ?", *job.WorkerClusterID) + Where("worker_tag_id = ?", *job.WorkerTagID) query = query. - Where("id in (?)", jobCluster) + Where("id in (?)", jobTag) } // Find the workers NOT blocked. diff --git a/internal/manager/persistence/jobs_blocklist_test.go b/internal/manager/persistence/jobs_blocklist_test.go index 97e3503d..d82a4092 100644 --- a/internal/manager/persistence/jobs_blocklist_test.go +++ b/internal/manager/persistence/jobs_blocklist_test.go @@ -126,14 +126,14 @@ func TestWorkersLeftToRun(t *testing.T) { worker1 := createWorker(ctx, t, db) worker2 := createWorkerFrom(ctx, t, db, *worker1) - // Create one worker cluster. It will not be used by this job, but one of the + // Create one worker tag. It will not be used by this job, but one of the // workers will be assigned to it. It can get this job's tasks, though. - // Because the job is clusterless, it can be run by all. - cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) + // Because the job is tagless, it can be run by all. + tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) uuidMap := func(workers ...*Worker) map[string]bool { @@ -172,43 +172,43 @@ func TestWorkersLeftToRun(t *testing.T) { } } -func TestWorkersLeftToRunWithClusters(t *testing.T) { +func TestWorkersLeftToRunWithTags(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create clusters. - cluster1 := WorkerCluster{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - cluster2 := WorkerCluster{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Cluster 2"} - cluster3 := WorkerCluster{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Cluster 3"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster3)) + // Create tags. + tag1 := WorkerTag{UUID: "11157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + tag2 := WorkerTag{UUID: "22257623-4b14-4801-bee2-271dddab6309", Name: "Tag 2"} + tag3 := WorkerTag{UUID: "33357623-4b14-4801-bee2-271dddab6309", Name: "Tag 3"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag3)) - // Create a job in cluster1. + // Create a job in tag1. authoredJob := createTestAuthoredJobWithTasks() - authoredJob.WorkerClusterUUID = cluster1.UUID + authoredJob.WorkerTagUUID = tag1.UUID job := persistAuthoredJob(t, ctx, db, authoredJob) - // Clusters 1 + 3 + // Tags 1 + 3 workerC13 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c13c1313-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1, &cluster3} + w.Tags = []*WorkerTag{&tag1, &tag3} }) - // Cluster 1 + // Tag 1 workerC1 := createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c1c1c1c1-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) - // Cluster 2 worker, this one should never appear. + // Tag 2 worker, this one should never appear. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "c2c2c2c2-0000-1111-2222-333333333333" - w.Clusters = []*WorkerCluster{&cluster2} + w.Tags = []*WorkerTag{&tag2} }) - // No clusters, so should be able to run only clusterless jobs. Which is none + // No tags, so should be able to run only tagless jobs. Which is none // in this test. createWorker(ctx, t, db, func(w *Worker) { w.UUID = "00000000-0000-1111-2222-333333333333" - w.Clusters = nil + w.Tags = nil }) uuidMap := func(workers ...*Worker) map[string]bool { @@ -219,7 +219,7 @@ func TestWorkersLeftToRunWithClusters(t *testing.T) { return theMap } - // All Cluster 1 workers, no blocklist. + // All Tag 1 workers, no blocklist. left, err := db.WorkersLeftToRun(ctx, job, "blender") require.NoError(t, err) assert.Equal(t, uuidMap(workerC13, workerC1), left) @@ -230,7 +230,7 @@ func TestWorkersLeftToRunWithClusters(t *testing.T) { require.NoError(t, err) assert.Equal(t, uuidMap(workerC13), left) - // All clustered workers blocked. + // All taged workers blocked. _ = db.AddWorkerToJobBlocklist(ctx, job, workerC13, "blender") left, err = db.WorkersLeftToRun(ctx, job, "blender") assert.NoError(t, err) diff --git a/internal/manager/persistence/jobs_query.go b/internal/manager/persistence/jobs_query.go index 63773b4a..77f295b8 100644 --- a/internal/manager/persistence/jobs_query.go +++ b/internal/manager/persistence/jobs_query.go @@ -64,7 +64,7 @@ func (db *DB) QueryJobs(ctx context.Context, apiQ api.JobsQuery) ([]*Job, error) } } - q.Preload("Cluster") + q.Preload("Tag") result := []*Job{} tx := q.Scan(&result) diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index 6fd36f66..f9623a4c 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -757,7 +757,7 @@ func createWorker(ctx context.Context, t *testing.T, db *DB, updaters ...func(*W Software: "3.0", Status: api.WorkerStatusAwake, SupportedTaskTypes: "blender,ffmpeg,file-management", - Clusters: nil, + Tags: nil, } for _, updater := range updaters { diff --git a/internal/manager/persistence/task_scheduler.go b/internal/manager/persistence/task_scheduler.go index 08f135f9..47560e5d 100644 --- a/internal/manager/persistence/task_scheduler.go +++ b/internal/manager/persistence/task_scheduler.go @@ -26,7 +26,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { logger := log.With().Str("worker", w.UUID).Logger() logger.Trace().Msg("finding task for worker") - hasWorkerClusters, err := db.HasWorkerClusters(ctx) + hasWorkerTags, err := db.HasWorkerTags(ctx) if err != nil { return nil, err } @@ -37,7 +37,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { var task *Task txErr := db.gormDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var err error - task, err = findTaskForWorker(tx, w, hasWorkerClusters) + task, err = findTaskForWorker(tx, w, hasWorkerTags) if err != nil { if isDatabaseBusyError(err) { logger.Trace().Err(err).Msg("database busy while finding task for worker") @@ -84,7 +84,7 @@ func (db *DB) ScheduleTask(ctx context.Context, w *Worker) (*Task, error) { return task, nil } -func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, error) { +func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerTags bool) (*Task, error) { task := Task{} // If a task is alreay active & assigned to this worker, return just that. @@ -129,21 +129,21 @@ func findTaskForWorker(tx *gorm.DB, w *Worker, checkWorkerClusters bool) (*Task, Where("TF.worker_id is NULL"). // Not failed before Where("tasks.type not in (?)", blockedTaskTypesQuery) // Non-blocklisted - if checkWorkerClusters { - // The system has one or more clusters, so limit the available jobs to those - // that have no cluster, or overlap with the Worker's clusters. - if len(w.Clusters) == 0 { - // Clusterless workers only get clusterless jobs. + if checkWorkerTags { + // The system has one or more tags, so limit the available jobs to those + // that have no tag, or overlap with the Worker's tags. + if len(w.Tags) == 0 { + // Tagless workers only get tagless jobs. findTaskQuery = findTaskQuery. - Where("jobs.worker_cluster_id is NULL") + Where("jobs.worker_tag_id is NULL") } else { - // Clustered workers get clusterless jobs AND jobs of their own clusters. - clusterIDs := []uint{} - for _, cluster := range w.Clusters { - clusterIDs = append(clusterIDs, cluster.ID) + // Taged workers get tagless jobs AND jobs of their own tags. + tagIDs := []uint{} + for _, tag := range w.Tags { + tagIDs = append(tagIDs, tag.ID) } findTaskQuery = findTaskQuery. - Where("jobs.worker_cluster_id is NULL or worker_cluster_id in ?", clusterIDs) + Where("jobs.worker_tag_id is NULL or worker_tag_id in ?", tagIDs) } } diff --git a/internal/manager/persistence/task_scheduler_test.go b/internal/manager/persistence/task_scheduler_test.go index 90fe0b8f..0005bdfc 100644 --- a/internal/manager/persistence/task_scheduler_test.go +++ b/internal/manager/persistence/task_scheduler_test.go @@ -291,87 +291,87 @@ func TestPreviouslyFailed(t *testing.T) { assert.Equal(t, att2.Name, task.Name, "the second task should have been chosen") } -func TestWorkerClusterJobWithCluster(t *testing.T) { +func TestWorkerTagJobWithTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create worker clusters: - cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - cluster2 := WorkerCluster{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Cluster 2"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster2)) + // Create worker tags: + tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + tag2 := WorkerTag{UUID: "2f71dba1-cf92-4752-8386-f5926affabd5", Name: "Tag 2"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) + require.NoError(t, db.CreateWorkerTag(ctx, &tag2)) - // Create a worker in cluster1: + // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) - // Create a worker without cluster: + // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Clusters = nil + w.Tags = nil }) - { // Test job with different cluster: + { // Test job with different tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("499cf0f8-e83d-4cb1-837a-df94789d07db", "simple-blender-render", authTask) - job.WorkerClusterUUID = cluster2.UUID + job.WorkerTagUUID = tag2.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - assert.Nil(t, task, "job with different cluster should not be scheduled") + assert.Nil(t, task, "job with different tag should not be scheduled") } - { // Test job with matching cluster: + { // Test job with matching tag: authTask := authorTestTask("the task", "blender") job := authorTestJob("5d4c2321-0bb7-4c13-a9dd-32a2c0cd156e", "simple-blender-render", authTask) - job.WorkerClusterUUID = cluster1.UUID + job.WorkerTagUUID = tag1.UUID constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - require.NotNil(t, task, "job with matching cluster should be scheduled") + require.NotNil(t, task, "job with matching tag should be scheduled") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) - assert.Nil(t, task, "job with cluster should not be scheduled for worker without cluster") + assert.Nil(t, task, "job with tag should not be scheduled for worker without tag") } } -func TestWorkerClusterJobWithoutCluster(t *testing.T) { +func TestWorkerTagJobWithoutTag(t *testing.T) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) defer cancel() - // Create worker cluster: - cluster1 := WorkerCluster{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Cluster 1"} - require.NoError(t, db.CreateWorkerCluster(ctx, &cluster1)) + // Create worker tag: + tag1 := WorkerTag{UUID: "f0157623-4b14-4801-bee2-271dddab6309", Name: "Tag 1"} + require.NoError(t, db.CreateWorkerTag(ctx, &tag1)) - // Create a worker in cluster1: + // Create a worker in tag1: workerC := linuxWorker(t, db, func(w *Worker) { - w.Clusters = []*WorkerCluster{&cluster1} + w.Tags = []*WorkerTag{&tag1} }) - // Create a worker without cluster: + // Create a worker without tag: workerNC := linuxWorker(t, db, func(w *Worker) { w.UUID = "c53f8f68-4149-4790-991c-ba73a326551e" - w.Clusters = nil + w.Tags = nil }) - // Test cluster-less job: + // Test tag-less job: authTask := authorTestTask("the task", "blender") job := authorTestJob("b6a1d859-122f-4791-8b78-b943329a9989", "simple-blender-render", authTask) constructTestJob(ctx, t, db, job) task, err := db.ScheduleTask(ctx, &workerC) require.NoError(t, err) - require.NotNil(t, task, "job without cluster should always be scheduled to worker in some cluster") + require.NotNil(t, task, "job without tag should always be scheduled to worker in some tag") assert.Equal(t, authTask.UUID, task.UUID) task, err = db.ScheduleTask(ctx, &workerNC) require.NoError(t, err) - require.NotNil(t, task, "job without cluster should always be scheduled to worker without cluster") + require.NotNil(t, task, "job without tag should always be scheduled to worker without tag") assert.Equal(t, authTask.UUID, task.UUID) } diff --git a/internal/manager/persistence/test_support.go b/internal/manager/persistence/test_support.go index 80832819..2b1351c4 100644 --- a/internal/manager/persistence/test_support.go +++ b/internal/manager/persistence/test_support.go @@ -96,8 +96,8 @@ type WorkerTestFixture struct { ctx context.Context done func() - worker *Worker - cluster *WorkerCluster + worker *Worker + tag *WorkerTag } func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTestFixture { @@ -113,21 +113,21 @@ func workerTestFixtures(t *testing.T, testContextTimeout time.Duration) WorkerTe SupportedTaskTypes: "blender,ffmpeg,file-management", } - wc := WorkerCluster{ + wc := WorkerTag{ UUID: uuid.New(), Name: "arbejdsklynge", - Description: "Worker cluster in Danish", + Description: "Worker tag in Danish", } require.NoError(t, db.CreateWorker(ctx, &w)) - require.NoError(t, db.CreateWorkerCluster(ctx, &wc)) + require.NoError(t, db.CreateWorkerTag(ctx, &wc)) return WorkerTestFixture{ db: db, ctx: ctx, done: cancel, - worker: &w, - cluster: &wc, + worker: &w, + tag: &wc, } } diff --git a/internal/manager/persistence/worker_cluster.go b/internal/manager/persistence/worker_cluster.go deleted file mode 100644 index 60974e6d..00000000 --- a/internal/manager/persistence/worker_cluster.go +++ /dev/null @@ -1,112 +0,0 @@ -package persistence - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "context" - "fmt" - - "gorm.io/gorm" -) - -type WorkerCluster struct { - Model - - UUID string `gorm:"type:char(36);default:'';unique;index"` - Name string `gorm:"type:varchar(64);default:'';unique"` - Description string `gorm:"type:varchar(255);default:''"` - - Workers []*Worker `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"` -} - -func (db *DB) CreateWorkerCluster(ctx context.Context, wc *WorkerCluster) error { - if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil { - return fmt.Errorf("creating new worker cluster: %w", err) - } - return nil -} - -// HasWorkerClusters returns whether there are any clusters defined at all. -func (db *DB) HasWorkerClusters(ctx context.Context) (bool, error) { - var count int64 - tx := db.gormDB.WithContext(ctx). - Model(&WorkerCluster{}). - Count(&count) - if err := tx.Error; err != nil { - return false, workerClusterError(err, "counting worker clusters") - } - return count > 0, nil -} - -func (db *DB) FetchWorkerCluster(ctx context.Context, uuid string) (*WorkerCluster, error) { - tx := db.gormDB.WithContext(ctx) - return fetchWorkerCluster(tx, uuid) -} - -// fetchWorkerCluster fetches the worker cluster using the given database instance. -func fetchWorkerCluster(gormDB *gorm.DB, uuid string) (*WorkerCluster, error) { - w := WorkerCluster{} - tx := gormDB.First(&w, "uuid = ?", uuid) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching worker cluster") - } - return &w, nil -} - -func (db *DB) SaveWorkerCluster(ctx context.Context, cluster *WorkerCluster) error { - if err := db.gormDB.WithContext(ctx).Save(cluster).Error; err != nil { - return workerClusterError(err, "saving worker cluster") - } - return nil -} - -// DeleteWorkerCluster deletes the given cluster, after unassigning all workers from it. -func (db *DB) DeleteWorkerCluster(ctx context.Context, uuid string) error { - tx := db.gormDB.WithContext(ctx). - Where("uuid = ?", uuid). - Delete(&WorkerCluster{}) - if tx.Error != nil { - return workerClusterError(tx.Error, "deleting worker cluster") - } - if tx.RowsAffected == 0 { - return ErrWorkerClusterNotFound - } - return nil -} - -func (db *DB) FetchWorkerClusters(ctx context.Context) ([]*WorkerCluster, error) { - clusters := make([]*WorkerCluster, 0) - tx := db.gormDB.WithContext(ctx).Model(&WorkerCluster{}).Scan(&clusters) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching all worker clusters") - } - return clusters, nil -} - -func (db *DB) fetchWorkerClustersWithUUID(ctx context.Context, clusterUUIDs []string) ([]*WorkerCluster, error) { - clusters := make([]*WorkerCluster, 0) - tx := db.gormDB.WithContext(ctx). - Model(&WorkerCluster{}). - Where("uuid in ?", clusterUUIDs). - Scan(&clusters) - if tx.Error != nil { - return nil, workerClusterError(tx.Error, "fetching all worker clusters") - } - return clusters, nil -} - -func (db *DB) WorkerSetClusters(ctx context.Context, worker *Worker, clusterUUIDs []string) error { - clusters, err := db.fetchWorkerClustersWithUUID(ctx, clusterUUIDs) - if err != nil { - return workerClusterError(err, "fetching worker clusters") - } - - err = db.gormDB.WithContext(ctx). - Model(worker). - Association("Clusters"). - Replace(clusters) - if err != nil { - return workerClusterError(err, "updating worker clusters") - } - return nil -} diff --git a/internal/manager/persistence/worker_cluster_test.go b/internal/manager/persistence/worker_cluster_test.go deleted file mode 100644 index ed054c6b..00000000 --- a/internal/manager/persistence/worker_cluster_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package persistence - -// SPDX-License-Identifier: GPL-3.0-or-later - -import ( - "testing" - "time" - - "git.blender.org/flamenco/internal/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestCreateFetchCluster(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Test fetching non-existent cluster - fetchedCluster, err := f.db.FetchWorkerCluster(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f") - assert.ErrorIs(t, err, ErrWorkerClusterNotFound) - assert.Nil(t, fetchedCluster) - - // New cluster creation is already done in the workerTestFixtures() call. - assert.NotNil(t, f.cluster) - - fetchedCluster, err = f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) - require.NoError(t, err) - assert.NotNil(t, fetchedCluster) - - // Test contents of fetched cluster. - assert.Equal(t, f.cluster.UUID, fetchedCluster.UUID) - assert.Equal(t, f.cluster.Name, fetchedCluster.Name) - assert.Equal(t, f.cluster.Description, fetchedCluster.Description) - assert.Zero(t, fetchedCluster.Workers) -} - -func TestFetchDeleteClusters(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Single cluster was created by fixture. - has, err := f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.True(t, has, "expecting HasWorkerClusters to return true") - - secondCluster := WorkerCluster{ - UUID: uuid.New(), - Name: "arbeiderscluster", - Description: "Worker cluster in Dutch", - } - - require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster)) - - allClusters, err := f.db.FetchWorkerClusters(f.ctx) - require.NoError(t, err) - - require.Len(t, allClusters, 2) - var allClusterIDs [2]string - for idx := range allClusters { - allClusterIDs[idx] = allClusters[idx].UUID - } - assert.Contains(t, allClusterIDs, f.cluster.UUID) - assert.Contains(t, allClusterIDs, secondCluster.UUID) - - has, err = f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.True(t, has, "expecting HasWorkerClusters to return true") - - // Test deleting the 2nd cluster. - require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, secondCluster.UUID)) - - allClusters, err = f.db.FetchWorkerClusters(f.ctx) - require.NoError(t, err) - require.Len(t, allClusters, 1) - assert.Equal(t, f.cluster.UUID, allClusters[0].UUID) - - // Test deleting the 1st cluster. - require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID)) - has, err = f.db.HasWorkerClusters(f.ctx) - require.NoError(t, err) - assert.False(t, has, "expecting HasWorkerClusters to return false") -} - -func TestAssignUnassignWorkerClusters(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - assertClusters := func(msgLabel string, clusterUUIDs ...string) { - w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) - require.NoError(t, err) - - // Catch doubly-reported clusters, as the maps below would hide those cases. - assert.Len(t, w.Clusters, len(clusterUUIDs), msgLabel) - - expectClusters := make(map[string]bool) - for _, cid := range clusterUUIDs { - expectClusters[cid] = true - } - - actualClusters := make(map[string]bool) - for _, c := range w.Clusters { - actualClusters[c.UUID] = true - } - - assert.Equal(t, expectClusters, actualClusters, msgLabel) - } - - secondCluster := WorkerCluster{ - UUID: uuid.New(), - Name: "arbeiderscluster", - Description: "Worker cluster in Dutch", - } - - require.NoError(t, f.db.CreateWorkerCluster(f.ctx, &secondCluster)) - - // By default the Worker should not be part of a cluster. - assertClusters("default cluster assignment") - - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) - assertClusters("setting one cluster", f.cluster.UUID) - - // Double assignments should also just work. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, f.cluster.UUID})) - assertClusters("setting twice the same cluster", f.cluster.UUID) - - // Multiple cluster memberships. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID, secondCluster.UUID})) - assertClusters("setting two different clusters", f.cluster.UUID, secondCluster.UUID) - - // Remove memberships. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{secondCluster.UUID})) - assertClusters("unassigning from first cluster", secondCluster.UUID) - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{})) - assertClusters("unassigning from second cluster") -} - -func TestSaveWorkerCluster(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - f.cluster.Name = "übercluster" - f.cluster.Description = "ʻO kēlā hui ma laila" - require.NoError(t, f.db.SaveWorkerCluster(f.ctx, f.cluster)) - - fetched, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) - require.NoError(t, err) - assert.Equal(t, f.cluster.Name, fetched.Name) - assert.Equal(t, f.cluster.Description, fetched.Description) -} - -func TestDeleteWorkerClusterWithWorkersAssigned(t *testing.T) { - f := workerTestFixtures(t, 1*time.Second) - defer f.done() - - // Assign the worker. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) - - // Delete the cluster. - require.NoError(t, f.db.DeleteWorkerCluster(f.ctx, f.cluster.UUID)) - - // Check the Worker has been unassigned from the cluster. - w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) - require.NoError(t, err) - assert.Empty(t, w.Clusters) -} diff --git a/internal/manager/persistence/worker_tag.go b/internal/manager/persistence/worker_tag.go new file mode 100644 index 00000000..6e0c1506 --- /dev/null +++ b/internal/manager/persistence/worker_tag.go @@ -0,0 +1,112 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + + "gorm.io/gorm" +) + +type WorkerTag struct { + Model + + UUID string `gorm:"type:char(36);default:'';unique;index"` + Name string `gorm:"type:varchar(64);default:'';unique"` + Description string `gorm:"type:varchar(255);default:''"` + + Workers []*Worker `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"` +} + +func (db *DB) CreateWorkerTag(ctx context.Context, wc *WorkerTag) error { + if err := db.gormDB.WithContext(ctx).Create(wc).Error; err != nil { + return fmt.Errorf("creating new worker tag: %w", err) + } + return nil +} + +// HasWorkerTags returns whether there are any tags defined at all. +func (db *DB) HasWorkerTags(ctx context.Context) (bool, error) { + var count int64 + tx := db.gormDB.WithContext(ctx). + Model(&WorkerTag{}). + Count(&count) + if err := tx.Error; err != nil { + return false, workerTagError(err, "counting worker tags") + } + return count > 0, nil +} + +func (db *DB) FetchWorkerTag(ctx context.Context, uuid string) (*WorkerTag, error) { + tx := db.gormDB.WithContext(ctx) + return fetchWorkerTag(tx, uuid) +} + +// fetchWorkerTag fetches the worker tag using the given database instance. +func fetchWorkerTag(gormDB *gorm.DB, uuid string) (*WorkerTag, error) { + w := WorkerTag{} + tx := gormDB.First(&w, "uuid = ?", uuid) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching worker tag") + } + return &w, nil +} + +func (db *DB) SaveWorkerTag(ctx context.Context, tag *WorkerTag) error { + if err := db.gormDB.WithContext(ctx).Save(tag).Error; err != nil { + return workerTagError(err, "saving worker tag") + } + return nil +} + +// DeleteWorkerTag deletes the given tag, after unassigning all workers from it. +func (db *DB) DeleteWorkerTag(ctx context.Context, uuid string) error { + tx := db.gormDB.WithContext(ctx). + Where("uuid = ?", uuid). + Delete(&WorkerTag{}) + if tx.Error != nil { + return workerTagError(tx.Error, "deleting worker tag") + } + if tx.RowsAffected == 0 { + return ErrWorkerTagNotFound + } + return nil +} + +func (db *DB) FetchWorkerTags(ctx context.Context) ([]*WorkerTag, error) { + tags := make([]*WorkerTag, 0) + tx := db.gormDB.WithContext(ctx).Model(&WorkerTag{}).Scan(&tags) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching all worker tags") + } + return tags, nil +} + +func (db *DB) fetchWorkerTagsWithUUID(ctx context.Context, tagUUIDs []string) ([]*WorkerTag, error) { + tags := make([]*WorkerTag, 0) + tx := db.gormDB.WithContext(ctx). + Model(&WorkerTag{}). + Where("uuid in ?", tagUUIDs). + Scan(&tags) + if tx.Error != nil { + return nil, workerTagError(tx.Error, "fetching all worker tags") + } + return tags, nil +} + +func (db *DB) WorkerSetTags(ctx context.Context, worker *Worker, tagUUIDs []string) error { + tags, err := db.fetchWorkerTagsWithUUID(ctx, tagUUIDs) + if err != nil { + return workerTagError(err, "fetching worker tags") + } + + err = db.gormDB.WithContext(ctx). + Model(worker). + Association("Tags"). + Replace(tags) + if err != nil { + return workerTagError(err, "updating worker tags") + } + return nil +} diff --git a/internal/manager/persistence/worker_tag_test.go b/internal/manager/persistence/worker_tag_test.go new file mode 100644 index 00000000..372332fd --- /dev/null +++ b/internal/manager/persistence/worker_tag_test.go @@ -0,0 +1,165 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "testing" + "time" + + "git.blender.org/flamenco/internal/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateFetchTag(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Test fetching non-existent tag + fetchedTag, err := f.db.FetchWorkerTag(f.ctx, "7ee21bc8-ff1a-42d2-a6b6-cc4b529b189f") + assert.ErrorIs(t, err, ErrWorkerTagNotFound) + assert.Nil(t, fetchedTag) + + // New tag creation is already done in the workerTestFixtures() call. + assert.NotNil(t, f.tag) + + fetchedTag, err = f.db.FetchWorkerTag(f.ctx, f.tag.UUID) + require.NoError(t, err) + assert.NotNil(t, fetchedTag) + + // Test contents of fetched tag. + assert.Equal(t, f.tag.UUID, fetchedTag.UUID) + assert.Equal(t, f.tag.Name, fetchedTag.Name) + assert.Equal(t, f.tag.Description, fetchedTag.Description) + assert.Zero(t, fetchedTag.Workers) +} + +func TestFetchDeleteTags(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Single tag was created by fixture. + has, err := f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerTags to return true") + + secondTag := WorkerTag{ + UUID: uuid.New(), + Name: "arbeiderstag", + Description: "Worker tag in Dutch", + } + + require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag)) + + allTags, err := f.db.FetchWorkerTags(f.ctx) + require.NoError(t, err) + + require.Len(t, allTags, 2) + var allTagIDs [2]string + for idx := range allTags { + allTagIDs[idx] = allTags[idx].UUID + } + assert.Contains(t, allTagIDs, f.tag.UUID) + assert.Contains(t, allTagIDs, secondTag.UUID) + + has, err = f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.True(t, has, "expecting HasWorkerTags to return true") + + // Test deleting the 2nd tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, secondTag.UUID)) + + allTags, err = f.db.FetchWorkerTags(f.ctx) + require.NoError(t, err) + require.Len(t, allTags, 1) + assert.Equal(t, f.tag.UUID, allTags[0].UUID) + + // Test deleting the 1st tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) + has, err = f.db.HasWorkerTags(f.ctx) + require.NoError(t, err) + assert.False(t, has, "expecting HasWorkerTags to return false") +} + +func TestAssignUnassignWorkerTags(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + assertTags := func(msgLabel string, tagUUIDs ...string) { + w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + require.NoError(t, err) + + // Catch doubly-reported tags, as the maps below would hide those cases. + assert.Len(t, w.Tags, len(tagUUIDs), msgLabel) + + expectTags := make(map[string]bool) + for _, cid := range tagUUIDs { + expectTags[cid] = true + } + + actualTags := make(map[string]bool) + for _, c := range w.Tags { + actualTags[c.UUID] = true + } + + assert.Equal(t, expectTags, actualTags, msgLabel) + } + + secondTag := WorkerTag{ + UUID: uuid.New(), + Name: "arbeiderstag", + Description: "Worker tag in Dutch", + } + + require.NoError(t, f.db.CreateWorkerTag(f.ctx, &secondTag)) + + // By default the Worker should not be part of a tag. + assertTags("default tag assignment") + + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) + assertTags("setting one tag", f.tag.UUID) + + // Double assignments should also just work. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, f.tag.UUID})) + assertTags("setting twice the same tag", f.tag.UUID) + + // Multiple tag memberships. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID, secondTag.UUID})) + assertTags("setting two different tags", f.tag.UUID, secondTag.UUID) + + // Remove memberships. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{secondTag.UUID})) + assertTags("unassigning from first tag", secondTag.UUID) + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{})) + assertTags("unassigning from second tag") +} + +func TestSaveWorkerTag(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + f.tag.Name = "übertag" + f.tag.Description = "ʻO kēlā hui ma laila" + require.NoError(t, f.db.SaveWorkerTag(f.ctx, f.tag)) + + fetched, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID) + require.NoError(t, err) + assert.Equal(t, f.tag.Name, fetched.Name) + assert.Equal(t, f.tag.Description, fetched.Description) +} + +func TestDeleteWorkerTagWithWorkersAssigned(t *testing.T) { + f := workerTestFixtures(t, 1*time.Second) + defer f.done() + + // Assign the worker. + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) + + // Delete the tag. + require.NoError(t, f.db.DeleteWorkerTag(f.ctx, f.tag.UUID)) + + // Check the Worker has been unassigned from the tag. + w, err := f.db.FetchWorker(f.ctx, f.worker.UUID) + require.NoError(t, err) + assert.Empty(t, w.Tags) +} diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index b71996ab..4fe25431 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -31,7 +31,7 @@ type Worker struct { SupportedTaskTypes string `gorm:"type:varchar(255);default:''"` // comma-separated list of task types. - Clusters []*WorkerCluster `gorm:"many2many:worker_cluster_membership;constraint:OnDelete:CASCADE"` + Tags []*WorkerTag `gorm:"many2many:worker_tag_membership;constraint:OnDelete:CASCADE"` } func (w *Worker) Identifier() string { @@ -73,7 +73,7 @@ func (db *DB) CreateWorker(ctx context.Context, w *Worker) error { func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { w := Worker{} tx := db.gormDB.WithContext(ctx). - Preload("Clusters"). + Preload("Tags"). First(&w, "uuid = ?", uuid) if tx.Error != nil { return nil, workerError(tx.Error, "fetching worker") diff --git a/internal/manager/persistence/workers_test.go b/internal/manager/persistence/workers_test.go index 38417f01..a92bf705 100644 --- a/internal/manager/persistence/workers_test.go +++ b/internal/manager/persistence/workers_test.go @@ -319,18 +319,18 @@ func TestDeleteWorker(t *testing.T) { } } -func TestDeleteWorkerWithClusterAssigned(t *testing.T) { +func TestDeleteWorkerWithTagAssigned(t *testing.T) { f := workerTestFixtures(t, 1*time.Second) defer f.done() // Assign the worker. - require.NoError(t, f.db.WorkerSetClusters(f.ctx, f.worker, []string{f.cluster.UUID})) + require.NoError(t, f.db.WorkerSetTags(f.ctx, f.worker, []string{f.tag.UUID})) // Delete the Worker. require.NoError(t, f.db.DeleteWorker(f.ctx, f.worker.UUID)) - // Check the Worker has been unassigned from the cluster. - cluster, err := f.db.FetchWorkerCluster(f.ctx, f.cluster.UUID) + // Check the Worker has been unassigned from the tag. + tag, err := f.db.FetchWorkerTag(f.ctx, f.tag.UUID) require.NoError(t, err) - assert.Empty(t, cluster.Workers) + assert.Empty(t, tag.Workers) } diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/webupdates/worker_updates.go index a088cdb3..a60f40d8 100644 --- a/internal/manager/webupdates/worker_updates.go +++ b/internal/manager/webupdates/worker_updates.go @@ -32,7 +32,7 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { workerUpdate.LastSeen = &worker.LastSeenAt } - // TODO: add cluster IDs. + // TODO: add tag IDs. return workerUpdate } diff --git a/web/app/src/components/jobs/JobDetails.vue b/web/app/src/components/jobs/JobDetails.vue index 6198b7ee..55ca5d08 100644 --- a/web/app/src/components/jobs/JobDetails.vue +++ b/web/app/src/components/jobs/JobDetails.vue @@ -32,12 +32,17 @@
ID
{{ jobData.id }}
-