diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cc22495..677f6454 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ This file contains the history of changes to Flamenco. Only changes that might be interesting for users are listed here, such as new features and fixes for bugs in actually-released versions. +## 3.5 - in development + +- Add MQTT support. Flamenco Manager can now send internal events to an MQTT broker. + ## 3.4 - released 2024-01-12 - Fix [#104263: Error performing BAT pack in Windows with shared storage](https://projects.blender.org/studio/flamenco/issues/104263). diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index f3bceeab..e341f3dc 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -156,6 +156,10 @@ func runFlamencoManager() bool { eventBroker := eventbus.NewBroker() socketio := eventbus.NewSocketIOForwarder() eventBroker.AddForwarder(socketio) + mqttClient := eventbus.NewMQTTForwarder(configService.Get().MQTT.Client) + if mqttClient != nil { + eventBroker.AddForwarder(mqttClient) + } localStorage := local_storage.NewNextToExe(configService.Get().LocalManagerStoragePath) logStorage := task_logs.NewStorage(localStorage, timeService, eventBroker) @@ -186,6 +190,10 @@ func runFlamencoManager() bool { installSignalHandler(mainCtxCancel) + if mqttClient != nil { + mqttClient.Connect(mainCtx) + } + // Before doing anything new, clean up in case we made a mess in an earlier run. taskStateMachine.CheckStuck(mainCtx) diff --git a/go.mod b/go.mod index e80ee83c..1b194c7d 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/disintegration/imaging v1.6.2 github.com/dop251/goja v0.0.0-20230812105242-81d76064690d github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d + github.com/eclipse/paho.golang v0.12.0 github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e github.com/gertd/go-pluralize v0.2.1 github.com/getkin/kin-openapi v0.88.0 @@ -24,7 +25,7 @@ require ( github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 github.com/pressly/goose/v3 v3.15.1 github.com/rs/zerolog v1.26.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.8.4 github.com/zcalusic/sysinfo v1.0.1 github.com/ziflex/lecho/v3 v3.1.0 golang.org/x/crypto v0.16.0 @@ -47,7 +48,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/labstack/gommon v0.4.0 // indirect @@ -58,6 +59,7 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect golang.org/x/mod v0.14.0 // indirect + golang.org/x/sync v0.5.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect golang.org/x/tools v0.16.1 // indirect diff --git a/go.sum b/go.sum index 7de8e931..6e1cea3c 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d h1:W1n4DvpzZGOI github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q= +github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE= github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e h1:cG4ivpkHpkmWTaaLrgekDVR0xAr87V697T2c+WnUdiY= github.com/fromkeith/gossdp v0.0.0-20180102154144-1b2c43f6886e/go.mod h1:7xQpS/YtlWo38XfIqje9GgtlPuBRatYcL23GlYBtgWM= github.com/gertd/go-pluralize v0.2.1 h1:M3uASbVjMnTsPb0PNqg+E/24Vwigyo/tvyMTtAlLgiA= @@ -72,6 +74,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC5IXdEA547bcoU5dozgQAfc8Onsg4= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U= github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg= @@ -81,8 +85,8 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graarh/golang-socketio v0.0.0-20170510162725-2c44953b9b5f h1:utzdm9zUvVWGRtIpkdE4+36n+Gv60kNb7mFvgGxLElY= github.com/graarh/golang-socketio v0.0.0-20170510162725-2c44953b9b5f/go.mod h1:8gudiNCFh3ZfvInknmoXzPeV17FSH+X2J5k2cUPIwnA= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= @@ -162,8 +166,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= @@ -181,6 +186,8 @@ github.com/zcalusic/sysinfo v1.0.1 h1:cVh8q3codjh43AGRTa54dJ2Zq+qPejv8n2VWpxKViw github.com/zcalusic/sysinfo v1.0.1/go.mod h1:LxwKwtQdbTIQc65drhjQzYzt0o7jfB80LrrZm7SWn8o= github.com/ziflex/lecho/v3 v3.1.0 h1:65bSzSc0yw7EEhi44lMnkOI877ZzbE7tGDWfYCQXZwI= github.com/ziflex/lecho/v3 v3.1.0/go.mod h1:dwQ6xCAKmSBHhwZ6XmiAiDptD7iklVkW7xQYGUncX0Q= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -220,6 +227,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index 31e7a7dd..386d0b98 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -21,6 +21,7 @@ import ( yaml "gopkg.in/yaml.v2" "projects.blender.org/studio/flamenco/internal/appinfo" + "projects.blender.org/studio/flamenco/internal/manager/eventbus" "projects.blender.org/studio/flamenco/pkg/crosspath" shaman_config "projects.blender.org/studio/flamenco/pkg/shaman/config" ) @@ -101,6 +102,8 @@ type Base struct { // When this many workers have tried the task and failed, it will be hard-failed // (even when there are workers left that could technically retry the task). TaskFailAfterSoftFailCount int `yaml:"task_fail_after_softfail_count"` + + MQTT MQTTConfig `yaml:"mqtt"` } // GarbageCollect contains the config options for the GC. @@ -117,6 +120,11 @@ type ShamanGarbageCollect struct { SilentlyDisable bool `yaml:"-"` } +// MQTTConfig contains the configuration options for MQTT broker (idea for the future) and client. +type MQTTConfig struct { + Client eventbus.MQTTClientConfig `yaml:"client"` +} + // Conf is the latest version of the configuration. // Currently it is version 3. type Conf struct { diff --git a/internal/manager/eventbus/mqtt_client.go b/internal/manager/eventbus/mqtt_client.go new file mode 100644 index 00000000..2839ba99 --- /dev/null +++ b/internal/manager/eventbus/mqtt_client.go @@ -0,0 +1,153 @@ +package eventbus + +import ( + "context" + "encoding/json" + "net/url" + "strings" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/rs/zerolog/log" +) + +const ( + defaultClientID = "flamenco" + keepAlive = 30 // seconds + connectRetryDelay = 10 * time.Second + + mqttQoS = 1 +) + +type MQTTForwarder struct { + config autopaho.ClientConfig + conn *autopaho.ConnectionManager + topicPrefix string + + // Context to use when publishing messages. + ctx context.Context +} + +var _ Forwarder = (*MQTTForwarder)(nil) + +// MQTTClientConfig contains the MQTT client configuration. +type MQTTClientConfig struct { + BrokerURL string `yaml:"broker"` + ClientID string `yaml:"clientID"` + TopicPrefix string `yaml:"topic_prefix"` + + Username string `yaml:"username"` + Password string `yaml:"password"` +} + +func NewMQTTForwarder(config MQTTClientConfig) *MQTTForwarder { + config.BrokerURL = strings.TrimSpace(config.BrokerURL) + config.ClientID = strings.TrimSpace(config.ClientID) + + if config.BrokerURL == "" { + return nil + } + if config.ClientID == "" { + config.ClientID = defaultClientID + } + + serverURL, err := url.Parse(config.BrokerURL) + if err != nil { + log.Error(). + Err(err). + Str("mqttServerURL", config.BrokerURL). + Msg("could not parse MQTT server URL, skipping creation of MQTT client") + return nil + } + + client := MQTTForwarder{ + topicPrefix: config.TopicPrefix, + } + client.config = autopaho.ClientConfig{ + BrokerUrls: []*url.URL{serverURL}, + KeepAlive: keepAlive, + ConnectRetryDelay: connectRetryDelay, + OnConnectionUp: client.onConnectionUp, + OnConnectError: client.onConnectionError, + Debug: paho.NOOPLogger{}, + ClientConfig: paho.ClientConfig{ + ClientID: config.ClientID, + OnClientError: client.onClientError, + OnServerDisconnect: client.onServerDisconnect, + }, + } + client.config.SetUsernamePassword(config.Username, []byte(config.Password)) + return &client +} + +func (m *MQTTForwarder) Connect(ctx context.Context) { + log.Debug().Msg("mqtt client: connecting") + conn, err := autopaho.NewConnection(ctx, m.config) + if err != nil { + panic(err) + } + + m.conn = conn + m.ctx = ctx +} + +func (m *MQTTForwarder) onConnectionUp(connMgr *autopaho.ConnectionManager, connAck *paho.Connack) { + log.Info().Msg("mqtt client: connection established") +} + +func (m *MQTTForwarder) onConnectionError(err error) { + log.Warn().AnErr("cause", err).Msg("mqtt client: connection error") +} + +func (m *MQTTForwarder) onClientError(err error) { + log.Warn().AnErr("cause", err).Msg("mqtt client: server requested disconnect") +} + +func (m *MQTTForwarder) onServerDisconnect(d *paho.Disconnect) { + logEntry := log.Warn() + if d.Properties != nil { + logEntry = logEntry.Str("reason", d.Properties.ReasonString) + } else { + logEntry = logEntry.Int("reasonCode", int(d.ReasonCode)) + } + logEntry.Msg("mqtt client: server requested disconnect") +} + +func (c *MQTTForwarder) Broadcast(topic EventTopic, payload interface{}) { + fullTopic := c.topicPrefix + string(topic) + + logger := log.With(). + Str("topic", fullTopic). + Interface("event", payload). + Logger() + + asJSON, err := json.Marshal(payload) + if err != nil { + logger.Error().AnErr("cause", err).Interface("event", payload). + Msg("mqtt client: could not convert event to JSON") + return + } + + // Publish will block so we run it in a GoRoutine. + // TODO: might be a good idea todo this at the event broker level, rather than in this function. + go func(topic string, msg []byte) { + pr, err := c.conn.Publish(c.ctx, &paho.Publish{ + QoS: mqttQoS, + Topic: topic, + Payload: msg, + }) + switch { + case err != nil: + logger.Error().AnErr("cause", err).Msg("mqtt client: error publishing event") + return + case pr.ReasonCode == 16: + logger.Debug().Msg("mqtt client: event sent to server, but there were no subscribers") + return + case pr.ReasonCode != 0: + logger.Warn().Int("reasonCode", int(pr.ReasonCode)).Msg("mqtt client: event rejected by mqtt server") + default: + logger.Debug().Msg("mqtt client: event sent to server") + } + }(fullTopic, asJSON) +}