Files

95 lines
2.1 KiB
Go

package bridge
import (
"errors"
"log/slog"
"sync/atomic"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"gitea.berezovskyi.dev/oleksandr/ezcoo-usb-control/pkg/ezcoo"
)
// ErrShutdown is returned by Bridge.Connect when the done channel fires
// while it is waiting between connection attempts.
var ErrShutdown = errors.New("bridge: shutdown requested")
// Device is the switch-side dependency handed to New and stored on the Bridge.
type Device interface {
// SetOutput takes an output number and an input number.
// NOTE(review): presumably routes input `in` to output `out` — confirm
// against the concrete implementation.
SetOutput(out, in int)
// GetStatus returns an ezcoo.State.
// NOTE(review): presumably the device's current routing state — confirm.
GetStatus() ezcoo.State
}
// Bridge ties an MQTT client to a Device. It is created by New, connected
// with Connect, and shut down by Run once the done channel fires.
type Bridge struct {
// cfg is the configuration passed to New; its Broker, ClientID, Username
// and Password fields feed the MQTT client options.
cfg Config
// client is the MQTT client built in New.
client mqtt.Client
// dev is the Device passed to New.
dev Device
// done signals shutdown: Connect returns ErrShutdown and Run proceeds to
// disconnect once a receive on it succeeds.
done <-chan struct{}
// available holds the last value passed to SetAvailable.
available atomic.Bool
}
// New builds a Bridge around dev and prepares its MQTT client without
// connecting it; call Connect to establish the session.
//
// The client is configured from cfg (broker, client ID, username, password)
// with automatic reconnect enabled and a retained QoS 1 "offline" will on the
// availability topic. done is kept on the Bridge for Connect and Run to watch.
func New(cfg Config, dev Device, done <-chan struct{}) *Bridge {
	bridge := &Bridge{cfg: cfg, dev: dev, done: done}

	// Invoked by the client whenever a connection is established.
	connected := func(c mqtt.Client) {
		slog.Info("MQTT connected")
		bridge.onConnect(c)
	}
	// Invoked by the client when an established connection drops.
	lost := func(_ mqtt.Client, err error) {
		slog.Warn("MQTT connection lost", "err", err)
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker(cfg.Broker)
	opts.SetClientID(cfg.ClientID)
	opts.SetUsername(cfg.Username)
	opts.SetPassword(cfg.Password)
	opts.SetAutoReconnect(true)
	opts.SetWill(bridge.availTopic(), "offline", 1, true)
	opts.SetOnConnectHandler(connected)
	opts.SetConnectionLostHandler(lost)

	bridge.client = mqtt.NewClient(opts)
	return bridge
}
// OnState publishes one message per output described by s.
//
// Nothing happens while the MQTT client is disconnected. Otherwise, for each
// output number 1..numOutputs the entry s[output-1] is read; entries outside
// 1..numInputs are skipped, and the rest are published (QoS 1, retained) to
// that output's state topic as inputLabel(input) and logged at debug level.
func (b *Bridge) OnState(s ezcoo.State) {
	if !b.client.IsConnected() {
		return
	}
	for idx := 0; idx < numOutputs; idx++ {
		output, input := idx+1, s[idx]
		if input < 1 || input > numInputs {
			// Not a valid input number; nothing to report for this output.
			continue
		}
		b.client.Publish(b.stateTopic(output), 1, true, inputLabel(input))
		slog.Debug("state update", "output", output, "input", input)
	}
}
// SetAvailable records online as the current availability flag. If the MQTT
// client is connected it also publishes b.availPayload() (QoS 1, retained) on
// the availability topic and logs the new value; otherwise only the flag is
// updated.
func (b *Bridge) SetAvailable(online bool) {
	b.available.Store(online)
	if !b.client.IsConnected() {
		return
	}
	topic, payload := b.availTopic(), b.availPayload()
	b.client.Publish(topic, 1, true, payload)
	slog.Info("availability", "online", online)
}
// Connect blocks until the MQTT client has connected or shutdown is requested.
//
// Each attempt is given 5 seconds to complete. When an attempt fails or times
// out, a warning is logged and the next attempt starts after a further
// 5-second pause. The done channel is only consulted during that pause, so at
// least one attempt is always made.
//
// It returns nil once an attempt succeeds, or ErrShutdown if done fires while
// waiting to retry.
func (b *Bridge) Connect() error {
	const (
		attemptTimeout = 5 * time.Second
		retryDelay     = 5 * time.Second
	)
	for {
		attempt := b.client.Connect()
		finished := attempt.WaitTimeout(attemptTimeout)
		switch {
		case !finished:
			slog.Warn("MQTT connect timed out, retrying in 5s")
		case attempt.Error() == nil:
			return nil
		default:
			slog.Warn("MQTT connect failed, retrying in 5s", "err", attempt.Error())
		}

		select {
		case <-b.done:
			return ErrShutdown
		case <-time.After(retryDelay):
		}
	}
}
// Run blocks until the done channel fires, then shuts the MQTT side down: it
// publishes a retained QoS 1 "offline" message on the availability topic,
// waits up to 2 seconds for that publish to complete, and disconnects the
// client with a 500 ms quiesce period.
func (b *Bridge) Run() {
	const (
		publishWait   = 2 * time.Second
		quiesceMillis = 500
	)

	<-b.done

	// Best effort: the outcome of the final publish is intentionally not
	// inspected, since the bridge is going away regardless.
	b.client.Publish(b.availTopic(), 1, true, "offline").WaitTimeout(publishWait)
	b.client.Disconnect(quiesceMillis)
}