diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index c33fd76e..0b1f9e91 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -120,6 +120,9 @@ type ChangeBroadcaster interface { BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) + + BroadcastWorkerTagUpdate(workerTagUpdate api.SocketIOWorkerTagUpdate) + BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) } // ChangeBroadcaster should be a subset of webupdates.BiDirComms. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index cd5726d8..92c58264 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -632,6 +632,30 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0) } +// BroadcastNewWorkerTag mocks base method. +func (m *MockChangeBroadcaster) BroadcastNewWorkerTag(arg0 api.SocketIOWorkerTagUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastNewWorkerTag", arg0) +} + +// BroadcastNewWorkerTag indicates an expected call of BroadcastNewWorkerTag. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorkerTag(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorkerTag", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorkerTag), arg0) +} + +// BroadcastWorkerTagUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastWorkerTagUpdate(arg0 api.SocketIOWorkerTagUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastWorkerTagUpdate", arg0) +} + +// BroadcastWorkerTagUpdate indicates an expected call of BroadcastWorkerTagUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerTagUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerTagUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerTagUpdate), arg0) +} + // BroadcastWorkerUpdate mocks base method. func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) { m.ctrl.T.Helper() diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 4c013169..590a481c 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -266,7 +266,9 @@ func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error deleting worker tag: %v", err) } - // TODO: SocketIO broadcast of tag deletion. + // SocketIO broadcast of tag deletion. + update := webupdates.NewWorkerTagDeletedUpdate(tagUUID) + f.broadcaster.BroadcastWorkerTagUpdate(update) logger.Info().Msg("worker tag deleted") return e.NoContent(http.StatusNoContent) @@ -344,7 +346,10 @@ func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error { return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag") } - // TODO: SocketIO broadcast of tag update. + // SocketIO broadcast of tag update. + sioUpdate := webupdates.NewWorkerTagUpdate(dbTag) + f.broadcaster.BroadcastWorkerTagUpdate(sioUpdate) + logger.Info().Msg("worker tag updated") return e.NoContent(http.StatusNoContent) } @@ -412,7 +417,10 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error { } logger.Info().Msg("created new worker tag") - // TODO: SocketIO broadcast of tag creation. + + // SocketIO broadcast of tag creation. + sioUpdate := webupdates.NewWorkerTagUpdate(&dbTag) + f.broadcaster.BroadcastNewWorkerTag(sioUpdate) return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag)) } diff --git a/internal/manager/api_impl/worker_mgt_test.go b/internal/manager/api_impl/worker_mgt_test.go index 3ddda15a..2e4493d1 100644 --- a/internal/manager/api_impl/worker_mgt_test.go +++ b/internal/manager/api_impl/worker_mgt_test.go @@ -281,7 +281,9 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) { Description: *apiTag.Description, } mf.persistence.EXPECT().CreateWorkerTag(gomock.Any(), &expectDBTag) - // TODO: expect SocketIO broadcast of the tag creation. + mf.broadcaster.EXPECT().BroadcastNewWorkerTag(api.SocketIOWorkerTagUpdate{ + Tag: apiTag, + }) echo := mf.prepareMockedJSONRequest(apiTag) require.NoError(t, mf.flamenco.CreateWorkerTag(echo)) assertResponseJSON(t, echo, http.StatusOK, &apiTag) @@ -303,7 +305,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) { Name: newAPITag.Name, Description: *apiTag.Description, // Not mentioning new description should keep old one. } - // TODO: expect SocketIO broadcast of the tag update. + mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{ + Id: &UUID, + Name: newAPITag.Name, + Description: apiTag.Description, + }, + }) mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag) echo = mf.prepareMockedJSONRequest(newAPITag) @@ -320,7 +328,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) { Name: newAPITag.Name, Description: "", } - // TODO: expect SocketIO broadcast of the tag update. + mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{ + Id: &UUID, + Name: newAPITag.Name, + Description: newAPITag.Description, + }, + }) mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag) echo = mf.prepareMockedJSONRequest(newAPITag) @@ -337,7 +351,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) { Name: newAPITag.Name, Description: *newAPITag.Description, } - // TODO: expect SocketIO broadcast of the tag update. + mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{ + Id: &UUID, + Name: newAPITag.Name, + Description: newAPITag.Description, + }, + }) mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag) echo = mf.prepareMockedJSONRequest(newAPITag) @@ -347,7 +367,10 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) { // Delete. mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil) mf.persistence.EXPECT().DeleteWorkerTag(gomock.Any(), UUID) - // TODO: expect SocketIO broadcast of the tag deletion. + mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{Id: &UUID}, + WasDeleted: ptr(true), + }) echo = mf.prepareMockedJSONRequest(newAPITag) require.NoError(t, mf.flamenco.DeleteWorkerTag(echo, UUID)) assertResponseNoContent(t, echo) diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 54749bc5..2ddba497 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -21,9 +21,10 @@ const ( // Predefined SocketIO rooms. There will be others, but those will have a // dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be // listed here as constants. See `roomXXX()` functions for those. - SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. - SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. - SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. + SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. + SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. + SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. + SocketIORoomWorkerTags SocketIORoomName = "WorkerTags" // For worker tag updates. // For updates about ALL last-rendered images. Normally these are sent to a // room specific to a particular job, but for the global "last rendered image" @@ -40,6 +41,7 @@ const ( SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate + SIOEventWorkerTagUpdate SocketIOEventType = "/workertags" // sends api.SocketIOWorkerTagUpdate SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) @@ -74,6 +76,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock sioRoom = SocketIORoomWorkers case api.SocketIOSubscriptionTypeAllLastRendered: sioRoom = SocketIORoomLastRendered + case api.SocketIOSubscriptionTypeAllWorkerTags: + sioRoom = SocketIORoomWorkerTags case api.SocketIOSubscriptionTypeJob: if subs.Uuid == nil { logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") diff --git a/internal/manager/webupdates/workertag_updates.go b/internal/manager/webupdates/workertag_updates.go new file mode 100644 index 00000000..95d0ec71 --- /dev/null +++ b/internal/manager/webupdates/workertag_updates.go @@ -0,0 +1,48 @@ +package webupdates + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/persistence" + "projects.blender.org/studio/flamenco/pkg/api" +) + +// NewWorkerTagUpdate returns a partial SocketIOWorkerTagUpdate struct for the +// given worker tag. It only fills in the fields that represent the current +// state of the tag. +func NewWorkerTagUpdate(tag *persistence.WorkerTag) api.SocketIOWorkerTagUpdate { + tagUpdate := api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{ + Id: &tag.UUID, + Name: tag.Name, + Description: &tag.Description, + }, + } + return tagUpdate +} + +// NewWorkerTagDeletedUpdate returns a SocketIOWorkerTagUpdate struct that indicates +// the worker tag has been deleted. +func NewWorkerTagDeletedUpdate(tagUUID string) api.SocketIOWorkerTagUpdate { + wasDeleted := true + tagUpdate := api.SocketIOWorkerTagUpdate{ + Tag: api.WorkerTag{ + Id: &tagUUID, + }, + WasDeleted: &wasDeleted, + } + return tagUpdate +} + +// BroadcastWorkerTagUpdate sends the worker tag update to clients. +func (b *BiDirComms) BroadcastWorkerTagUpdate(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting worker tag update") + b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +} + +// BroadcastNewWorkerTag sends a "new worker tag" notification to clients. +func (b *BiDirComms) BroadcastNewWorkerTag(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting new worker tag") + b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +} diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index e5df5719..945d38af 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -13,12 +13,14 @@ const websocketURL = ws(); export default { emits: [ // Data from Flamenco Manager: - "jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate", "lastRenderedUpdate", + "jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate", + "lastRenderedUpdate", "workerTagUpdate", // SocketIO events: "sioReconnected", "sioDisconnected" ], props: [ "mainSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`. + "extraSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`. "subscribedJobID", "subscribedTaskID", ], @@ -66,6 +68,14 @@ export default { this._updateMainSubscription("subscribe", newType); } }, + extraSubscription(newType, oldType) { + if (oldType) { + this._updateMainSubscription("unsubscribe", oldType); + } + if (newType) { + this._updateMainSubscription("subscribe", newType); + } + }, }, methods: { connectToWebsocket() { @@ -160,6 +170,13 @@ export default { this.$emit("workerUpdate", apiWorkerUpdate); }); + this.socket.on("/workertags", (workerTagUpdate) => { + // Convert to API object, in order to have the same parsing of data as + // when we'd do an API call. + const apiWorkerTagUpdate = API.SocketIOWorkerTagUpdate.constructFromObject(workerTagUpdate) + this.$emit("workerTagUpdate", apiWorkerTagUpdate); + }); + // Chat system, useful for debugging. this.socket.on("/message", (message) => { this.$emit("message", message); @@ -219,6 +236,7 @@ export default { if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID); if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID); if (this.mainSubscription) this._updateMainSubscription("subscribe", this.mainSubscription); + if (this.extraSubscription) this._updateMainSubscription("subscribe", this.extraSubscription); }, }, }; diff --git a/web/app/src/manager-api/index.js b/web/app/src/manager-api/index.js index 3d6e4fa9..e1ff873e 100644 --- a/web/app/src/manager-api/index.js +++ b/web/app/src/manager-api/index.js @@ -63,6 +63,7 @@ import SocketIOSubscriptionOperation from './model/SocketIOSubscriptionOperation import SocketIOSubscriptionType from './model/SocketIOSubscriptionType'; import SocketIOTaskLogUpdate from './model/SocketIOTaskLogUpdate'; import SocketIOTaskUpdate from './model/SocketIOTaskUpdate'; +import SocketIOWorkerTagUpdate from './model/SocketIOWorkerTagUpdate'; import SocketIOWorkerUpdate from './model/SocketIOWorkerUpdate'; import SubmittedJob from './model/SubmittedJob'; import Task from './model/Task'; @@ -433,6 +434,12 @@ export { */ SocketIOTaskUpdate, + /** + * The SocketIOWorkerTagUpdate model constructor. + * @property {module:model/SocketIOWorkerTagUpdate} + */ + SocketIOWorkerTagUpdate, + /** * The SocketIOWorkerUpdate model constructor. * @property {module:model/SocketIOWorkerUpdate} diff --git a/web/app/src/views/WorkersView.vue b/web/app/src/views/WorkersView.vue index 711a8aeb..def831ca 100644 --- a/web/app/src/views/WorkersView.vue +++ b/web/app/src/views/WorkersView.vue @@ -7,7 +7,9 @@ @@ -85,6 +87,10 @@ export default { this._fetchWorker(this.workerID); }, + onSIOWorkerTagsUpdate(workerTagsUpdate) { + this.workers.refreshTags() + .then(() => this._fetchWorker(this.workerID)); + }, onTableWorkerClicked(rowData) { if (rowData.id == this.workerID) return;