SocketIO broadcasting for worker tags CUD operations

Broadcast create/update/delete operations on worker tags via SocketIO.

Ref: #104204
This commit is contained in:
Sybren A. Stüvel 2023-08-23 15:09:02 +02:00
parent 621f050a68
commit ef726da17b
9 changed files with 154 additions and 13 deletions

View File

@ -120,6 +120,9 @@ type ChangeBroadcaster interface {
BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate)
BroadcastWorkerTagUpdate(workerTagUpdate api.SocketIOWorkerTagUpdate)
BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.

View File

@ -632,6 +632,30 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0)
}
// BroadcastNewWorkerTag mocks base method.
func (m *MockChangeBroadcaster) BroadcastNewWorkerTag(arg0 api.SocketIOWorkerTagUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastNewWorkerTag", arg0)
}
// BroadcastNewWorkerTag indicates an expected call of BroadcastNewWorkerTag.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorkerTag(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorkerTag", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorkerTag), arg0)
}
// BroadcastWorkerTagUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerTagUpdate(arg0 api.SocketIOWorkerTagUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastWorkerTagUpdate", arg0)
}
// BroadcastWorkerTagUpdate indicates an expected call of BroadcastWorkerTagUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerTagUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerTagUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerTagUpdate), arg0)
}
// BroadcastWorkerUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
m.ctrl.T.Helper()

View File

@ -266,7 +266,9 @@ func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error {
return sendAPIError(e, http.StatusInternalServerError, "error deleting worker tag: %v", err)
}
// TODO: SocketIO broadcast of tag deletion.
// SocketIO broadcast of tag deletion.
update := webupdates.NewWorkerTagDeletedUpdate(tagUUID)
f.broadcaster.BroadcastWorkerTagUpdate(update)
logger.Info().Msg("worker tag deleted")
return e.NoContent(http.StatusNoContent)
@ -344,7 +346,10 @@ func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error {
return sendAPIError(e, http.StatusInternalServerError, "error saving worker tag")
}
// TODO: SocketIO broadcast of tag update.
// SocketIO broadcast of tag update.
sioUpdate := webupdates.NewWorkerTagUpdate(dbTag)
f.broadcaster.BroadcastWorkerTagUpdate(sioUpdate)
logger.Info().Msg("worker tag updated")
return e.NoContent(http.StatusNoContent)
}
@ -412,7 +417,10 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error {
}
logger.Info().Msg("created new worker tag")
// TODO: SocketIO broadcast of tag creation.
// SocketIO broadcast of tag creation.
sioUpdate := webupdates.NewWorkerTagUpdate(&dbTag)
f.broadcaster.BroadcastNewWorkerTag(sioUpdate)
return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag))
}

View File

@ -281,7 +281,9 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) {
Description: *apiTag.Description,
}
mf.persistence.EXPECT().CreateWorkerTag(gomock.Any(), &expectDBTag)
// TODO: expect SocketIO broadcast of the tag creation.
mf.broadcaster.EXPECT().BroadcastNewWorkerTag(api.SocketIOWorkerTagUpdate{
Tag: apiTag,
})
echo := mf.prepareMockedJSONRequest(apiTag)
require.NoError(t, mf.flamenco.CreateWorkerTag(echo))
assertResponseJSON(t, echo, http.StatusOK, &apiTag)
@ -303,7 +305,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) {
Name: newAPITag.Name,
Description: *apiTag.Description, // Not mentioning new description should keep old one.
}
// TODO: expect SocketIO broadcast of the tag update.
mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{
Id: &UUID,
Name: newAPITag.Name,
Description: apiTag.Description,
},
})
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag)
echo = mf.prepareMockedJSONRequest(newAPITag)
@ -320,7 +328,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) {
Name: newAPITag.Name,
Description: "",
}
// TODO: expect SocketIO broadcast of the tag update.
mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{
Id: &UUID,
Name: newAPITag.Name,
Description: newAPITag.Description,
},
})
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag)
echo = mf.prepareMockedJSONRequest(newAPITag)
@ -337,7 +351,13 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) {
Name: newAPITag.Name,
Description: *newAPITag.Description,
}
// TODO: expect SocketIO broadcast of the tag update.
mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{
Id: &UUID,
Name: newAPITag.Name,
Description: newAPITag.Description,
},
})
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
mf.persistence.EXPECT().SaveWorkerTag(gomock.Any(), &expectNewDBTag)
echo = mf.prepareMockedJSONRequest(newAPITag)
@ -347,7 +367,10 @@ func TestWorkerTagCRUDHappyFlow(t *testing.T) {
// Delete.
mf.persistence.EXPECT().FetchWorkerTag(gomock.Any(), UUID).Return(&expectDBTag, nil)
mf.persistence.EXPECT().DeleteWorkerTag(gomock.Any(), UUID)
// TODO: expect SocketIO broadcast of the tag deletion.
mf.broadcaster.EXPECT().BroadcastWorkerTagUpdate(api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{Id: &UUID},
WasDeleted: ptr(true),
})
echo = mf.prepareMockedJSONRequest(newAPITag)
require.NoError(t, mf.flamenco.DeleteWorkerTag(echo, UUID))
assertResponseNoContent(t, echo)

View File

