From c3837376f5cb69d3b090bebca6468e9c2101ca98 Mon Sep 17 00:00:00 2001 From: Oleksandr Berezovskyi Date: Sun, 24 May 2026 18:52:59 +0300 Subject: [PATCH] feat(bridge): MQTT bridge with Home Assistant auto-discovery --- pkg/bridge/config.go | 42 ++++++++++++++++++ pkg/bridge/const.go | 6 +++ pkg/bridge/discovery.go | 63 +++++++++++++++++++++++++++ pkg/bridge/handlers.go | 35 +++++++++++++++ pkg/bridge/helpers.go | 20 +++++++++ pkg/bridge/types.go | 94 +++++++++++++++++++++++++++++++++++++++++ pkg/bridge/utils.go | 22 ++++++++++ 7 files changed, 282 insertions(+) create mode 100644 pkg/bridge/config.go create mode 100644 pkg/bridge/const.go create mode 100644 pkg/bridge/discovery.go create mode 100644 pkg/bridge/handlers.go create mode 100644 pkg/bridge/helpers.go create mode 100644 pkg/bridge/types.go create mode 100644 pkg/bridge/utils.go diff --git a/pkg/bridge/config.go b/pkg/bridge/config.go new file mode 100644 index 0000000..5dbb0ca --- /dev/null +++ b/pkg/bridge/config.go @@ -0,0 +1,42 @@ +package bridge + +import ( + "errors" + "strings" +) + +type Config struct { + Broker string `yaml:"broker"` + Username string `yaml:"username"` + Password string `yaml:"password"` + ClientID string `yaml:"client_id"` + BaseTopic string `yaml:"base_topic"` + DiscoveryPrefix string `yaml:"discovery_prefix"` +} + +func (c *Config) SetDefaults() { + if c.Broker == "" { + c.Broker = "tcp://localhost:1883" + } + if c.ClientID == "" { + c.ClientID = "ezcoo-usb-control" + } + if c.BaseTopic == "" { + c.BaseTopic = "ezcoo" + } + if c.DiscoveryPrefix == "" { + c.DiscoveryPrefix = "homeassistant" + } + c.BaseTopic = strings.TrimRight(c.BaseTopic, "/") + c.DiscoveryPrefix = strings.TrimRight(c.DiscoveryPrefix, "/") +} + +func (c *Config) Validate() error { + if c.Broker == "" { + return errors.New("mqtt.broker is required") + } + if c.ClientID == "" { + return errors.New("mqtt.client_id is required") + } + return nil +} diff --git a/pkg/bridge/const.go b/pkg/bridge/const.go new file mode 100644 index 0000000..479f96d --- /dev/null +++ b/pkg/bridge/const.go @@ -0,0 +1,6 @@ +package bridge + +const ( + numOutputs = 2 + numInputs = 4 +) diff --git a/pkg/bridge/discovery.go b/pkg/bridge/discovery.go new file mode 100644 index 0000000..fc9bd41 --- /dev/null +++ b/pkg/bridge/discovery.go @@ -0,0 +1,63 @@ +package bridge + +import ( + "encoding/json" + "fmt" + "log/slog" +) + +func (b *Bridge) publishDiscovery() { + type component struct { + Platform string `json:"p"` + Name string `json:"name"` + UniqueID string `json:"unique_id"` + CommandTopic string `json:"command_topic"` + StateTopic string `json:"state_topic"` + Options []string `json:"options"` + } + type payload struct { + Device struct { + Identifiers []string `json:"identifiers"` + Name string `json:"name"` + Manufacturer string `json:"manufacturer"` + Model string `json:"model"` + } `json:"device"` + Origin map[string]string `json:"origin"` + Availability []map[string]string `json:"availability"` + Components map[string]component `json:"components"` + } + + options := make([]string, numInputs) + for i := range options { + options[i] = inputLabel(i + 1) + } + + p := payload{ + Origin: map[string]string{"name": "ezcoo-usb-control"}, + Availability: []map[string]string{{"topic": b.availTopic()}}, + Components: make(map[string]component, numOutputs), + } + p.Device.Identifiers = []string{"ezcoo_mx42has_arc"} + p.Device.Name = "EZCOO HDMI Matrix" + p.Device.Manufacturer = "EZCOO" + p.Device.Model = "EZ-MX42HAS-ARC" + + for out := 1; out <= numOutputs; out++ { + p.Components[fmt.Sprintf("out%d", out)] = component{ + Platform: "select", + Name: fmt.Sprintf("Output %d", out), + UniqueID: fmt.Sprintf("ezcoo_mx42_out%d", out), + CommandTopic: b.cmdTopic(out), + StateTopic: b.stateTopic(out), + Options: options, + } + } + + data, err := json.Marshal(p) + if err != nil { + slog.Error("marshal discovery payload", "err", err) + return + } + b.client.Publish(b.discoveryTopic(), 1, true, data) + slog.Info("published HA discovery") +} diff --git a/pkg/bridge/handlers.go b/pkg/bridge/handlers.go new file mode 100644 index 0000000..528e392 --- /dev/null +++ b/pkg/bridge/handlers.go @@ -0,0 +1,35 @@ +package bridge + +import ( + "log/slog" + "strings" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func (b *Bridge) onConnect(c mqtt.Client) { + c.Publish(b.availTopic(), 1, true, b.availPayload()) + + c.Subscribe(b.cfg.DiscoveryPrefix+"/status", 1, func(_ mqtt.Client, msg mqtt.Message) { + if string(msg.Payload()) == "online" { + b.publishDiscovery() + b.OnState(b.dev.GetStatus()) + } + }) + + for out := 1; out <= numOutputs; out++ { + out := out + c.Subscribe(b.cmdTopic(out), 1, func(_ mqtt.Client, msg mqtt.Message) { + payload := strings.TrimSpace(string(msg.Payload())) + in := inputNumberFromLabel(payload) + if in == 0 { + slog.Warn("unknown input label", "payload", payload) + return + } + b.dev.SetOutput(out, in) + c.Publish(b.stateTopic(out), 1, true, inputLabel(in)) + }) + } + + b.publishDiscovery() +} diff --git a/pkg/bridge/helpers.go b/pkg/bridge/helpers.go new file mode 100644 index 0000000..972ea64 --- /dev/null +++ b/pkg/bridge/helpers.go @@ -0,0 +1,20 @@ +package bridge + +import "fmt" + +func (b *Bridge) availTopic() string { return b.cfg.BaseTopic + "/availability" } +func (b *Bridge) stateTopic(out int) string { + return fmt.Sprintf("%s/output%d/state", b.cfg.BaseTopic, out) +} +func (b *Bridge) cmdTopic(out int) string { + return fmt.Sprintf("%s/output%d/set", b.cfg.BaseTopic, out) +} +func (b *Bridge) discoveryTopic() string { + return fmt.Sprintf("%s/device/ezcoo_matrix/config", b.cfg.DiscoveryPrefix) +} +func (b *Bridge) availPayload() string { + if b.available.Load() { + return "online" + } + return "offline" +} diff --git a/pkg/bridge/types.go b/pkg/bridge/types.go new file mode 100644 index 0000000..988063f --- /dev/null +++ b/pkg/bridge/types.go @@ -0,0 +1,94 @@ +package bridge + +import ( + "errors" + "log/slog" + "sync/atomic" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + + "gitea.berezovskyi.dev/oleksandr/ezcoo-usb-control/pkg/ezcoo" +) + +var ErrShutdown = errors.New("bridge: shutdown requested") + +type Device interface { + SetOutput(out, in int) + GetStatus() ezcoo.State +} + +type Bridge struct { + cfg Config + client mqtt.Client + dev Device + done <-chan struct{} + available atomic.Bool +} + +func New(cfg Config, dev Device, done <-chan struct{}) *Bridge { + b := &Bridge{cfg: cfg, dev: dev, done: done} + opts := mqtt.NewClientOptions(). + AddBroker(cfg.Broker). + SetClientID(cfg.ClientID). + SetUsername(cfg.Username). + SetPassword(cfg.Password). + SetAutoReconnect(true). + SetWill(b.availTopic(), "offline", 1, true). + SetOnConnectHandler(func(c mqtt.Client) { + slog.Info("MQTT connected") + b.onConnect(c) + }). + SetConnectionLostHandler(func(_ mqtt.Client, err error) { + slog.Warn("MQTT connection lost", "err", err) + }) + b.client = mqtt.NewClient(opts) + return b +} + +func (b *Bridge) OnState(s ezcoo.State) { + if !b.client.IsConnected() { + return + } + for out := 1; out <= numOutputs; out++ { + in := s[out-1] + if in >= 1 && in <= numInputs { + b.client.Publish(b.stateTopic(out), 1, true, inputLabel(in)) + slog.Debug("state update", "output", out, "input", in) + } + } +} + +func (b *Bridge) SetAvailable(online bool) { + b.available.Store(online) + if b.client.IsConnected() { + b.client.Publish(b.availTopic(), 1, true, b.availPayload()) + slog.Info("availability", "online", online) + } +} + +func (b *Bridge) Connect() error { + for { + tok := b.client.Connect() + if tok.WaitTimeout(5 * time.Second) { + if tok.Error() == nil { + return nil + } + slog.Warn("MQTT connect failed, retrying in 5s", "err", tok.Error()) + } else { + slog.Warn("MQTT connect timed out, retrying in 5s") + } + select { + case <-b.done: + return ErrShutdown + case <-time.After(5 * time.Second): + } + } +} + +func (b *Bridge) Run() { + <-b.done + tok := b.client.Publish(b.availTopic(), 1, true, "offline") + tok.WaitTimeout(2 * time.Second) + b.client.Disconnect(500) +} diff --git a/pkg/bridge/utils.go b/pkg/bridge/utils.go new file mode 100644 index 0000000..0c61925 --- /dev/null +++ b/pkg/bridge/utils.go @@ -0,0 +1,22 @@ +package bridge + +import ( + "fmt" + "strings" +) + +func inputLabel(in int) string { return fmt.Sprintf("IN%d", in) } + +func inputNumberFromLabel(label string) int { + label = strings.ToUpper(strings.TrimSpace(label)) + for i := 1; i <= numInputs; i++ { + if label == inputLabel(i) { + return i + } + } + var n int + if _, err := fmt.Sscanf(label, "%d", &n); err == nil && n >= 1 && n <= numInputs { + return n + } + return 0 +}