Task update notifications via SocketIO

Manager now sends out task updates via SocketIO, and the web interface
handles those.

Note that there is a `BroadcastTaskUpdate()` function, but not a
`BroadcastNewTask`. The 'new job' broadcast is sent after the job's
tasks have been created, and thus there is no need for a separate
broadcast per task.
This commit is contained in:
Sybren A. Stüvel 2022-05-03 11:25:38 +02:00
parent 222d618ef6
commit 50c8cd39f2
8 changed files with 79 additions and 11 deletions

View File

@ -76,6 +76,10 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil)
type ChangeBroadcaster interface {
// BroadcastNewJob sends a 'new job' notification to all SocketIO clients.
BroadcastNewJob(jobUpdate api.JobUpdate)
// Note that there is no BroadcastNewTask. The 'new job' broadcast is sent
// after the job's tasks have been created, and thus there is no need for a
// separate broadcast per task.
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.

View File

@ -157,3 +157,15 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0)
}
// BroadcastTaskUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskUpdate(arg0 api.SocketIOTaskUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastTaskUpdate", arg0)
}
// BroadcastTaskUpdate indicates an expected call of BroadcastTaskUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskUpdate), arg0)
}

View File

@ -48,8 +48,11 @@ type PersistenceService interface {
var _ PersistenceService = (*persistence.DB)(nil)
type ChangeBroadcaster interface {
// BroadcastJobUpdate sends the job update to clients.
// BroadcastJobUpdate sends the job update to SocketIO clients.
BroadcastJobUpdate(jobUpdate api.JobUpdate)
// BroadcastTaskUpdate sends the task update to SocketIO clients.
BroadcastTaskUpdate(jobUpdate api.SocketIOTaskUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms
@ -89,6 +92,12 @@ func (sm *StateMachine) TaskStatusChange(
if err := sm.persist.SaveTask(ctx, task); err != nil {
return fmt.Errorf("saving task to database: %w", err)
}
// Broadcast this change to the SocketIO clients.
taskUpdate := webupdates.NewTaskUpdate(task)
taskUpdate.PreviousStatus = &oldTaskStatus
sm.broadcaster.BroadcastTaskUpdate(taskUpdate)
if err := sm.updateJobAfterTaskStatusChange(ctx, task, oldTaskStatus); err != nil {
return fmt.Errorf("updating job after task status change: %w", err)
}

View File

@ -24,6 +24,24 @@ func NewJobUpdate(job *persistence.Job) api.JobUpdate {
return jobUpdate
}
// NewTaskUpdate returns a partial TaskUpdate struct for the given task. It only
// fills in the fields that represent the current state of the task. For
// example, it omits `PreviousStatus`. The omitted fields can be filled in by
// the caller.
//
// Assumes task.Job is not nil.
func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate {
taskUpdate := api.SocketIOTaskUpdate{
Id: task.UUID,
JobId: task.Job.UUID,
Name: task.Name,
Updated: task.UpdatedAt,
Status: task.Status,
}
return taskUpdate
}
// BroadcastJobUpdate sends the job update to clients.
func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) {
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update")
@ -31,6 +49,8 @@ func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.JobUpdate) {
}
// BroadcastNewJob sends a "new job" notification to clients.
// This function should be called when the job has been completely created, so
// including its tasks.
func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) {
if jobUpdate.PreviousStatus != nil {
log.Warn().Interface("jobUpdate", jobUpdate).Msg("socketIO: new jobs should not have a previous state")
@ -41,6 +61,13 @@ func (b *BiDirComms) BroadcastNewJob(jobUpdate api.JobUpdate) {
b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate)
}
// BroadcastTaskUpdate sends the task update to clients.
func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) {
log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update")
room := roomForJob(taskUpdate.JobId)
b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate)
}
// roomForJob will return the SocketIO room name for the given job. Clients in
// this room will receive info scoped to this job, so for example updates to all
// tasks of this job.

View File

@ -24,6 +24,7 @@ const (
SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here
SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.JobUpdate
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)

View File

@ -17,8 +17,8 @@
<footer>
<span class='notifications' v-if="notifs.last">{{ notifs.last.msg }}</span>
<update-listener ref="updateListener" :websocketURL="websocketURL" :subscribedJob="jobs.activeJobID"
@jobUpdate="onSioJobUpdate" @message="onChatMessage" @sioReconnected="onSIOReconnected"
@sioDisconnected="onSIODisconnected" />
@jobUpdate="onSioJobUpdate" @taskUpdate="onSioTaskUpdate" @message="onChatMessage"
@sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
</footer>
</template>
@ -61,7 +61,9 @@ export default {
this.fetchManagerInfo();
},
methods: {
// UI component event handlers:
// onSelectedJobChanged is called whenever the selected job changes; this is
// both when another job is selected and when the selected job itself gets
// updated.
onSelectedJobChanged(jobSummary) {
if (!jobSummary) { // There is no selected job.
this.jobs.deselectAllJobs();
@ -108,8 +110,8 @@ export default {
} else {
console.warn("App: this.$refs.jobsTable is", this.$refs.jobsTable);
}
const activeJob = this.jobs.activeJob;
if (activeJob && activeJob.id == jobUpdate.id) {
if (this.jobs.activeJobID == jobUpdate.id) {
this.onSelectedJobChanged(jobUpdate);
}
},
@ -122,6 +124,20 @@ export default {
// this.messages.push(`New job: ${jobUpdate.id} (${jobUpdate.status})`);
this.$refs.jobsTable.processNewJob(jobUpdate);
},
/**
* Event handler for SocketIO task updates.
* @param {API.SocketIOTaskUpdate} taskUpdate
*/
onSioTaskUpdate(taskUpdate) {
if (!this.$refs.tasksTable) {
console.warn("App: this.$refs.tasksTable is", this.$refs.tasksTable);
return;
}
this.$refs.tasksTable.processTaskUpdate(taskUpdate);
},
onChatMessage(message) {
console.log("chat message received:", message);
this.messages.push(`${message.text}`);

View File

@ -109,10 +109,6 @@ export default {
this.tabulator.updateData([taskUpdate])
.then(this.sortData);
},
processNewTask(taskUpdate) {
this.tabulator.addData([taskUpdate])
.then(this.sortData);
},
// Selection handling.
onRowSelected(selectedRow) {

View File

@ -26,7 +26,10 @@ export default {
}
this.connectToWebsocket();
},
beforeDestroy: function () {
unmounted() {
this.disconnectWebsocket();
},
beforeDestroy() {
this.disconnectWebsocket();
},
watch: {