diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go
index 824ce919..3c186e1a 100644
--- a/internal/manager/api_impl/api_impl.go
+++ b/internal/manager/api_impl/api_impl.go
@@ -89,6 +89,9 @@ type ChangeBroadcaster interface {
// separate broadcast per task.
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate)
+
+ BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate)
+ BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.
diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go
index 9460b85c..a5043bd2 100644
--- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go
+++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go
@@ -279,6 +279,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0)
}
+// BroadcastNewWorker mocks base method.
+func (m *MockChangeBroadcaster) BroadcastNewWorker(arg0 api.SocketIOWorkerUpdate) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "BroadcastNewWorker", arg0)
+}
+
+// BroadcastNewWorker indicates an expected call of BroadcastNewWorker.
+func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewWorker(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewWorker", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewWorker), arg0)
+}
+
// BroadcastTaskLogUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) {
m.ctrl.T.Helper()
@@ -291,6 +303,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interfa
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0)
}
+// BroadcastWorkerUpdate mocks base method.
+func (m *MockChangeBroadcaster) BroadcastWorkerUpdate(arg0 api.SocketIOWorkerUpdate) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "BroadcastWorkerUpdate", arg0)
+}
+
+// BroadcastWorkerUpdate indicates an expected call of BroadcastWorkerUpdate.
+func (mr *MockChangeBroadcasterMockRecorder) BroadcastWorkerUpdate(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastWorkerUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastWorkerUpdate), arg0)
+}
+
// MockJobCompiler is a mock of JobCompiler interface.
type MockJobCompiler struct {
ctrl *gomock.Controller
diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go
index b369d63a..e5ee4ecb 100644
--- a/internal/manager/api_impl/workers.go
+++ b/internal/manager/api_impl/workers.go
@@ -84,25 +84,37 @@ func (f *Flamenco) SignOn(e echo.Context) error {
}
logger.Info().Msg("worker signing on")
- w, err := f.workerUpdateAfterSignOn(e, req)
+ w, prevStatus, err := f.workerUpdateAfterSignOn(e, req)
if err != nil {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
}
+ // Broadcast the status change.
+ update := webupdates.NewWorkerUpdate(w)
+ if prevStatus != "" {
+ update.PreviousStatus = &prevStatus
+ }
+ if w.StatusRequested != "" {
+ update.StatusRequested = &w.StatusRequested
+ }
+ f.broadcaster.BroadcastWorkerUpdate(update)
+
resp := api.WorkerStateChange{}
if w.StatusRequested != "" {
resp.StatusRequested = w.StatusRequested
} else {
resp.StatusRequested = api.WorkerStatusAwake
}
+
return e.JSON(http.StatusOK, resp)
}
-func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, error) {
+func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSONBody) (*persistence.Worker, api.WorkerStatus, error) {
logger := requestLogger(e)
w := requestWorkerOrPanic(e)
// Update the worker for with the new sign-on info.
+ prevStatus := w.Status
w.Status = api.WorkerStatusStarting
w.Address = e.RealIP()
w.Name = update.Nickname
@@ -120,10 +132,10 @@ func (f *Flamenco) workerUpdateAfterSignOn(e echo.Context, update api.SignOnJSON
logger.Warn().Err(err).
Str("newStatus", string(w.Status)).
Msg("error storing Worker in database")
- return nil, err
+ return nil, "", err
}
- return w, nil
+ return w, prevStatus, nil
}
func (f *Flamenco) SignOff(e echo.Context) error {
@@ -138,6 +150,7 @@ func (f *Flamenco) SignOff(e echo.Context) error {
logger.Info().Msg("worker signing off")
w := requestWorkerOrPanic(e)
+ prevStatus := w.Status
w.Status = api.WorkerStatusOffline
if w.StatusRequested == api.WorkerStatusShutdown {
w.StatusRequested = ""
@@ -163,6 +176,10 @@ func (f *Flamenco) SignOff(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks")
}
+ update := webupdates.NewWorkerUpdate(w)
+ update.PreviousStatus = &prevStatus
+ f.broadcaster.BroadcastWorkerUpdate(update)
+
return e.NoContent(http.StatusNoContent)
}
@@ -234,6 +251,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
Str("newStatus", string(req.Status)).
Logger()
+ prevStatus := w.Status
w.Status = req.Status
if w.StatusRequested != "" && req.Status != w.StatusRequested {
logger.Warn().
@@ -254,6 +272,10 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
}
+ update := webupdates.NewWorkerUpdate(w)
+ update.PreviousStatus = &prevStatus
+ f.broadcaster.BroadcastWorkerUpdate(update)
+
return e.NoContent(http.StatusNoContent)
}
diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go
index 427003bd..f0e18b81 100644
--- a/internal/manager/api_impl/workers_test.go
+++ b/internal/manager/api_impl/workers_test.go
@@ -96,6 +96,17 @@ func TestWorkerSignOn(t *testing.T) {
mf := newMockedFlamenco(mockCtrl)
worker := testWorker()
worker.Status = api.WorkerStatusOffline
+ prevStatus := worker.Status
+
+ mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
+ Id: worker.UUID,
+ Nickname: "Lazy Boi",
+ PreviousStatus: &prevStatus,
+ Status: api.WorkerStatusStarting,
+ StatusRequested: nil,
+ Updated: worker.UpdatedAt,
+ Version: "3.0-testing",
+ })
mf.persistence.EXPECT().SaveWorker(gomock.Any(), &worker).Return(nil)
@@ -167,6 +178,16 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
return nil
})
+ prevStatus := api.WorkerStatusAwake
+ mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
+ Id: worker.UUID,
+ Nickname: worker.Name,
+ PreviousStatus: &prevStatus,
+ Status: api.WorkerStatusOffline,
+ Updated: worker.UpdatedAt,
+ Version: worker.Software,
+ })
+
err := mf.flamenco.SignOff(echo)
assert.NoError(t, err)
@@ -174,6 +195,41 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
assert.Equal(t, http.StatusNoContent, resp.StatusCode)
}
+func TestWorkerStateChanged(t *testing.T) {
+ mockCtrl := gomock.NewController(t)
+ defer mockCtrl.Finish()
+
+ mf := newMockedFlamenco(mockCtrl)
+ worker := testWorker()
+ worker.Status = api.WorkerStatusStarting
+ prevStatus := worker.Status
+
+ // Expect a broadcast of the change
+ mf.broadcaster.EXPECT().BroadcastWorkerUpdate(api.SocketIOWorkerUpdate{
+ Id: worker.UUID,
+ Nickname: worker.Name,
+ PreviousStatus: &prevStatus,
+ Status: api.WorkerStatusAwake,
+ StatusRequested: nil,
+ Updated: worker.UpdatedAt,
+ Version: worker.Software,
+ })
+
+ // Expect the Worker to be saved with the new status
+ savedWorker := worker
+ savedWorker.Status = api.WorkerStatusAwake
+ mf.persistence.EXPECT().SaveWorkerStatus(gomock.Any(), &savedWorker).Return(nil)
+
+ // Perform the request
+ echo := mf.prepareMockedJSONRequest(api.WorkerStateChanged{
+ Status: api.WorkerStatusAwake,
+ })
+ requestWorkerStore(echo, &worker)
+ err := mf.flamenco.WorkerStateChanged(echo)
+ assert.NoError(t, err)
+ assertResponseEmpty(t, echo)
+}
+
func TestTaskUpdate(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go
index b10ba86e..9aef9c92 100644
--- a/internal/manager/webupdates/sio_rooms.go
+++ b/internal/manager/webupdates/sio_rooms.go
@@ -21,8 +21,9 @@ const (
// Predefined SocketIO rooms. There will be others, but those will have a
// dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be
// listed here as constants. See `roomXXX()` functions for those.
- SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
- SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
+ SocketIORoomChat SocketIORoomName = "Chat" // For chat messages.
+ SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates.
+ SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates.
)
const (
@@ -32,6 +33,7 @@ const (
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate
+ SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)
@@ -62,6 +64,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
switch subs.Type {
case api.SocketIOSubscriptionTypeAllJobs:
sioRoom = SocketIORoomJobs
+ case api.SocketIOSubscriptionTypeAllWorkers:
+ sioRoom = SocketIORoomWorkers
case api.SocketIOSubscriptionTypeJob:
if subs.Uuid == nil {
logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID")
diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/webupdates/worker_updates.go
new file mode 100644
index 00000000..03928e88
--- /dev/null
+++ b/internal/manager/webupdates/worker_updates.go
@@ -0,0 +1,41 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+package webupdates
+
+import (
+ "github.com/rs/zerolog/log"
+
+ "git.blender.org/flamenco/internal/manager/persistence"
+ "git.blender.org/flamenco/pkg/api"
+)
+
+// NewWorkerUpdate returns a partial SocketIOWorkerUpdate struct for the given worker.
+// It only fills in the fields that represent the current state of the worker. For
+// example, it omits `PreviousStatus`. The ommitted fields can be filled in by
+// the caller.
+func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate {
+ workerUpdate := api.SocketIOWorkerUpdate{
+ Id: worker.UUID,
+ Nickname: worker.Name,
+ Status: worker.Status,
+ Version: worker.Software,
+ Updated: worker.UpdatedAt,
+ }
+ return workerUpdate
+}
+
+// BroadcastWorkerUpdate sends the worker update to clients.
+func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) {
+ log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update")
+ b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
+}
+
+// BroadcastNewWorker sends a "new worker" notification to clients.
+func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) {
+ if workerUpdate.PreviousStatus != nil {
+ log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state")
+ workerUpdate.PreviousStatus = nil
+ }
+
+ log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker")
+ b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate)
+}
diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue
index b25ae6b5..e8bfe7e4 100644
--- a/web/app/src/components/UpdateListener.vue
+++ b/web/app/src/components/UpdateListener.vue
@@ -13,7 +13,7 @@ const websocketURL = ws();
export default {
emits: [
// Data from Flamenco Manager:
- "jobUpdate", "taskUpdate", "taskLogUpdate", "message",
+ "jobUpdate", "taskUpdate", "taskLogUpdate", "message", "workerUpdate",
// SocketIO events:
"sioReconnected", "sioDisconnected"
],
@@ -146,6 +146,13 @@ export default {
this.$emit("taskLogUpdate", apiTaskLogUpdate);
});
+ this.socket.on("/workers", (workerUpdate) => {
+ // Convert to API object, in order to have the same parsing of data as
+ // when we'd do an API call.
+ const apiWorkerUpdate = API.SocketIOWorkerUpdate.constructFromObject(workerUpdate)
+ this.$emit("workerUpdate", apiWorkerUpdate);
+ });
+
// Chat system, useful for debugging.
this.socket.on("/message", (message) => {
this.$emit("message", message);
diff --git a/web/app/src/components/workers/WorkersTable.vue b/web/app/src/components/workers/WorkersTable.vue
index 8f027080..054fec99 100644
--- a/web/app/src/components/workers/WorkersTable.vue
+++ b/web/app/src/components/workers/WorkersTable.vue
@@ -99,24 +99,24 @@ export default {
this.tabulator.setData(data.workers);
this._refreshAvailableStatuses();
},
- // processWorkerUpdate(workerUpdate) {
- // // updateData() will only overwrite properties that are actually set on
- // // workerUpdate, and leave the rest as-is.
- // if (this.tabulator.initialized) {
- // this.tabulator.updateData([workerUpdate])
- // .then(this.sortData);
- // }
- // this._refreshAvailableStatuses();
- // },
- // processNewWorker(workerUpdate) {
- // if (this.tabulator.initialized) {
- // this.tabulator.updateData([workerUpdate])
- // .then(this.sortData);
- // }
- // this.tabulator.addData([workerUpdate])
- // .then(this.sortData);
- // this._refreshAvailableStatuses();
- // },
+ processWorkerUpdate(workerUpdate) {
+ // updateData() will only overwrite properties that are actually set on
+ // workerUpdate, and leave the rest as-is.
+ if (this.tabulator.initialized) {
+ this.tabulator.updateData([workerUpdate])
+ .then(this.sortData);
+ }
+ this._refreshAvailableStatuses();
+ },
+ processNewWorker(workerUpdate) {
+ if (this.tabulator.initialized) {
+ this.tabulator.updateData([workerUpdate])
+ .then(this.sortData);
+ }
+ this.tabulator.addData([workerUpdate])
+ .then(this.sortData);
+ this._refreshAvailableStatuses();
+ },
onRowClick(event, row) {
// Take a copy of the data, so that it's decoupled from the tabulator data
diff --git a/web/app/src/stores/notifications.js b/web/app/src/stores/notifications.js
index 2974555e..405dbf87 100644
--- a/web/app/src/stores/notifications.js
+++ b/web/app/src/stores/notifications.js
@@ -66,6 +66,18 @@ export const useNotifs = defineStore('notifications', {
this.add(msg)
},
+ /**
+ * @param {API.SocketIOWorkerUpdate} workerUpdate Worker update received via SocketIO.
+ */
+ addWorkerUpdate(workerUpdate) {
+ console.log('Received worker update:', workerUpdate);
+ let msg = `Worker ${workerUpdate.name}`;
+ if (workerUpdate.previous_status && workerUpdate.previous_status != workerUpdate.status) {
+ msg += ` changed status ${workerUpdate.previous_status} ➜ ${workerUpdate.status}`;
+ }
+ this.add(msg)
+ },
+
/* Ensure there is only 1000 items in the history. */
_prune() {
if (this.history.length <= 1000) return;
diff --git a/web/app/src/views/WorkersView.vue b/web/app/src/views/WorkersView.vue
index 284a9cc6..aaa6a077 100644
--- a/web/app/src/views/WorkersView.vue
+++ b/web/app/src/views/WorkersView.vue
@@ -7,7 +7,8 @@
@@ -24,6 +25,7 @@