@ -24,6 +24,7 @@ const (
SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates.
SocketIORoomWorkerTags SocketIORoomName = "WorkerTags" // For worker tag updates.
// For updates about ALL last-rendered images. Normally these are sent to a
// room specific to a particular job, but for the global "last rendered image"
@ -40,6 +41,7 @@ const (
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate
SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate
SIOEventWorkerTagUpdate SocketIOEventType = "/workertags" // sends api.SocketIOWorkerTagUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)
@ -74,6 +76,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
sioRoom = SocketIORoomWorkers
case api.SocketIOSubscriptionTypeAllLastRendered:
sioRoom = SocketIORoomLastRendered
case api.SocketIOSubscriptionTypeAllWorkerTags:
sioRoom = SocketIORoomWorkerTags
case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")

View File

@ -0,0 +1,48 @@
package webupdates
// SPDX-License-Identifier: GPL-3.0-or-later
import (
"github.com/rs/zerolog/log"
"projects.blender.org/studio/flamenco/internal/manager/persistence"
"projects.blender.org/studio/flamenco/pkg/api"
)
// NewWorkerTagUpdate returns a partial SocketIOWorkerTagUpdate struct for the
// given worker tag. It only fills in the fields that represent the current
// state of the tag.
func NewWorkerTagUpdate(tag *persistence.WorkerTag) api.SocketIOWorkerTagUpdate {
tagUpdate := api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{
Id: &tag.UUID,
Name: tag.Name,
Description: &tag.Description,
},
}
return tagUpdate
}
// NewWorkerTagDeletedUpdate returns a SocketIOWorkerTagUpdate struct that indicates
// the worker tag has been deleted.
func NewWorkerTagDeletedUpdate(tagUUID string) api.SocketIOWorkerTagUpdate {
wasDeleted := true
tagUpdate := api.SocketIOWorkerTagUpdate{
Tag: api.WorkerTag{
Id: &tagUUID,
},
WasDeleted: &wasDeleted,
}
return tagUpdate
}
// BroadcastWorkerTagUpdate sends the worker tag update to clients.
func (b *BiDirComms) BroadcastWorkerTagUpdate(WorkerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting worker tag update")
b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate)
}
// BroadcastNewWorkerTag sends a "new worker tag" notification to clients.
func (b *BiDirComms) BroadcastNewWorkerTag(WorkerTagUpdate api.SocketIOWorkerTagUpdate) {
log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting new worker tag")
b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate)
}

View File

@ -13,12 +13,14 @@ const websocketURL = ws();
export default {
emits: [
// Data from Flamenco Manager:
"jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate", "lastRenderedUpdate",
"jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate",
"lastRenderedUpdate", "workerTagUpdate",
// SocketIO events:
"sioReconnected", "sioDisconnected"
],
props: [
"mainSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`.
"extraSubscription", // One of the 'allXXX' subscription types, see `SocketIOSubscriptionType` in `flamenco-openapi.yaml`.
"subscribedJobID",
"subscribedTaskID",
],
@ -66,6 +68,14 @@ export default {
this._updateMainSubscription("subscribe", newType);
}
},
extraSubscription(newType, oldType) {
if (oldType) {
this._updateMainSubscription("unsubscribe", oldType);
}
if (newType) {
this._updateMainSubscription("subscribe", newType);
}
},
},
methods: {
connectToWebsocket() {
@ -160,6 +170,13 @@ export default {
this.$emit("workerUpdate", apiWorkerUpdate);
});
this.socket.on("/workertags", (workerTagUpdate) => {
// Convert to API object, in order to have the same parsing of data as
// when we'd do an API call.
const apiWorkerTagUpdate = API.SocketIOWorkerTagUpdate.constructFromObject(workerTagUpdate)
this.$emit("workerTagUpdate", apiWorkerTagUpdate);
});
// Chat system, useful for debugging.
this.socket.on("/message", (message) => {
this.$emit("message", message);
@ -219,6 +236,7 @@ export default {
if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID);
if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID);
if (this.mainSubscription) this._updateMainSubscription("subscribe", this.mainSubscription);
if (this.extraSubscription) this._updateMainSubscription("subscribe", this.extraSubscription);
},
},
};

View File

@ -63,6 +63,7 @@ import SocketIOSubscriptionOperation from './model/SocketIOSubscriptionOperation
import SocketIOSubscriptionType from './model/SocketIOSubscriptionType';
import SocketIOTaskLogUpdate from './model/SocketIOTaskLogUpdate';
import SocketIOTaskUpdate from './model/SocketIOTaskUpdate';
import SocketIOWorkerTagUpdate from './model/SocketIOWorkerTagUpdate';
import SocketIOWorkerUpdate from './model/SocketIOWorkerUpdate';
import SubmittedJob from './model/SubmittedJob';
import Task from './model/Task';
@ -433,6 +434,12 @@ export {
*/
SocketIOTaskUpdate,
/**
* The SocketIOWorkerTagUpdate model constructor.
* @property {module:model/SocketIOWorkerTagUpdate}
*/
SocketIOWorkerTagUpdate,
/**
* The SocketIOWorkerUpdate model constructor.
* @property {module:model/SocketIOWorkerUpdate}

View File

@ -7,7 +7,9 @@
</div>
<footer class="app-footer">
<notification-bar />
<update-listener ref="updateListener" mainSubscription="allWorkers" @workerUpdate="onSIOWorkerUpdate"
<update-listener ref="updateListener"
mainSubscription="allWorkers" extraSubscription="allWorkerTags"
@workerUpdate="onSIOWorkerUpdate" @workerTagUpdate="onSIOWorkerTagsUpdate"
@sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
</footer>
</template>
@ -85,6 +87,10 @@ export default {
this._fetchWorker(this.workerID);
},
onSIOWorkerTagsUpdate(workerTagsUpdate) {
this.workers.refreshTags()
.then(() => this._fetchWorker(this.workerID));
},
onTableWorkerClicked(rowData) {
if (rowData.id == this.workerID) return;