Files
cmr-mini/realtime-gateway/cmd/mock-consumer/main.go

355 lines
9.9 KiB
Go

package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"realtime-gateway/internal/model"
)
// consumerConfig is the JSON shape of the optional file passed via -config.
// Its values only seed the defaults of the command-line flags registered in
// main, so an explicit flag overrides the corresponding field.
type consumerConfig struct {
// URL is the default for -url; main falls back to ws://127.0.0.1:8080/ws when it is blank.
URL string `json:"url"`
// Token is the default for -token and is sent in the authenticate/join_channel message.
Token string `json:"token"`
// ChannelID is the default for -channel-id; when non-empty main sends join_channel instead of authenticate.
ChannelID string `json:"channelId"`
// DeviceID is the default for -device-id; main falls back to "child-001" when it is blank.
DeviceID string `json:"deviceId"`
// GroupID is the default for -group-id.
GroupID string `json:"groupId"`
// Topic is the default for -topic; main falls back to "telemetry.location" when it is blank.
Topic string `json:"topic"`
// Topics is joined with commas to form the default for -topics.
Topics []string `json:"topics"`
// Subscriptions is an explicit subscription list; resolveSubscriptions uses it
// (after filtering) only when no topic override is in effect.
Subscriptions []model.Subscription `json:"subscriptions"`
// Snapshot is a pointer so that an absent key (nil) can be told apart from an
// explicit false; main treats nil as true via boolValue.
Snapshot *bool `json:"snapshot"`
}
// main runs the mock consumer. It loads an optional JSON config file, lets
// command-line flags override the file's values, dials the gateway websocket,
// optionally authenticates or joins a channel, subscribes, optionally requests
// one snapshot, and then logs every incoming message.
//
// The process ends with a fatal log (non-zero exit) on any setup or read
// failure, and returns normally when SIGINT/SIGTERM cancels the context while
// it is waiting for events.
func main() {
	// The config path has to be found before flag.Parse because the file's
	// values are used as the defaults of the flags registered below.
	configPath := findConfigPath(os.Args[1:])
	fileConfig, err := loadConsumerConfig(configPath)
	if err != nil {
		log.Fatalf("load config: %v", err)
	}

	// -config is registered only so flag.Parse accepts it; its value was
	// already consumed by findConfigPath above.
	flag.StringVar(&configPath, "config", configPath, "path to consumer config file")
	url := flag.String("url", valueOr(fileConfig.URL, "ws://127.0.0.1:8080/ws"), "gateway websocket url")
	token := flag.String("token", fileConfig.Token, "consumer token, leave empty if anonymous consumers are allowed")
	channelID := flag.String("channel-id", fileConfig.ChannelID, "channel id to join before subscribe")
	deviceID := flag.String("device-id", valueOr(fileConfig.DeviceID, "child-001"), "device id to subscribe")
	groupID := flag.String("group-id", fileConfig.GroupID, "group id to subscribe")
	topic := flag.String("topic", valueOr(fileConfig.Topic, "telemetry.location"), "single topic to subscribe")
	topics := flag.String("topics", strings.Join(fileConfig.Topics, ","), "comma-separated topics to subscribe, overrides -topic when set")
	snapshot := flag.Bool("snapshot", boolValue(fileConfig.Snapshot, true), "request latest snapshot after subscribe")
	flag.Parse()

	subscriptions := resolveSubscriptions(fileConfig, *deviceID, *groupID, *topic, *topics)
	if len(subscriptions) == 0 {
		log.Fatalf("no subscriptions configured")
	}

	// ctx is cancelled on Ctrl-C or SIGTERM; every websocket call below uses it.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	conn, _, err := websocket.Dial(ctx, *url, nil)
	if err != nil {
		log.Fatalf("dial gateway: %v", err)
	}
	defer conn.Close(websocket.StatusNormalClosure, "consumer closed")

	// The first message read after dialing is logged as the session welcome.
	var welcome model.ServerMessage
	if err := wsjson.Read(ctx, conn, &welcome); err != nil {
		log.Fatalf("read welcome: %v", err)
	}
	log.Printf("connected: session=%s", welcome.SessionID)

	// The auth step is skipped entirely for anonymous consumers with no channel.
	if *token != "" || *channelID != "" {
		authReq := model.ClientMessage{
			Type:  "authenticate",
			Role:  model.RoleConsumer,
			Token: *token,
		}
		// With a channel id the same message (token included) is sent as
		// join_channel instead of authenticate.
		if *channelID != "" {
			authReq.Type = "join_channel"
			authReq.ChannelID = *channelID
		}
		if err := wsjson.Write(ctx, conn, authReq); err != nil {
			log.Fatalf("send authenticate: %v", err)
		}
		var authResp model.ServerMessage
		if err := wsjson.Read(ctx, conn, &authResp); err != nil {
			log.Fatalf("read authenticate response: %v", err)
		}
		if authResp.Type == "error" {
			log.Fatalf("authenticate failed: %s", authResp.Error)
		}
		log.Printf("authenticated: session=%s", authResp.SessionID)
	}

	subReq := model.ClientMessage{
		Type:          "subscribe",
		Subscriptions: subscriptions,
	}
	if err := wsjson.Write(ctx, conn, subReq); err != nil {
		log.Fatalf("send subscribe: %v", err)
	}
	var subResp model.ServerMessage
	if err := wsjson.Read(ctx, conn, &subResp); err != nil {
		log.Fatalf("read subscribe response: %v", err)
	}
	if subResp.Type == "error" {
		log.Fatalf("subscribe failed: %s", subResp.Error)
	}
	log.Printf("subscribed: %s", describeSubscriptions(subscriptions))

	if *snapshot {
		// Only one device is requested: the first subscription that names a
		// device, otherwise the -device-id flag value.
		req := model.ClientMessage{
			Type: "snapshot",
			Subscriptions: []model.Subscription{
				{DeviceID: firstSnapshotDeviceID(subscriptions, *deviceID)},
			},
		}
		if err := wsjson.Write(ctx, conn, req); err != nil {
			log.Fatalf("send snapshot: %v", err)
		}
		var resp model.ServerMessage
		if err := wsjson.Read(ctx, conn, &resp); err != nil {
			log.Fatalf("read snapshot response: %v", err)
		}
		// A snapshot error is logged but not fatal; any other response type is
		// silently dropped.
		if resp.Type == "snapshot" {
			log.Printf("snapshot: %s", compactJSON(resp.State))
		} else if resp.Type == "error" {
			log.Printf("snapshot unavailable: %s", resp.Error)
		}
	}

	for {
		var message model.ServerMessage
		if err := wsjson.Read(ctx, conn, &message); err != nil {
			// A cancelled context means the operator asked the consumer to
			// stop (SIGINT/SIGTERM). That is a normal exit, not a failure, and
			// returning (unlike log.Fatalf, which calls os.Exit) lets the
			// deferred stop and conn.Close run.
			if ctx.Err() != nil {
				log.Printf("consumer stopped: %v", ctx.Err())
				return
			}
			log.Fatalf("read event: %v", err)
		}
		switch message.Type {
		case "event":
			log.Printf("event: %s", describeEnvelope(message.Envelope))
		case "error":
			log.Printf("server error: %s", message.Error)
		default:
			log.Printf("message: type=%s", message.Type)
		}
	}
}
// loadConsumerConfig reads and decodes the JSON config file at path.
//
// A blank (empty or whitespace-only) path is not an error: it yields the zero
// consumerConfig so that the caller's built-in defaults apply. Read and parse
// failures are wrapped with the path and returned with a zero config.
func loadConsumerConfig(path string) (consumerConfig, error) {
	var cfg consumerConfig
	if strings.TrimSpace(path) == "" {
		return cfg, nil
	}

	raw, readErr := os.ReadFile(path)
	if readErr != nil {
		return consumerConfig{}, fmt.Errorf("read %s: %w", path, readErr)
	}

	if parseErr := json.Unmarshal(raw, &cfg); parseErr != nil {
		// Return a fresh zero value: cfg may have been partially populated.
		return consumerConfig{}, fmt.Errorf("parse %s: %w", path, parseErr)
	}
	return cfg, nil
}
// findConfigPath scans raw command-line arguments for the config flag and
// returns its whitespace-trimmed value, or "" when the flag is absent.
//
// Both single- and double-dash spellings are recognised, in the "=value" form
// and in the separate-argument form. The first match wins. A bare -config or
// --config as the very last argument has no value and is skipped.
func findConfigPath(args []string) string {
	for i, arg := range args {
		switch {
		case strings.HasPrefix(arg, "-config="), strings.HasPrefix(arg, "--config="):
			// Everything after the first '=' is the value, further '=' included.
			_, value, _ := strings.Cut(arg, "=")
			return strings.TrimSpace(value)
		case (arg == "-config" || arg == "--config") && i+1 < len(args):
			return strings.TrimSpace(args[i+1])
		}
	}
	return ""
}
// valueOr returns value unchanged unless it is empty or whitespace-only, in
// which case it returns fallback. The returned value is not trimmed.
func valueOr(value string, fallback string) string {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return fallback
}
// boolValue dereferences an optional boolean, returning fallback when value
// is nil.
func boolValue(value *bool, fallback bool) bool {
	if value != nil {
		return *value
	}
	return fallback
}
// resolveSubscriptions decides which subscriptions the consumer will request.
//
// The subscriptions listed in the config file are used when there are any,
// topicsCSV is blank, and topic still equals the config file's topic (or
// "telemetry.location" when the file has none). Otherwise one subscription is
// built per topic in topicsCSV, or for the single topic when topicsCSV yields
// none, each carrying deviceID and groupID. Either way the result goes through
// filterValidSubscriptions.
func resolveSubscriptions(fileConfig consumerConfig, deviceID string, groupID string, topic string, topicsCSV string) []model.Subscription {
	noTopicList := strings.TrimSpace(topicsCSV) == ""
	topicUntouched := strings.TrimSpace(topic) == valueOr(fileConfig.Topic, "telemetry.location")
	if len(fileConfig.Subscriptions) > 0 && noTopicList && topicUntouched {
		return filterValidSubscriptions(fileConfig.Subscriptions)
	}

	names := parseTopics(topicsCSV)
	if len(names) == 0 {
		names = []string{topic}
	}

	built := make([]model.Subscription, len(names))
	for i, name := range names {
		built[i] = model.Subscription{DeviceID: deviceID, GroupID: groupID, Topic: name}
	}
	return filterValidSubscriptions(built)
}
// parseTopics splits a comma-separated topic list, trimming whitespace around
// each entry and dropping empty entries.
//
// A blank input returns nil. A non-blank input made only of separators (for
// example ",") returns an empty, non-nil slice.
func parseTopics(value string) []string {
	if strings.TrimSpace(value) == "" {
		return nil
	}

	fields := strings.Split(value, ",")
	kept := make([]string, 0, len(fields))
	for i := range fields {
		if name := strings.TrimSpace(fields[i]); name != "" {
			kept = append(kept, name)
		}
	}
	return kept
}
// filterValidSubscriptions returns a new slice holding whitespace-trimmed
// copies of items, skipping every entry whose channel, device and group ids
// are all blank. Only ChannelID, DeviceID, GroupID and Topic are carried over
// into the copies. The input slice is not modified.
func filterValidSubscriptions(items []model.Subscription) []model.Subscription {
	kept := make([]model.Subscription, 0, len(items))
	for i := range items {
		cleaned := model.Subscription{
			ChannelID: strings.TrimSpace(items[i].ChannelID),
			DeviceID:  strings.TrimSpace(items[i].DeviceID),
			GroupID:   strings.TrimSpace(items[i].GroupID),
			Topic:     strings.TrimSpace(items[i].Topic),
		}
		// A subscription needs at least one scope; a topic alone is not enough.
		if cleaned.ChannelID == "" && cleaned.DeviceID == "" && cleaned.GroupID == "" {
			continue
		}
		kept = append(kept, cleaned)
	}
	return kept
}
// describeSubscriptions renders subscriptions as a comma-separated,
// human-readable list for logging.
//
// Each entry is the device id, or "group:<id>" when there is no device id, or
// "all" when there is neither. It is prefixed with "channel:<id> / " when a
// channel id is set and suffixed with "/<topic>" when a topic is set.
func describeSubscriptions(items []model.Subscription) string {
	labels := make([]string, len(items))
	for i, sub := range items {
		var label string
		switch {
		case sub.DeviceID != "":
			label = sub.DeviceID
		case sub.GroupID != "":
			label = "group:" + sub.GroupID
		default:
			label = "all"
		}
		if sub.ChannelID != "" {
			label = "channel:" + sub.ChannelID + " / " + label
		}
		if sub.Topic != "" {
			label += "/" + sub.Topic
		}
		labels[i] = label
	}
	return strings.Join(labels, ", ")
}
// firstSnapshotDeviceID returns the device id of the first subscription whose
// device id is not blank, or fallback when no subscription names a device.
// The id is returned as stored, without trimming.
func firstSnapshotDeviceID(items []model.Subscription, fallback string) string {
	for i := range items {
		if id := items[i].DeviceID; strings.TrimSpace(id) != "" {
			return id
		}
	}
	return fallback
}
// describeEnvelope formats an event envelope for logging. Location and
// heart-rate topics get a dedicated one-line summary; every other topic is
// printed as compact JSON. A nil envelope is rendered as "{}".
func describeEnvelope(envelope *model.Envelope) string {
	if envelope == nil {
		return "{}"
	}
	if envelope.Topic == "telemetry.location" {
		return describeLocationEnvelope(envelope)
	}
	if envelope.Topic == "telemetry.heart_rate" {
		return describeHeartRateEnvelope(envelope)
	}
	return compactEnvelope(envelope)
}
// describeLocationEnvelope summarises a telemetry.location envelope as a
// single "gps ..." line built from the target device id and the decoded
// payload fields. If the payload cannot be decoded, the whole envelope is
// printed as compact JSON instead. envelope must not be nil.
func describeLocationEnvelope(envelope *model.Envelope) string {
	var fix struct {
		Lat         float64 `json:"lat"`
		Lng         float64 `json:"lng"`
		Speed       float64 `json:"speed"`
		Bearing     float64 `json:"bearing"`
		Accuracy    float64 `json:"accuracy"`
		CoordSystem string  `json:"coordSystem"`
	}
	if json.Unmarshal(envelope.Payload, &fix) != nil {
		return compactEnvelope(envelope)
	}

	const layout = "gps device=%s lat=%.6f lng=%.6f speed=%.2f bearing=%.1f accuracy=%.1f coord=%s"
	return fmt.Sprintf(layout,
		envelope.Target.DeviceID,
		fix.Lat, fix.Lng,
		fix.Speed, fix.Bearing, fix.Accuracy,
		fix.CoordSystem,
	)
}
// describeHeartRateEnvelope summarises a telemetry.heart_rate envelope as a
// single "heart_rate ..." line. The confidence value is included only when it
// is greater than zero. If the payload cannot be decoded, the whole envelope
// is printed as compact JSON instead. envelope must not be nil.
func describeHeartRateEnvelope(envelope *model.Envelope) string {
	var reading struct {
		BPM        int     `json:"bpm"`
		Confidence float64 `json:"confidence"`
	}
	if json.Unmarshal(envelope.Payload, &reading) != nil {
		return compactEnvelope(envelope)
	}

	device, bpm := envelope.Target.DeviceID, reading.BPM
	if confidence := reading.Confidence; confidence > 0 {
		return fmt.Sprintf("heart_rate device=%s bpm=%d confidence=%.2f", device, bpm, confidence)
	}
	return fmt.Sprintf("heart_rate device=%s bpm=%d", device, bpm)
}
// compactEnvelope renders the whole envelope as compact JSON. A nil envelope,
// or one that cannot be marshalled, is rendered as "{}".
func compactEnvelope(envelope *model.Envelope) string {
	const empty = "{}"
	if envelope == nil {
		return empty
	}
	if encoded, err := json.Marshal(envelope); err == nil {
		return compactJSON(encoded)
	}
	return empty
}
// compactJSON re-encodes data as minified JSON for single-line logging.
//
// Object keys come out sorted, because the value is decoded into generic maps
// and encoding/json sorts map keys when marshalling. Numbers are decoded as
// json.Number so their original text is kept; decoding them as float64 would
// silently round integers above 2^53 (for example 64-bit ids) in the log
// output.
//
// Input that is not exactly one valid JSON value is returned unchanged.
func compactJSON(data []byte) string {
	// A json.Decoder stops after the first value and would ignore trailing
	// bytes, so the whole input is validated up front.
	if !json.Valid(data) {
		return string(data)
	}

	decoder := json.NewDecoder(strings.NewReader(string(data)))
	decoder.UseNumber()

	var value any
	if err := decoder.Decode(&value); err != nil {
		return string(data)
	}
	compact, err := json.Marshal(value)
	if err != nil {
		return string(data)
	}
	return string(compact)
}