diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 37646b9e..651b6a27 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -86,6 +86,8 @@ type ChangeBroadcaster interface { // Note that there is no BroadcastNewTask. The 'new job' broadcast is sent // after the job's tasks have been created, and thus there is no need for a // separate broadcast per task. + + BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) } // ChangeBroadcaster should be a subset of webupdates.BiDirComms. diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 79b75a77..3e7bf926 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -264,6 +264,18 @@ func (mr *MockChangeBroadcasterMockRecorder) BroadcastNewJob(arg0 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastNewJob", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastNewJob), arg0) } +// BroadcastTaskLogUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastTaskLogUpdate(arg0 api.SocketIOTaskLogUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastTaskLogUpdate", arg0) +} + +// BroadcastTaskLogUpdate indicates an expected call of BroadcastTaskLogUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastTaskLogUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastTaskLogUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastTaskLogUpdate), arg0) +} + // MockJobCompiler is a mock of JobCompiler interface. type MockJobCompiler struct { ctrl *gomock.Controller diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 1fbda706..4f9b00c8 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -17,6 +17,7 @@ import ( "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/task_state_machine" + "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/pkg/api" ) @@ -403,6 +404,10 @@ func (f *Flamenco) doTaskUpdate( if err != nil { logger.Error().Err(err).Msg("error writing task log") } + + // Broadcast the task log to SocketIO clients. + taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, *update.Log) + f.broadcaster.BroadcastTaskLogUpdate(taskUpdate) } // Any error updating the status is more important than an error updating the diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index a9a19150..96068877 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -42,6 +42,14 @@ func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate { return taskUpdate } +// NewTaskLogUpdate returns a SocketIOTaskLogUpdate for the given task. +func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdate { + return api.SocketIOTaskLogUpdate{ + TaskId: taskUUID, + Log: logchunk, + } +} + // BroadcastJobUpdate sends the job update to clients. func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) { log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") @@ -67,3 +75,14 @@ func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { room := roomForJob(taskUpdate.JobId) b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate) } + +// BroadcastTaskLogUpdate sends the task log chunk to clients. +func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { + // Don't log the contents here; logs can get big. + room := roomForTaskLog(taskLogUpdate.TaskId) + log.Debug(). + Str("task", taskLogUpdate.TaskId). + Str("room", string(room)). + Msg("socketIO: broadcasting task log") + b.BroadcastTo(room, SIOEventTaskLogUpdate, taskLogUpdate) +} diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go index 085fce4d..8189dc3d 100644 --- a/internal/manager/webupdates/sio_rooms.go +++ b/internal/manager/webupdates/sio_rooms.go @@ -30,6 +30,7 @@ const ( SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate + SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription ) @@ -60,6 +61,8 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock switch subs.Type { case api.SocketIOSubscriptionTypeJob: sioRoom = roomForJob(uuid.String()) + case api.SocketIOSubscriptionTypeTasklog: + sioRoom = roomForTaskLog(uuid.String()) default: logger.Warn().Msg("socketIO: unknown subscription type, ignoring") return "unknown subscription type, ignoring request" @@ -93,3 +96,12 @@ func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.Sock func roomForJob(jobUUID string) SocketIORoomName { return SocketIORoomName("job-" + jobUUID) } + +// roomForTaskLog will return the SocketIO room name for receiving task logs of +// the the given task. +// +// Note that general task updates (`api.SIOEventTaskUpdate`) are sent to their +// job's room, and not to this room. +func roomForTaskLog(taskUUID string) SocketIORoomName { + return SocketIORoomName("tasklog-" + taskUUID) +} diff --git a/web/app/src/components/UpdateListener.vue b/web/app/src/components/UpdateListener.vue index f06e7d54..0c8501ff 100644 --- a/web/app/src/components/UpdateListener.vue +++ b/web/app/src/components/UpdateListener.vue @@ -11,7 +11,7 @@ import { useSocketStatus } from '@/stores/socket-status'; export default { emits: [ // Data from Flamenco Manager: - "jobUpdate", "taskUpdate", "message", + "jobUpdate", "taskUpdate", "taskLogUpdate", "message", // SocketIO events: "sioReconnected", "sioDisconnected" ], @@ -44,6 +44,14 @@ export default { this._updateJobSubscription("subscribe", newJobID); } }, + subscribedTaskID(newTaskID, oldTaskID) { + if (oldTaskID) { + this._updateTaskLogSubscription("unsubscribe", oldTaskID); + } + if (newTaskID) { + this._updateTaskLogSubscription("subscribe", newTaskID); + } + }, }, methods: { connectToWebsocket() { @@ -117,6 +125,13 @@ export default { this.$emit("taskUpdate", apiTaskUpdate); }); + this.socket.on("/tasklog", (taskLogUpdate) => { + // Convert to API object, in order to have the same parsing of data as + // when we'd do an API call. + const apiTaskLogUpdate = API.SocketIOTaskLogUpdate.constructFromObject(taskLogUpdate) + this.$emit("taskLogUpdate", apiTaskLogUpdate); + }); + // Chat system, useful for debugging. this.socket.on("/message", (message) => { this.$emit("message", message); @@ -140,15 +155,32 @@ export default { this.socket.emit("/chat", payload); }, + /** + * Send job (un)subscription request. + * @param {string} operation either "subscribe" or "unsubscribe" + * @param {string} jobID + */ _updateJobSubscription(operation, jobID) { const payload = new API.SocketIOSubscription(operation, "job", jobID); console.log(`sending job ${operation}:`, payload); this.socket.emit("/subscription", payload); }, + /** + * Send task log (un)subscription request. + * @param {string} operation either "subscribe" or "unsubscribe" + * @param {string} jobID + */ + _updateTaskLogSubscription(operation, taskID) { + const payload = new API.SocketIOSubscription(operation, "tasklog", taskID); + console.log(`sending tasklog ${operation}:`, payload); + this.socket.emit("/subscription", payload); + }, + // Resubscribe to whatever we want to be subscribed to: _resubscribe() { if (this.subscribedJobID) this._updateJobSubscription("subscribe", this.subscribedJobID); + if (this.subscribedTaskID) this._updateTaskLogSubscription("subscribe", this.subscribedTaskID); }, }, }; diff --git a/web/app/src/main.js b/web/app/src/main.js index 5b0d7e12..55589b01 100644 --- a/web/app/src/main.js +++ b/web/app/src/main.js @@ -24,7 +24,9 @@ app.mount('#app') // For debugging. import { useJobs } from '@/stores/jobs'; import { useNotifs } from '@/stores/notifications'; +import { useTaskLog } from '@/stores/tasklog'; import * as API from '@/manager-api'; window.jobs = useJobs(); window.notifs = useNotifs(); +window.taskLog = useTaskLog(); window.API = API; diff --git a/web/app/src/stores/tasklog.js b/web/app/src/stores/tasklog.js new file mode 100644 index 00000000..ce14ac39 --- /dev/null +++ b/web/app/src/stores/tasklog.js @@ -0,0 +1,109 @@ +import { defineStore } from 'pinia' + +// Maximum number of task log lines that will be stored. +const capacity = 1000; + +/** + * Store logs of the active task. + */ +export const useTaskLog = defineStore('taskLog', { + state: () => ({ + /** + * Task log entries. + * + * The 'id' is just for Tabulator to uniquely identify rows, in order to be + * able to scroll to them and keep them in order. + * + * @type {{ id: Number, line: string }[]} */ + history: [], + /** @type { id: Number, line: string } */ + last: "", + + lastID: 0, + }), + getters: { + empty: (state) => state.history.length == 0, + }, + actions: { + /** + * @param {API.SocketIOTaskLogUpdate} taskLogUpdate + */ + addTaskLogUpdate(taskLogUpdate) { + console.log('task log update:', taskLogUpdate); + this.addChunk(taskLogUpdate.log); + }, + + /** + * Erase the entire task log history. Use this when switching between tasks. + */ + clear() { + this.$patch((state) => { + state.history = []; + state.last = null; + state.lastID = 0; + state.hasChanged = true; + }); + }, + + /** + * Add a task log chunk. + * @param {string} logChunk + */ + addChunk(logChunk) { + const lines = logChunk.trimEnd().split('\n'); + if (lines.length == 0) + return; + + if (lines.length > capacity) { + // Only keep the `capacity` last lines, so that adding them to the + // history will not overflow the capacity. + lines.splice(0, lines.length - capacity); + } + + this.$patch((state) => { + let entry = null; + + // Make sure there is enough space to actually add the new lines. + this._pruneState(state, lines.length); + + for (let line of lines) { + entry = this._createEntry(state, line); + state.history.push(entry); + } + + if (entry == null) { + console.warn("taskLog.addChunk: there were lines to add, but no entry created. Weird."); + return; + } + + state.last = entry; + state.lastID = entry.id; + state.hasChanged = true; + }); + }, + + _createEntry(state, line) { + return {id: this._generateID(state), line: line}; + }, + + /** + * Ensure there is enough space in the history to fit `spaceForLineNum` lines. + */ + _pruneState(state, spaceForLineNum) { + if (spaceForLineNum > capacity) { + // No need to calculate anything, just delete everything. + state.history = []; + return; + } + + const pruneTo = capacity - spaceForLineNum; + if (state.history.length <= pruneTo) return; + + const deleteCount = state.history.length - pruneTo; + state.history.splice(0, deleteCount); + }, + _generateID(state) { + return ++state.lastID; + } + }, +}) diff --git a/web/app/src/views/JobsView.vue b/web/app/src/views/JobsView.vue index 63f9bf45..4e8f0687 100644 --- a/web/app/src/views/JobsView.vue +++ b/web/app/src/views/JobsView.vue @@ -13,8 +13,10 @@ - @@ -24,6 +26,7 @@ import * as API from '@/manager-api'; import { useJobs } from '@/stores/jobs'; import { useTasks } from '@/stores/tasks'; import { useNotifs } from '@/stores/notifications' +import { useTaskLog } from '@/stores/tasklog' import { apiClient } from '@/stores/api-query-count'; import FooterPopup from '@/components/FooterPopup.vue' @@ -53,6 +56,7 @@ export default { jobs: useJobs(), tasks: useTasks(), notifs: useNotifs(), + taskLog: useTaskLog(), showFooterPopup: false, }), computed: { @@ -78,6 +82,7 @@ export default { this._fetchJob(newJobID); }, taskID(newTaskID, oldTaskID) { + this.taskLog.clear(); this._fetchTask(newTaskID); }, }, @@ -140,6 +145,14 @@ export default { this.notifs.addTaskUpdate(taskUpdate); }, + /** + * Event handler for SocketIO task log updates. + * @param {API.SocketIOTaskLogUpdate} taskLogUpdate + */ + onSioTaskLogUpdate(taskLogUpdate) { + this.taskLog.addTaskLogUpdate(taskLogUpdate); + }, + /** * @param {string} jobID job ID to navigate to, can be empty string for "no active job". */ @@ -191,8 +204,9 @@ export default { return jobsAPI.fetchTask(taskID) .then((task) => { this.tasks.setActiveTask(task); - // Forward the full task to Tabulator, so that that gets updated too. - this.$refs.tasksTable.processTaskUpdate(task); + // Forward the full task to Tabulator, so that that gets updated too.\ + if (this.$refs.tasksTable) + this.$refs.tasksTable.processTaskUpdate(task); }); },