Task log broadcasting via SocketIO

Implement task log broadcasting via SocketIO. The logs aren't shown in the
web interface yet, but do arrive there in a Pinia store. That store is
capped at 1000 lines to keep memory requirements low-ish.
This commit is contained in:
Sybren A. Stüvel 2022-05-20 13:03:41 +02:00
parent 1bb4ada72b
commit 3e5f681321
9 changed files with 212 additions and 5 deletions

View File

@ -86,6 +86,8 @@ type ChangeBroadcaster interface {
// Note that there is no BroadcastNewTask. The 'new job' broadcast is sent
// after the job's tasks have been created, and thus there is no need for a
// separate broadcast per task.
BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate)
}
// ChangeBroadcaster should be a subset of webupdates.BiDirComms.

View File

@ -264,6 +264,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0)
}
// BroadcastTaskLogUpdate mocks base method.
func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0)
}
// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate.
func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0)
}
// MockJobCompiler is a mock of JobCompiler interface.
type MockJobCompiler struct {
ctrl *gomock.Controller

View File

@ -17,6 +17,7 @@ import (
"git.blender.org/flamenco/internal/manager/persistence"
"git.blender.org/flamenco/internal/manager/task_state_machine"
"git.blender.org/flamenco/internal/manager/webupdates"
"git.blender.org/flamenco/pkg/api"
)
@ -403,6 +404,10 @@ func (f *Flamenco) doTaskUpdate(
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, *update.Log)
f.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
}
// Any error updating the status is more important than an error updating the

View File

@ -42,6 +42,14 @@ func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate {
return taskUpdate
}
// NewTaskLogUpdate returns a SocketIOTaskLogUpdate for the given task.
func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdate {
return api.SocketIOTaskLogUpdate{
TaskId: taskUUID,
Log: logchunk,
}
}
// BroadcastJobUpdate sends the job update to clients.
func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) {
log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update")
@ -67,3 +75,14 @@ func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) {
room := roomForJob(taskUpdate.JobId)
b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate)
}
// BroadcastTaskLogUpdate sends the task log chunk to clients.
func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) {
// Don't log the contents here; logs can get big.
room := roomForTaskLog(taskLogUpdate.TaskId)
log.Debug().
Str("task", taskLogUpdate.TaskId).
Str("room", string(room)).
Msg("socketIO: broadcasting task log")
b.BroadcastTo(room, SIOEventTaskLogUpdate, taskLogUpdate)
}

View File

@ -30,6 +30,7 @@ const (
SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here
SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate
SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate
SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate
SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription
)
@ -60,6 +61,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
switch subs.Type {
case api.SocketIOSubscriptionTypeJob:
sioRoom = roomForJob(uuid.String())
case api.SocketIOSubscriptionTypeTasklog:
sioRoom = roomForTaskLog(uuid.String())
default:
logger.Warn().Msg("socketIO: unknown subscription type, ignoring")
return "unknown subscription type, ignoring request"
@ -93,3 +96,12 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock
func roomForJob(jobUUID string) SocketIORoomName {
return SocketIORoomName("job-" + jobUUID)
}
// roomForTaskLog will return the SocketIO room name for receiving task logs of
// the the given task.
//
// Note that general task updates (`api.SIOEventTaskUpdate`) are sent to their
// job's room, and not to this room.
func roomForTaskLog(taskUUID string) SocketIORoomName {
return SocketIORoomName("tasklog-" + taskUUID)
}

View File

@ -11,7 +11,7 @@ import { useSocketStatus } from '@/stores/socket-status';
export default {
emits: [
// Data from Flamenco Manager:
"jobUpdate", "taskUpdate", "message",
"jobUpdate", "taskUpdate", "taskLogUpdate", "message",
// SocketIO events:
"sioReconnected", "sioDisconnected"
],
@ -44,6 +44,14 @@ export default {
this._updateJobSubscription("subscribe", newJobID);
}
},
subscribedTaskID(newTaskID, oldTaskID) {
if (oldTaskID) {
this._updateTaskLogSubscription("unsubscribe", oldTaskID);
}
if (newTaskID) {
this._updateTaskLogSubscription("subscribe", newTaskID);
}
},
},
methods: {
connectToWebsocket() {
@ -117,6 +125,13 @@ export default {
this.$emit("taskUpdate", apiTaskUpdate);
});
this.socket.on("/tasklog", (taskLogUpdate) => {
// Convert to API object, in order to have the same parsing of data as
// when we'd do an API call.
const apiTaskLogUpdate = API.SocketIOTaskLogUpdate.constructFromObject(taskLogUpdate)
this.$emit("taskLogUpdate", apiTaskLogUpdate);
});
// Chat system, useful for debugging.
this.socket.on("/message", (message) => {
this.$emit("message", message);
@ -140,15 +155,32 @@ export default {
this.socket.emit("/chat", payload);
},
/**
* Send job (un)subscription request.
* @param {string} operation either "subscribe" or "unsubscribe"
* @param {string} jobID
*/
_updateJobSubscription(operation, jobID) {
const payload = new API.SocketIOSubscription(operation, "job", jobID);
console.log(`sending job ${operation}:`, payload);
this.socket.emit("/subscription", payload);
},
/**
* Send task log (un)subscription request.
* @param {string} operation either "subscribe" or "unsubscribe"
* @param {string} jobID
*/
_updateTaskLogSubscription(operation, taskID) {
const payload = new API.SocketIOSubscription(operation, "tasklog", taskID);
console.log(`sending tasklog ${operation}:`, payload);
this.socket.emit("/subscription", payload);
},
// Resubscribe to whatever we want to be subscribed to:
_resubscribe() {
if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID);
if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID);
},
},
};

