package bridge import ( "errors" "log/slog" "sync/atomic" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "gitea.berezovskyi.dev/oleksandr/ezcoo-usb-control/pkg/ezcoo" ) var ErrShutdown = errors.New("bridge: shutdown requested") type Device interface { SetOutput(out, in int) GetStatus() ezcoo.State } type Bridge struct { cfg Config client mqtt.Client dev Device done <-chan struct{} available atomic.Bool } func New(cfg Config, dev Device, done <-chan struct{}) *Bridge { b := &Bridge{cfg: cfg, dev: dev, done: done} opts := mqtt.NewClientOptions(). AddBroker(cfg.Broker). SetClientID(cfg.ClientID). SetUsername(cfg.Username). SetPassword(cfg.Password). SetAutoReconnect(true). SetWill(b.availTopic(), "offline", 1, true). SetOnConnectHandler(func(c mqtt.Client) { slog.Info("MQTT connected") b.onConnect(c) }). SetConnectionLostHandler(func(_ mqtt.Client, err error) { slog.Warn("MQTT connection lost", "err", err) }) b.client = mqtt.NewClient(opts) return b } func (b *Bridge) OnState(s ezcoo.State) { if !b.client.IsConnected() { return } for out := 1; out <= numOutputs; out++ { in := s[out-1] if in >= 1 && in <= numInputs { b.client.Publish(b.stateTopic(out), 1, true, inputLabel(in)) slog.Debug("state update", "output", out, "input", in) } } } func (b *Bridge) SetAvailable(online bool) { b.available.Store(online) if b.client.IsConnected() { b.client.Publish(b.availTopic(), 1, true, b.availPayload()) slog.Info("availability", "online", online) } } func (b *Bridge) Connect() error { for { tok := b.client.Connect() if tok.WaitTimeout(5 * time.Second) { if tok.Error() == nil { return nil } slog.Warn("MQTT connect failed, retrying in 5s", "err", tok.Error()) } else { slog.Warn("MQTT connect timed out, retrying in 5s") } select { case <-b.done: return ErrShutdown case <-time.After(5 * time.Second): } } } func (b *Bridge) Run() { <-b.done tok := b.client.Publish(b.availTopic(), 1, true, "offline") tok.WaitTimeout(2 * time.Second) b.client.Disconnect(500) }