From 76a24243f081e1420a545ed6b3a9fd7e6424de25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Sat, 3 Feb 2024 22:51:29 +0100 Subject: [PATCH] Manager: Introduce event bus system Introduce an "event bus"-like system. It's more like a fan-out broadcaster for certain events. Instead of directly sending events to SocketIO, they are now sent to the broker, which in turn sends it to any registered "forwarder". Currently there is ony one forwarder, for SocketIO. This opens the door for a proper MQTT client that sends the same events to an MQTT server. --- cmd/flamenco-manager/main.go | 21 ++- cmd/flamenco-manager/webservice.go | 6 +- internal/manager/api_impl/interfaces.go | 6 +- internal/manager/api_impl/jobs.go | 6 +- internal/manager/api_impl/worker_mgt.go | 14 +- internal/manager/api_impl/workers.go | 12 +- internal/manager/eventbus/eventbus.go | 40 +++++ .../events_jobs.go} | 44 +++-- .../events_workers.go} | 24 ++- .../events_workertags.go} | 18 +- internal/manager/eventbus/socketio.go | 168 ++++++++++++++++++ internal/manager/eventbus/topics.go | 38 ++++ internal/manager/job_deleter/interfaces.go | 6 +- internal/manager/job_deleter/job_deleter.go | 6 +- .../manager/sleep_scheduler/interfaces.go | 6 +- internal/manager/task_logs/task_logs.go | 8 +- .../manager/task_state_machine/interfaces.go | 6 +- .../task_state_machine/task_state_machine.go | 6 +- .../manager/timeout_checker/interfaces.go | 6 +- internal/manager/webupdates/chat.go | 17 -- internal/manager/webupdates/sio_rooms.go | 135 -------------- internal/manager/webupdates/webupdates.go | 70 -------- 22 files changed, 339 insertions(+), 324 deletions(-) create mode 100644 internal/manager/eventbus/eventbus.go rename internal/manager/{webupdates/job_updates.go => eventbus/events_jobs.go} (71%) rename internal/manager/{webupdates/worker_updates.go => eventbus/events_workers.go} (58%) rename internal/manager/{webupdates/workertag_updates.go => eventbus/events_workertags.go} (55%) create mode 100644 internal/manager/eventbus/socketio.go create mode 100644 internal/manager/eventbus/topics.go delete mode 100644 internal/manager/webupdates/chat.go delete mode 100644 internal/manager/webupdates/sio_rooms.go delete mode 100644 internal/manager/webupdates/webupdates.go diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 9b3c5409..f3bceeab 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -26,6 +26,7 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/api_impl" "projects.blender.org/studio/flamenco/internal/manager/api_impl/dummy" "projects.blender.org/studio/flamenco/internal/manager/config" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" @@ -35,7 +36,6 @@ import ( "projects.blender.org/studio/flamenco/internal/manager/task_logs" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" "projects.blender.org/studio/flamenco/internal/manager/timeout_checker" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/own_url" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" "projects.blender.org/studio/flamenco/pkg/shaman" @@ -152,29 +152,32 @@ func runFlamencoManager() bool { log.Fatal().Err(err).Msg("error loading job compilers") } - webUpdater := webupdates.New() + // Set up the event system. + eventBroker := eventbus.NewBroker() + socketio := eventbus.NewSocketIOForwarder() + eventBroker.AddForwarder(socketio) localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath) - logStorage := task_logs.NewStorage(localStorage, timeService, webUpdater) + logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker) - taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater, logStorage) - sleepScheduler := sleep_scheduler.New(timeService, persist, webUpdater) + taskStateMachine := task_state_machine.NewStateMachine(persist, eventBroker, logStorage) + sleepScheduler := sleep_scheduler.New(timeService, persist, eventBroker) lastRender := last_rendered.New(localStorage) shamanServer := buildShamanServer(configService, isFirstRun) - jobDeleter := job_deleter.NewService(persist, localStorage, webUpdater, shamanServer) + jobDeleter := job_deleter.NewService(persist, localStorage, eventBroker, shamanServer) flamenco := api_impl.NewFlamenco( - compiler, persist, webUpdater, logStorage, configService, + compiler, persist, eventBroker, logStorage, configService, taskStateMachine, shamanServer, timeService, lastRender, localStorage, sleepScheduler, jobDeleter) - e := buildWebService(flamenco, persist, ssdp, webUpdater, urls, localStorage) + e := buildWebService(flamenco, persist, ssdp, socketio, urls, localStorage) timeoutChecker := timeout_checker.New( configService.Get().TaskTimeout, configService.Get().WorkerTimeout, - timeService, persist, taskStateMachine, logStorage, webUpdater) + timeService, persist, taskStateMachine, logStorage, eventBroker) // The main context determines the lifetime of the application. All // long-running goroutines need to keep an eye on this, and stop their work diff --git a/cmd/flamenco-manager/webservice.go b/cmd/flamenco-manager/webservice.go index 295cb230..ede66add 100644 --- a/cmd/flamenco-manager/webservice.go +++ b/cmd/flamenco-manager/webservice.go @@ -20,9 +20,9 @@ import ( "github.com/ziflex/lecho/v3" "projects.blender.org/studio/flamenco/internal/manager/api_impl" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/local_storage" "projects.blender.org/studio/flamenco/internal/manager/swagger_ui" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/upnp_ssdp" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/web" @@ -32,7 +32,7 @@ func buildWebService( flamenco api.ServerInterface, persist api_impl.PersistenceService, ssdp *upnp_ssdp.Server, - webUpdater *webupdates.BiDirComms, + socketio *eventbus.SocketIOForwarder, ownURLs []url.URL, localStorage local_storage.StorageInfo, ) *echo.Echo { @@ -112,7 +112,7 @@ func buildWebService( // Register routes. api.RegisterHandlers(e, flamenco) - webUpdater.RegisterHandlers(e) + socketio.RegisterHandlers(e) swagger_ui.RegisterSwaggerUIStaticFiles(e) e.GET("/api/v3/openapi3.json", func(c echo.Context) error { return c.JSON(http.StatusOK, swagger) diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 593e8244..52ab5d25 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -14,13 +14,13 @@ import ( "github.com/rs/zerolog" "projects.blender.org/studio/flamenco/internal/manager/config" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/job_deleter" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/sleep_scheduler" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -125,8 +125,8 @@ type ChangeBroadcaster interface { BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) type JobCompiler interface { ListJobTypes() api.AvailableJobTypes diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 35aca5e9..9e9918b2 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -16,9 +16,9 @@ import ( "github.com/labstack/echo/v4" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/job_compilers" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/crosspath" @@ -105,7 +105,7 @@ func (f *Flamenco) SubmitJob(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error retrieving job from database") } - jobUpdate := webupdates.NewJobUpdate(dbJob) + jobUpdate := eventbus.NewJobUpdate(dbJob) f.broadcaster.BroadcastNewJob(jobUpdate) apiJob := jobDBtoAPI(dbJob) @@ -365,7 +365,7 @@ func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error { } // Broadcast this change to the SocketIO clients. - jobUpdate := webupdates.NewJobUpdate(dbJob) + jobUpdate := eventbus.NewJobUpdate(dbJob) f.broadcaster.BroadcastJobUpdate(jobUpdate) return e.NoContent(http.StatusNoContent) diff --git a/internal/manager/api_impl/worker_mgt.go b/internal/manager/api_impl/worker_mgt.go index 590a481c..a2412fa4 100644 --- a/internal/manager/api_impl/worker_mgt.go +++ b/internal/manager/api_impl/worker_mgt.go @@ -7,8 +7,8 @@ import ( "net/http" "github.com/labstack/echo/v4" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -121,7 +121,7 @@ func (f *Flamenco) DeleteWorker(e echo.Context, workerUUID string) error { now := f.clock.Now() // Broadcast the fact that this worker was just deleted. - update := webupdates.NewWorkerUpdate(worker) + update := eventbus.NewWorkerUpdate(worker) update.DeletedAt = &now f.broadcaster.BroadcastWorkerUpdate(update) @@ -183,7 +183,7 @@ func (f *Flamenco) RequestWorkerStatusChange(e echo.Context, workerUUID string) } // Broadcast the change. - update := webupdates.NewWorkerUpdate(dbWorker) + update := eventbus.NewWorkerUpdate(dbWorker) f.broadcaster.BroadcastWorkerUpdate(update) return e.NoContent(http.StatusNoContent) @@ -228,7 +228,7 @@ func (f *Flamenco) SetWorkerTags(e echo.Context, workerUUID string) error { } // Broadcast the change. - update := webupdates.NewWorkerUpdate(dbWorker) + update := eventbus.NewWorkerUpdate(dbWorker) f.broadcaster.BroadcastWorkerUpdate(update) return e.NoContent(http.StatusNoContent) @@ -267,7 +267,7 @@ func (f *Flamenco) DeleteWorkerTag(e echo.Context, tagUUID string) error { } // SocketIO broadcast of tag deletion. - update := webupdates.NewWorkerTagDeletedUpdate(tagUUID) + update := eventbus.NewWorkerTagDeletedUpdate(tagUUID) f.broadcaster.BroadcastWorkerTagUpdate(update) logger.Info().Msg("worker tag deleted") @@ -347,7 +347,7 @@ func (f *Flamenco) UpdateWorkerTag(e echo.Context, tagUUID string) error { } // SocketIO broadcast of tag update. - sioUpdate := webupdates.NewWorkerTagUpdate(dbTag) + sioUpdate := eventbus.NewWorkerTagUpdate(dbTag) f.broadcaster.BroadcastWorkerTagUpdate(sioUpdate) logger.Info().Msg("worker tag updated") @@ -419,7 +419,7 @@ func (f *Flamenco) CreateWorkerTag(e echo.Context) error { logger.Info().Msg("created new worker tag") // SocketIO broadcast of tag creation. - sioUpdate := webupdates.NewWorkerTagUpdate(&dbTag) + sioUpdate := eventbus.NewWorkerTagUpdate(&dbTag) f.broadcaster.BroadcastNewWorkerTag(sioUpdate) return e.JSON(http.StatusOK, workerTagDBtoAPI(dbTag)) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index af1eb29d..8e77545a 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -13,10 +13,10 @@ import ( "github.com/labstack/echo/v4" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/last_rendered" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/internal/uuid" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -105,7 +105,7 @@ func (f *Flamenco) SignOn(e echo.Context) error { } // Broadcast the status change to 'starting'. - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) if prevStatus != "" { update.PreviousStatus = &prevStatus } @@ -208,7 +208,7 @@ func (f *Flamenco) SignOff(e echo.Context) error { return sendAPIError(e, http.StatusInternalServerError, "error re-queueing your tasks") } - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) update.PreviousStatus = &prevStatus f.broadcaster.BroadcastWorkerUpdate(update) @@ -285,7 +285,7 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { } } - update := webupdates.NewWorkerUpdate(w) + update := eventbus.NewWorkerUpdate(w) update.PreviousStatus = &prevStatus f.broadcaster.BroadcastWorkerUpdate(update) @@ -367,7 +367,7 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { } // Broadcast a worker update so that the web frontend will show the newly assigned task. - update := webupdates.NewWorkerUpdate(worker) + update := eventbus.NewWorkerUpdate(worker) f.broadcaster.BroadcastWorkerUpdate(update) // Convert database objects to API objects: @@ -465,7 +465,7 @@ func (f *Flamenco) TaskOutputProduced(e echo.Context, taskID string) error { } // Broadcast when the processing is done. - update := webupdates.NewLastRenderedUpdate(jobUUID) + update := eventbus.NewLastRenderedUpdate(jobUUID) update.Thumbnail = *thumbnailInfo f.broadcaster.BroadcastLastRenderedImage(update) }, diff --git a/internal/manager/eventbus/eventbus.go b/internal/manager/eventbus/eventbus.go new file mode 100644 index 00000000..c0b4fdfe --- /dev/null +++ b/internal/manager/eventbus/eventbus.go @@ -0,0 +1,40 @@ +package eventbus + +import ( + "sync" +) + +type ( + EventTopic string +) + +type Forwarder interface { + Broadcast(topic EventTopic, payload interface{}) +} + +type Broker struct { + forwarders []Forwarder + mutex sync.Mutex +} + +func NewBroker() *Broker { + return &Broker{ + forwarders: []Forwarder{}, + mutex: sync.Mutex{}, + } +} + +func (b *Broker) AddForwarder(forwarder Forwarder) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.forwarders = append(b.forwarders, forwarder) +} + +func (b *Broker) broadcast(topic EventTopic, payload interface{}) { + b.mutex.Lock() + defer b.mutex.Unlock() + + for _, forwarder := range b.forwarders { + forwarder.Broadcast(topic, payload) + } +} diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/eventbus/events_jobs.go similarity index 71% rename from internal/manager/webupdates/job_updates.go rename to internal/manager/eventbus/events_jobs.go index 0ad547a7..eca03e05 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/eventbus/events_jobs.go @@ -1,5 +1,5 @@ // SPDX-License-Identifier: GPL-3.0-or-later -package webupdates +package eventbus import ( "github.com/rs/zerolog/log" @@ -64,49 +64,45 @@ func NewTaskLogUpdate(taskUUID string, logchunk string) api.SocketIOTaskLogUpdat } } -// BroadcastJobUpdate sends the job update to clients. -func (b *BiDirComms) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) { - log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") - b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) -} - // BroadcastNewJob sends a "new job" notification to clients. // This function should be called when the job has been completely created, so // including its tasks. -func (b *BiDirComms) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) { +func (b *Broker) BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) { if jobUpdate.PreviousStatus != nil { log.Warn().Interface("jobUpdate", jobUpdate).Msg("socketIO: new jobs should not have a previous state") jobUpdate.PreviousStatus = nil } log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting new job") - b.BroadcastTo(SocketIORoomJobs, SIOEventJobUpdate, jobUpdate) + b.broadcast(TopicJobUpdate, jobUpdate) } -// BroadcastTaskUpdate sends the task update to clients. -func (b *BiDirComms) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { - log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update") - room := roomForJob(taskUpdate.JobId) - b.BroadcastTo(room, SIOEventTaskUpdate, taskUpdate) +func (b *Broker) BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) { + log.Debug().Interface("jobUpdate", jobUpdate).Msg("socketIO: broadcasting job update") + b.broadcast(TopicJobUpdate, jobUpdate) } -// BroadcastLastRenderedImage sends the 'last-rendered' update to clients. -func (b *BiDirComms) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) { +func (b *Broker) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) { log.Debug().Interface("lastRenderedUpdate", update).Msg("socketIO: broadcasting last-rendered image update") - room := roomForJob(update.JobId) - b.BroadcastTo(room, SIOEventLastRenderedUpdate, update) + topic := topicForJobLastRendered(update.JobId) + b.broadcast(topic, update) // TODO: throttle these via a last-in-one-out queue (see `pkg/last_in_one_out_queue`). - b.BroadcastTo(SocketIORoomLastRendered, SIOEventLastRenderedUpdate, update) + b.broadcast(TopicLastRenderedImage, update) } -// BroadcastTaskLogUpdate sends the task log chunk to clients. -func (b *BiDirComms) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { +func (b *Broker) BroadcastTaskUpdate(taskUpdate api.SocketIOTaskUpdate) { + log.Debug().Interface("taskUpdate", taskUpdate).Msg("socketIO: broadcasting task update") + topic := topicForJob(taskUpdate.JobId) + b.broadcast(topic, taskUpdate) +} + +func (b *Broker) BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) { // Don't log the contents here; logs can get big. - room := roomForTaskLog(taskLogUpdate.TaskId) + topic := topicForTaskLog(taskLogUpdate.TaskId) log.Debug(). Str("task", taskLogUpdate.TaskId). - Str("room", string(room)). + Str("topic", string(topic)). Msg("socketIO: broadcasting task log") - b.BroadcastTo(room, SIOEventTaskLogUpdate, taskLogUpdate) + b.broadcast(topic, taskLogUpdate) } diff --git a/internal/manager/webupdates/worker_updates.go b/internal/manager/eventbus/events_workers.go similarity index 58% rename from internal/manager/webupdates/worker_updates.go rename to internal/manager/eventbus/events_workers.go index 5d6a3d7d..e208f4fb 100644 --- a/internal/manager/webupdates/worker_updates.go +++ b/internal/manager/eventbus/events_workers.go @@ -1,9 +1,7 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates +package eventbus import ( "github.com/rs/zerolog/log" - "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -38,19 +36,17 @@ func NewWorkerUpdate(worker *persistence.Worker) api.SocketIOWorkerUpdate { return workerUpdate } -// BroadcastWorkerUpdate sends the worker update to clients. -func (b *BiDirComms) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) { - log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting worker update") - b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) -} - -// BroadcastNewWorker sends a "new worker" notification to clients. -func (b *BiDirComms) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) { +func (b *Broker) BroadcastNewWorker(workerUpdate api.SocketIOWorkerUpdate) { if workerUpdate.PreviousStatus != nil { - log.Warn().Interface("workerUpdate", workerUpdate).Msg("socketIO: new workers should not have a previous state") + log.Warn().Interface("workerUpdate", workerUpdate).Msg("eventbus: new workers should not have a previous state") workerUpdate.PreviousStatus = nil } - log.Debug().Interface("workerUpdate", workerUpdate).Msg("socketIO: broadcasting new worker") - b.BroadcastTo(SocketIORoomWorkers, SIOEventWorkerUpdate, workerUpdate) + log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting new worker") + b.broadcast(TopicWorkerUpdate, workerUpdate) +} + +func (b *Broker) BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) { + log.Debug().Interface("workerUpdate", workerUpdate).Msg("eventbus: broadcasting worker update") + b.broadcast(TopicWorkerUpdate, workerUpdate) } diff --git a/internal/manager/webupdates/workertag_updates.go b/internal/manager/eventbus/events_workertags.go similarity index 55% rename from internal/manager/webupdates/workertag_updates.go rename to internal/manager/eventbus/events_workertags.go index 95d0ec71..4ec42601 100644 --- a/internal/manager/webupdates/workertag_updates.go +++ b/internal/manager/eventbus/events_workertags.go @@ -1,6 +1,4 @@ -package webupdates - -// SPDX-License-Identifier: GPL-3.0-or-later +package eventbus import ( "github.com/rs/zerolog/log" @@ -35,14 +33,12 @@ func NewWorkerTagDeletedUpdate(tagUUID string) api.SocketIOWorkerTagUpdate { return tagUpdate } -// BroadcastWorkerTagUpdate sends the worker tag update to clients. -func (b *BiDirComms) BroadcastWorkerTagUpdate(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { - log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting worker tag update") - b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +func (b *Broker) BroadcastWorkerTagUpdate(workerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting worker tag update") + b.broadcast(TopicWorkerTagUpdate, workerTagUpdate) } -// BroadcastNewWorkerTag sends a "new worker tag" notification to clients. -func (b *BiDirComms) BroadcastNewWorkerTag(WorkerTagUpdate api.SocketIOWorkerTagUpdate) { - log.Debug().Interface("WorkerTagUpdate", WorkerTagUpdate).Msg("socketIO: broadcasting new worker tag") - b.BroadcastTo(SocketIORoomWorkerTags, SIOEventWorkerTagUpdate, WorkerTagUpdate) +func (b *Broker) BroadcastNewWorkerTag(workerTagUpdate api.SocketIOWorkerTagUpdate) { + log.Debug().Interface("WorkerTagUpdate", workerTagUpdate).Msg("eventbus: broadcasting new worker tag") + b.broadcast(TopicWorkerTagUpdate, workerTagUpdate) } diff --git a/internal/manager/eventbus/socketio.go b/internal/manager/eventbus/socketio.go new file mode 100644 index 00000000..6b2e7abc --- /dev/null +++ b/internal/manager/eventbus/socketio.go @@ -0,0 +1,168 @@ +package eventbus + +import ( + "fmt" + "reflect" + + gosocketio "github.com/graarh/golang-socketio" + "github.com/graarh/golang-socketio/transport" + "github.com/labstack/echo/v4" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/uuid" + "projects.blender.org/studio/flamenco/pkg/api" +) + +type SocketIOEventType string + +const ( + SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription +) + +var socketIOEventTypes = map[string]string{ + reflect.TypeOf(api.SocketIOJobUpdate{}).Name(): "/jobs", + reflect.TypeOf(api.SocketIOTaskUpdate{}).Name(): "/task", + reflect.TypeOf(api.SocketIOLastRenderedUpdate{}).Name(): "/last-rendered", + reflect.TypeOf(api.SocketIOTaskLogUpdate{}).Name(): "/tasklog", + reflect.TypeOf(api.SocketIOWorkerTagUpdate{}).Name(): "/workertags", + reflect.TypeOf(api.SocketIOWorkerUpdate{}).Name(): "/workers", +} + +// SocketIOForwarder is an event forwarder via SocketIO. +type SocketIOForwarder struct { + sockserv *gosocketio.Server +} + +var _ Forwarder = (*SocketIOForwarder)(nil) + +type Message struct { + Name string `json:"name"` + Text string `json:"text"` +} + +func NewSocketIOForwarder() *SocketIOForwarder { + siof := SocketIOForwarder{ + sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()), + } + siof.registerSIOEventHandlers() + return &siof +} + +func (s *SocketIOForwarder) RegisterHandlers(router *echo.Echo) { + router.Any("/socket.io/", echo.WrapHandler(s.sockserv)) +} + +func (s *SocketIOForwarder) Broadcast(topic EventTopic, payload interface{}) { + // SocketIO has a concept of 'event types'. MQTT doesn't have this, and thus the Flamenco event + // system doesn't rely on it. We use the payload type name as event type. + payloadType := reflect.TypeOf(payload).Name() + eventType := socketIOEventTypes[payloadType] + log.Debug(). + Str("topic", string(topic)). + Str("eventType", eventType). + Interface("payload", payload). + Msg("socketIO: broadcasting message") + s.sockserv.BroadcastTo(string(topic), eventType, payload) +} + +func (s *SocketIOForwarder) registerSIOEventHandlers() { + log.Debug().Msg("initialising SocketIO") + + sio := s.sockserv + // the sio.On() and c.Join() calls only return an error when there is no + // server connected to them, but that's not possible with our setup. + // Errors are explicitly silenced (by assigning to _) to reduce clutter. + + // socket connection + _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Debug().Msg("socketIO: connected") + }) + + // socket disconnection + _ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Debug().Msg("socketIO: disconnected") + }) + + _ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { + logger := sioLogger(c) + logger.Warn().Msg("socketIO: socketio error") + }) + + s.registerRoomEventHandlers() +} + +func sioLogger(c *gosocketio.Channel) zerolog.Logger { + logger := log.With(). + Str("clientID", c.Id()). + Str("remoteAddr", c.Ip()). + Logger() + return logger +} + +func (s *SocketIOForwarder) registerRoomEventHandlers() { + _ = s.sockserv.On(string(SIOEventSubscription), s.handleRoomSubscription) +} + +func (s *SocketIOForwarder) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { + logger := sioLogger(c) + logCtx := logger.With(). + Str("op", string(subs.Op)). + Str("type", string(subs.Type)) + if subs.Uuid != nil { + logCtx = logCtx.Str("uuid", string(*subs.Uuid)) + } + logger = logCtx.Logger() + + if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) { + logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") + return "invalid UUID, ignoring request" + } + + var sioRoom EventTopic + switch subs.Type { + case api.SocketIOSubscriptionTypeAllJobs: + sioRoom = TopicJobUpdate + case api.SocketIOSubscriptionTypeAllWorkers: + sioRoom = TopicWorkerUpdate + case api.SocketIOSubscriptionTypeAllLastRendered: + sioRoom = TopicLastRenderedImage + case api.SocketIOSubscriptionTypeAllWorkerTags: + sioRoom = TopicWorkerTagUpdate + case api.SocketIOSubscriptionTypeJob: + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") + return "operation on job requires a UUID" + } + sioRoom = topicForJob(*subs.Uuid) + case api.SocketIOSubscriptionTypeTasklog: + if subs.Uuid == nil { + logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") + return "operation on task requires a UUID" + } + sioRoom = topicForTaskLog(*subs.Uuid) + default: + logger.Warn().Msg("socketIO: unknown subscription type, ignoring") + return "unknown subscription type, ignoring request" + } + + var err error + switch subs.Op { + case api.SocketIOSubscriptionOperationSubscribe: + err = c.Join(string(sioRoom)) + case api.SocketIOSubscriptionOperationUnsubscribe: + err = c.Leave(string(sioRoom)) + default: + logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") + return "invalid subscription operation, ignoring request" + } + + if err != nil { + logger.Warn().Err(err).Msg("socketIO: performing subscription operation") + return fmt.Sprintf("unable to perform subscription operation: %v", err) + } + + logger.Debug().Msg("socketIO: subscription") + return "ok" +} diff --git a/internal/manager/eventbus/topics.go b/internal/manager/eventbus/topics.go new file mode 100644 index 00000000..96315f39 --- /dev/null +++ b/internal/manager/eventbus/topics.go @@ -0,0 +1,38 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +package eventbus + +import "fmt" + +const ( + // Topics on which events are published. + TopicJobUpdate EventTopic = "/jobs" // sends api.SocketIOJobUpdate + TopicLastRenderedImage EventTopic = "/last-rendered" // sends api.SocketIOLastRenderedUpdate + TopicTaskUpdate EventTopic = "/task" // sends api.SocketIOTaskUpdate + TopicWorkerUpdate EventTopic = "/workers" // sends api.SocketIOWorkerUpdate + TopicWorkerTagUpdate EventTopic = "/workertags" // sends api.SocketIOWorkerTagUpdate + TopicSubscription EventTopic = "/subscription" // clients send api.SocketIOSubscription + + // Parameterised topics. + TopicJobSpecific EventTopic = "/jobs/%s" // %s = job UUID + TopicJobLastRendered EventTopic = "/jobs/%s/last-rendered" // %s = job UUID + TopicTaskLog EventTopic = "/tasklog/%s" // %s = task UUID +) + +// topicForJob will return the event topic for the given job. Clients subscribed +// to this topic receive info scoped to this job, so for example updates to all +// tasks of this job. +func topicForJob(jobUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicJobSpecific), jobUUID)) +} +func topicForJobLastRendered(jobUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicJobLastRendered), jobUUID)) +} + +// topicForTaskLog will return the event topic for receiving task logs of +// the the given task. +// +// Note that general task updates are sent to their job's topic, and not to this +// one. +func topicForTaskLog(taskUUID string) EventTopic { + return EventTopic(fmt.Sprintf(string(TopicTaskLog), taskUUID)) +} diff --git a/internal/manager/job_deleter/interfaces.go b/internal/manager/job_deleter/interfaces.go index ab0f1747..527d71c5 100644 --- a/internal/manager/job_deleter/interfaces.go +++ b/internal/manager/job_deleter/interfaces.go @@ -6,9 +6,9 @@ import ( "context" "time" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/local_storage" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -44,8 +44,8 @@ type ChangeBroadcaster interface { BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) type Shaman interface { // IsEnabled returns whether this Shaman service is enabled or not. diff --git a/internal/manager/job_deleter/job_deleter.go b/internal/manager/job_deleter/job_deleter.go index 95fe7589..15b3eb9c 100644 --- a/internal/manager/job_deleter/job_deleter.go +++ b/internal/manager/job_deleter/job_deleter.go @@ -18,8 +18,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" "projects.blender.org/studio/flamenco/pkg/shaman" ) @@ -76,7 +76,7 @@ func (s *Service) QueueJobDeletion(ctx context.Context, job *persistence.Job) er } // Broadcast that this job was queued for deleted. - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) // Let the Run() goroutine know this job is ready for deletion. @@ -128,7 +128,7 @@ func (s *Service) broadcastAndQueueMassJobDeletion(ctx context.Context, jobUUIDs Msg("job deleter: unable to fetch job to send updates") continue } - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) s.changeBroadcaster.BroadcastJobUpdate(jobUpdate) } } diff --git a/internal/manager/sleep_scheduler/interfaces.go b/internal/manager/sleep_scheduler/interfaces.go index 03eaa85f..de653ad5 100644 --- a/internal/manager/sleep_scheduler/interfaces.go +++ b/internal/manager/sleep_scheduler/interfaces.go @@ -5,8 +5,8 @@ package sleep_scheduler import ( "context" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -33,5 +33,5 @@ type ChangeBroadcaster interface { BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) diff --git a/internal/manager/task_logs/task_logs.go b/internal/manager/task_logs/task_logs.go index ef48e120..bc6e3f4a 100644 --- a/internal/manager/task_logs/task_logs.go +++ b/internal/manager/task_logs/task_logs.go @@ -13,7 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/rs/zerolog" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -47,8 +47,8 @@ type ChangeBroadcaster interface { BroadcastTaskLogUpdate(taskLogUpdate api.SocketIOTaskLogUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) // NewStorage creates a new log storage rooted at `basePath`. func NewStorage( @@ -72,7 +72,7 @@ func (s *Storage) Write(logger zerolog.Logger, jobID, taskID string, logText str } // Broadcast the task log to SocketIO clients. - taskUpdate := webupdates.NewTaskLogUpdate(taskID, logText) + taskUpdate := eventbus.NewTaskLogUpdate(taskID, logText) s.broadcaster.BroadcastTaskLogUpdate(taskUpdate) return nil } diff --git a/internal/manager/task_state_machine/interfaces.go b/internal/manager/task_state_machine/interfaces.go index 28e66d9f..829b0dc2 100644 --- a/internal/manager/task_state_machine/interfaces.go +++ b/internal/manager/task_state_machine/interfaces.go @@ -6,9 +6,9 @@ import ( "context" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_logs" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -49,8 +49,8 @@ type ChangeBroadcaster interface { BroadcastTaskUpdate(jobUpdate api.SocketIOTaskUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) // LogStorage writes to task logs. type LogStorage interface { diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 1e696dd9..38b02659 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -9,8 +9,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -88,7 +88,7 @@ func (sm *StateMachine) taskStatusChangeOnly( } // Broadcast this change to the SocketIO clients. - taskUpdate := webupdates.NewTaskUpdate(task) + taskUpdate := eventbus.NewTaskUpdate(task) taskUpdate.PreviousStatus = &oldTaskStatus sm.broadcaster.BroadcastTaskUpdate(taskUpdate) @@ -331,7 +331,7 @@ func (sm *StateMachine) jobStatusSet(ctx context.Context, } // Broadcast this change to the SocketIO clients. - jobUpdate := webupdates.NewJobUpdate(job) + jobUpdate := eventbus.NewJobUpdate(job) jobUpdate.PreviousStatus = &oldJobStatus jobUpdate.RefreshTasks = result.massTaskUpdate sm.broadcaster.BroadcastJobUpdate(jobUpdate) diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go index 2d2b35b8..ae8a9fd9 100644 --- a/internal/manager/timeout_checker/interfaces.go +++ b/internal/manager/timeout_checker/interfaces.go @@ -7,9 +7,9 @@ import ( "time" "github.com/rs/zerolog" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/internal/manager/persistence" "projects.blender.org/studio/flamenco/internal/manager/task_state_machine" - "projects.blender.org/studio/flamenco/internal/manager/webupdates" "projects.blender.org/studio/flamenco/pkg/api" ) @@ -43,5 +43,5 @@ type ChangeBroadcaster interface { BroadcastWorkerUpdate(workerUpdate api.SocketIOWorkerUpdate) } -// ChangeBroadcaster should be a subset of webupdates.BiDirComms. -var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) +// ChangeBroadcaster should be a subset of eventbus.Broker. +var _ ChangeBroadcaster = (*eventbus.Broker)(nil) diff --git a/internal/manager/webupdates/chat.go b/internal/manager/webupdates/chat.go deleted file mode 100644 index 4723abdb..00000000 --- a/internal/manager/webupdates/chat.go +++ /dev/null @@ -1,17 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import gosocketio "github.com/graarh/golang-socketio" - -func (b *BiDirComms) registerChatEventHandlers() { - _ = b.sockserv.On(string(SIOEventChatMessageRcv), - func(c *gosocketio.Channel, message Message) string { - logger := sioLogger(c) - logger.Info(). - Str("text", message.Text). - Str("name", message.Name). - Msg("socketIO: message received") - b.BroadcastTo(SocketIORoomChat, SIOEventChatMessageSend, message) - return "message sent successfully." - }) -} diff --git a/internal/manager/webupdates/sio_rooms.go b/internal/manager/webupdates/sio_rooms.go deleted file mode 100644 index 2ddba497..00000000 --- a/internal/manager/webupdates/sio_rooms.go +++ /dev/null @@ -1,135 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import ( - "fmt" - - gosocketio "github.com/graarh/golang-socketio" - - "projects.blender.org/studio/flamenco/internal/uuid" - "projects.blender.org/studio/flamenco/pkg/api" -) - -// Separate type aliases for room names and event types; it's otherwise too easy -// to confuse the two. -type ( - SocketIORoomName string - SocketIOEventType string -) - -const ( - // Predefined SocketIO rooms. There will be others, but those will have a - // dynamic name like `job-fa48930a-105c-4125-a7f7-0aa1651dcd57` and cannot be - // listed here as constants. See `roomXXX()` functions for those. - SocketIORoomChat SocketIORoomName = "Chat" // For chat messages. - SocketIORoomJobs SocketIORoomName = "Jobs" // For job updates. - SocketIORoomWorkers SocketIORoomName = "Workers" // For worker updates. - SocketIORoomWorkerTags SocketIORoomName = "WorkerTags" // For worker tag updates. - - // For updates about ALL last-rendered images. Normally these are sent to a - // room specific to a particular job, but for the global "last rendered image" - // all updates are sent here too. - SocketIORoomLastRendered SocketIORoomName = "Last-Rendered" -) - -const ( - // Predefined SocketIO event types. - SIOEventChatMessageRcv SocketIOEventType = "/chat" // clients send chat messages here - SIOEventChatMessageSend SocketIOEventType = "/message" // chat messages are broadcasted here - SIOEventJobUpdate SocketIOEventType = "/jobs" // sends api.SocketIOJobUpdate - SIOEventLastRenderedUpdate SocketIOEventType = "/last-rendered" // sends api.SocketIOLastRenderedUpdate - SIOEventTaskUpdate SocketIOEventType = "/task" // sends api.SocketIOTaskUpdate - SIOEventTaskLogUpdate SocketIOEventType = "/tasklog" // sends api.SocketIOTaskLogUpdate - SIOEventWorkerUpdate SocketIOEventType = "/workers" // sends api.SocketIOWorkerUpdate - SIOEventWorkerTagUpdate SocketIOEventType = "/workertags" // sends api.SocketIOWorkerTagUpdate - SIOEventSubscription SocketIOEventType = "/subscription" // clients send api.SocketIOSubscription -) - -func (b *BiDirComms) BroadcastTo(room SocketIORoomName, eventType SocketIOEventType, payload interface{}) { - b.sockserv.BroadcastTo(string(room), string(eventType), payload) -} - -func (b *BiDirComms) registerRoomEventHandlers() { - _ = b.sockserv.On(string(SIOEventSubscription), b.handleRoomSubscription) -} - -func (b *BiDirComms) handleRoomSubscription(c *gosocketio.Channel, subs api.SocketIOSubscription) string { - logger := sioLogger(c) - logCtx := logger.With(). - Str("op", string(subs.Op)). - Str("type", string(subs.Type)) - if subs.Uuid != nil { - logCtx = logCtx.Str("uuid", string(*subs.Uuid)) - } - logger = logCtx.Logger() - - if subs.Uuid != nil && !uuid.IsValid(*subs.Uuid) { - logger.Warn().Msg("socketIO: invalid UUID, ignoring subscription request") - return "invalid UUID, ignoring request" - } - - var sioRoom SocketIORoomName - switch subs.Type { - case api.SocketIOSubscriptionTypeAllJobs: - sioRoom = SocketIORoomJobs - case api.SocketIOSubscriptionTypeAllWorkers: - sioRoom = SocketIORoomWorkers - case api.SocketIOSubscriptionTypeAllLastRendered: - sioRoom = SocketIORoomLastRendered - case api.SocketIOSubscriptionTypeAllWorkerTags: - sioRoom = SocketIORoomWorkerTags - case api.SocketIOSubscriptionTypeJob: - if subs.Uuid == nil { - logger.Warn().Msg("socketIO: trying to (un)subscribe to job without UUID") - return "operation on job requires a UUID" - } - sioRoom = roomForJob(*subs.Uuid) - case api.SocketIOSubscriptionTypeTasklog: - if subs.Uuid == nil { - logger.Warn().Msg("socketIO: trying to (un)subscribe to task without UUID") - return "operation on task requires a UUID" - } - sioRoom = roomForTaskLog(*subs.Uuid) - default: - logger.Warn().Msg("socketIO: unknown subscription type, ignoring") - return "unknown subscription type, ignoring request" - } - - var err error - switch subs.Op { - case api.SocketIOSubscriptionOperationSubscribe: - err = c.Join(string(sioRoom)) - case api.SocketIOSubscriptionOperationUnsubscribe: - err = c.Leave(string(sioRoom)) - default: - logger.Warn().Msg("socketIO: invalid subscription operation, ignoring") - return "invalid subscription operation, ignoring request" - } - - if err != nil { - logger.Warn().Err(err).Msg("socketIO: performing subscription operation") - return fmt.Sprintf("unable to perform subscription operation: %v", err) - } - - logger.Debug().Msg("socketIO: subscription") - return "ok" -} - -// roomForJob will return the SocketIO room name for the given job. Clients in -// this room will receive info scoped to this job, so for example updates to all -// tasks of this job. -// -// Note that `api.SocketIOJobUpdate`s themselves are sent to all SocketIO clients, and -// not to this room. -func roomForJob(jobUUID string) SocketIORoomName { - return SocketIORoomName("job-" + jobUUID) -} - -// roomForTaskLog will return the SocketIO room name for receiving task logs of -// the the given task. -// -// Note that general task updates (`api.SIOEventTaskUpdate`) are sent to their -// job's room, and not to this room. -func roomForTaskLog(taskUUID string) SocketIORoomName { - return SocketIORoomName("tasklog-" + taskUUID) -} diff --git a/internal/manager/webupdates/webupdates.go b/internal/manager/webupdates/webupdates.go deleted file mode 100644 index 6b31b4d8..00000000 --- a/internal/manager/webupdates/webupdates.go +++ /dev/null @@ -1,70 +0,0 @@ -// package webupdates uses SocketIO to send updates to a web client. -// SPDX-License-Identifier: GPL-3.0-or-later -package webupdates - -import ( - gosocketio "github.com/graarh/golang-socketio" - "github.com/graarh/golang-socketio/transport" - "github.com/labstack/echo/v4" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -type BiDirComms struct { - sockserv *gosocketio.Server -} - -type Message struct { - Name string `json:"name"` - Text string `json:"text"` -} - -func New() *BiDirComms { - bdc := BiDirComms{ - sockserv: gosocketio.NewServer(transport.GetDefaultWebsocketTransport()), - } - bdc.registerSIOEventHandlers() - return &bdc -} - -func (b *BiDirComms) RegisterHandlers(router *echo.Echo) { - router.Any("/socket.io/", echo.WrapHandler(b.sockserv)) -} - -func (b *BiDirComms) registerSIOEventHandlers() { - log.Debug().Msg("initialising SocketIO") - - sio := b.sockserv - // the sio.On() and c.Join() calls only return an error when there is no - // server connected to them, but that's not possible with our setup. - // Errors are explicitly silenced (by assigning to _) to reduce clutter. - - // socket connection - _ = sio.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Debug().Msg("socketIO: connected") - _ = c.Join(string(SocketIORoomChat)) // All clients connect to the chat room. - }) - - // socket disconnection - _ = sio.On(gosocketio.OnDisconnection, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Debug().Msg("socketIO: disconnected") - }) - - _ = sio.On(gosocketio.OnError, func(c *gosocketio.Channel) { - logger := sioLogger(c) - logger.Warn().Msg("socketIO: socketio error") - }) - - b.registerChatEventHandlers() - b.registerRoomEventHandlers() -} - -func sioLogger(c *gosocketio.Channel) zerolog.Logger { - logger := log.With(). - Str("clientID", c.Id()). - Str("remoteAddr", c.Ip()). - Logger() - return logger -}