View File

@ -24,7 +24,9 @@ app.mount('#app')
// For debugging.
import { useJobs } from '@/stores/jobs';
import { useNotifs } from '@/stores/notifications';
import { useTaskLog } from '@/stores/tasklog';
import * as API from '@/manager-api';
window.jobs = useJobs();
window.notifs = useNotifs();
window.taskLog = useTaskLog();
window.API = API;

View File

@ -0,0 +1,109 @@
import { defineStore } from 'pinia'
// Maximum number of task log lines that will be stored.
const capacity = 1000;
/**
* Store logs of the active task.
*/
export const useTaskLog = defineStore('taskLog', {
state: () => ({
/**
* Task log entries.
*
* The 'id' is just for Tabulator to uniquely identify rows, in order to be
* able to scroll to them and keep them in order.
*
* @type {{ id: Number, line: string }[]} */
history: [],
/** @type { id: Number, line: string } */
last: "",
lastID: 0,
}),
getters: {
empty: (state) => state.history.length == 0,
},
actions: {
/**
* @param {API.SocketIOTaskLogUpdate} taskLogUpdate
*/
addTaskLogUpdate(taskLogUpdate) {
console.log('task log update:', taskLogUpdate);
this.addChunk(taskLogUpdate.log);
},
/**
* Erase the entire task log history. Use this when switching between tasks.
*/
clear() {
this.$patch((state) => {
state.history = [];
state.last = null;
state.lastID = 0;
state.hasChanged = true;
});
},
/**
* Add a task log chunk.
* @param {string} logChunk
*/
addChunk(logChunk) {
const lines = logChunk.trimEnd().split('\n');
if (lines.length == 0)
return;
if (lines.length > capacity) {
// Only keep the `capacity` last lines, so that adding them to the
// history will not overflow the capacity.
lines.splice(0, lines.length - capacity);
}
this.$patch((state) => {
let entry = null;
// Make sure there is enough space to actually add the new lines.
this._pruneState(state, lines.length);
for (let line of lines) {
entry = this._createEntry(state, line);
state.history.push(entry);
}
if (entry == null) {
console.warn("taskLog.addChunk: there were lines to add, but no entry created. Weird.");
return;
}
state.last = entry;
state.lastID = entry.id;
state.hasChanged = true;
});
},
_createEntry(state, line) {
return {id: this._generateID(state), line: line};
},
/**
* Ensure there is enough space in the history to fit `spaceForLineNum` lines.
*/
_pruneState(state, spaceForLineNum) {
if (spaceForLineNum > capacity) {
// No need to calculate anything, just delete everything.
state.history = [];
return;
}
const pruneTo = capacity - spaceForLineNum;
if (state.history.length <= pruneTo) return;
const deleteCount = state.history.length - pruneTo;
state.history.splice(0, deleteCount);
},
_generateID(state) {
return ++state.lastID;
}
},
})

View File

@ -13,8 +13,10 @@
<footer class="window-footer" v-if="!showFooterPopup" @click="showFooterPopup = true"><notification-bar /></footer>
<footer-popup v-if="showFooterPopup" ref="footerPopup" @clickClose="showFooterPopup = false" />
<update-listener ref="updateListener" :websocketURL="websocketURL" :subscribedJobID="jobID"
@jobUpdate="onSioJobUpdate" @taskUpdate="onSioTaskUpdate" @message="onChatMessage"
<update-listener ref="updateListener" :websocketURL="websocketURL"
:subscribedJobID="jobID" :subscribedTaskID="taskID"
@jobUpdate="onSioJobUpdate" @taskUpdate="onSioTaskUpdate" @taskLogUpdate="onSioTaskLogUpdate"
@message="onChatMessage"
@sioReconnected="onSIOReconnected" @sioDisconnected="onSIODisconnected" />
</template>
@ -24,6 +26,7 @@ import * as API from '@/manager-api';
import { useJobs } from '@/stores/jobs';
import { useTasks } from '@/stores/tasks';
import { useNotifs } from '@/stores/notifications'
import { useTaskLog } from '@/stores/tasklog'
import { apiClient } from '@/stores/api-query-count';
import FooterPopup from '@/components/FooterPopup.vue'
@ -53,6 +56,7 @@ export default {
jobs: useJobs(),
tasks: useTasks(),
notifs: useNotifs(),
taskLog: useTaskLog(),
showFooterPopup: false,
}),
computed: {
@ -78,6 +82,7 @@ export default {
this._fetchJob(newJobID);
},
taskID(newTaskID, oldTaskID) {
this.taskLog.clear();
this._fetchTask(newTaskID);
},
},
@ -140,6 +145,14 @@ export default {
this.notifs.addTaskUpdate(taskUpdate);
},
/**
* Event handler for SocketIO task log updates.
* @param {API.SocketIOTaskLogUpdate} taskLogUpdate
*/
onSioTaskLogUpdate(taskLogUpdate) {
this.taskLog.addTaskLogUpdate(taskLogUpdate);
},
/**
* @param {string} jobID job ID to navigate to, can be empty string for "no active job".
*/
@ -191,7 +204,8 @@ export default {
return jobsAPI.fetchTask(taskID)
.then((task) => {
this.tasks.setActiveTask(task);
// Forward the full task to Tabulator, so that that gets updated too.
// Forward the full task to Tabulator, so that that gets updated too.\
if (this.$refs.tasksTable)
this.$refs.tasksTable.processTaskUpdate(task);
});
},