95 lines
2.1 KiB
Go
95 lines
2.1 KiB
Go
package bridge
|
|
|
|
import (
|
|
"errors"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
"gitea.berezovskyi.dev/oleksandr/ezcoo-usb-control/pkg/ezcoo"
|
|
)
|
|
|
|
// ErrShutdown is the sentinel error Connect returns when the done channel
// fires while it is waiting to retry a failed or timed-out MQTT connection.
var ErrShutdown = errors.New("bridge: shutdown requested")
|
|
|
|
type Device interface {
|
|
SetOutput(out, in int)
|
|
GetStatus() ezcoo.State
|
|
}
|
|
|
|
type Bridge struct {
|
|
cfg Config
|
|
client mqtt.Client
|
|
dev Device
|
|
done <-chan struct{}
|
|
available atomic.Bool
|
|
}
|
|
|
|
func New(cfg Config, dev Device, done <-chan struct{}) *Bridge {
|
|
b := &Bridge{cfg: cfg, dev: dev, done: done}
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(cfg.Broker).
|
|
SetClientID(cfg.ClientID).
|
|
SetUsername(cfg.Username).
|
|
SetPassword(cfg.Password).
|
|
SetAutoReconnect(true).
|
|
SetWill(b.availTopic(), "offline", 1, true).
|
|
SetOnConnectHandler(func(c mqtt.Client) {
|
|
slog.Info("MQTT connected")
|
|
b.onConnect(c)
|
|
}).
|
|
SetConnectionLostHandler(func(_ mqtt.Client, err error) {
|
|
slog.Warn("MQTT connection lost", "err", err)
|
|
})
|
|
b.client = mqtt.NewClient(opts)
|
|
return b
|
|
}
|
|
|
|
func (b *Bridge) OnState(s ezcoo.State) {
|
|
if !b.client.IsConnected() {
|
|
return
|
|
}
|
|
for out := 1; out <= numOutputs; out++ {
|
|
in := s[out-1]
|
|
if in >= 1 && in <= numInputs {
|
|
b.client.Publish(b.stateTopic(out), 1, true, inputLabel(in))
|
|
slog.Debug("state update", "output", out, "input", in)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Bridge) SetAvailable(online bool) {
|
|
b.available.Store(online)
|
|
if b.client.IsConnected() {
|
|
b.client.Publish(b.availTopic(), 1, true, b.availPayload())
|
|
slog.Info("availability", "online", online)
|
|
}
|
|
}
|
|
|
|
func (b *Bridge) Connect() error {
|
|
for {
|
|
tok := b.client.Connect()
|
|
if tok.WaitTimeout(5 * time.Second) {
|
|
if tok.Error() == nil {
|
|
return nil
|
|
}
|
|
slog.Warn("MQTT connect failed, retrying in 5s", "err", tok.Error())
|
|
} else {
|
|
slog.Warn("MQTT connect timed out, retrying in 5s")
|
|
}
|
|
select {
|
|
case <-b.done:
|
|
return ErrShutdown
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Bridge) Run() {
|
|
<-b.done
|
|
tok := b.client.Publish(b.availTopic(), 1, true, "offline")
|
|
tok.WaitTimeout(2 * time.Second)
|
|
b.client.Disconnect(500)
|
|
}
|