Files
cmr-mini/realtime-gateway/internal/router/hub.go

338 lines
7.5 KiB
Go

package router
import (
"encoding/json"
"sync"
"realtime-gateway/internal/model"
)
type Subscriber interface {
ID() string
Send(message model.ServerMessage) error
}
type Hub struct {
mu sync.RWMutex
subscribers map[string]Subscriber
filters map[string][]model.Subscription
latestState map[string]model.Envelope
liveFeeds map[uint64]chan model.Envelope
nextLiveID uint64
stats trafficStats
maxLatest int
}
type TrafficSnapshot struct {
Published uint64 `json:"published"`
Dropped uint64 `json:"dropped"`
Fanout uint64 `json:"fanout"`
Topics []TopicTrafficItem `json:"topics"`
Channels []ChannelTrafficItem `json:"channels"`
}
type TopicTrafficItem struct {
Topic string `json:"topic"`
Published uint64 `json:"published"`
Dropped uint64 `json:"dropped"`
Fanout uint64 `json:"fanout"`
}
type ChannelTrafficItem struct {
ChannelID string `json:"channelId"`
Published uint64 `json:"published"`
Dropped uint64 `json:"dropped"`
Fanout uint64 `json:"fanout"`
}
type trafficStats struct {
Published uint64
Dropped uint64
Fanout uint64
Topics map[string]*trafficCounter
Channels map[string]*trafficCounter
}
type trafficCounter struct {
Published uint64
Dropped uint64
Fanout uint64
}
type PublishResult struct {
Matched int `json:"matched"`
Stored bool `json:"stored"`
Dropped bool `json:"dropped"`
}
func NewHub(maxLatest int) *Hub {
return &Hub{
subscribers: make(map[string]Subscriber),
filters: make(map[string][]model.Subscription),
latestState: make(map[string]model.Envelope),
liveFeeds: make(map[uint64]chan model.Envelope),
stats: trafficStats{
Topics: make(map[string]*trafficCounter),
Channels: make(map[string]*trafficCounter),
},
maxLatest: maxLatest,
}
}
func (h *Hub) Register(subscriber Subscriber, subscriptions []model.Subscription) {
h.mu.Lock()
defer h.mu.Unlock()
h.subscribers[subscriber.ID()] = subscriber
h.filters[subscriber.ID()] = subscriptions
}
func (h *Hub) Unregister(subscriberID string) {
h.mu.Lock()
defer h.mu.Unlock()
delete(h.subscribers, subscriberID)
delete(h.filters, subscriberID)
}
func (h *Hub) UpdateSubscriptions(subscriberID string, subscriptions []model.Subscription) {
h.mu.Lock()
defer h.mu.Unlock()
h.filters[subscriberID] = subscriptions
}
func (h *Hub) Publish(envelope model.Envelope, deliveryMode string) PublishResult {
h.mu.RLock()
matches := make([]Subscriber, 0, len(h.subscribers))
for subscriberID, subscriber := range h.subscribers {
subscriptions := h.filters[subscriberID]
if !matchesAny(envelope, subscriptions) {
continue
}
matches = append(matches, subscriber)
}
h.mu.RUnlock()
if deliveryMode == "drop_if_no_consumer" && len(matches) == 0 {
h.recordTraffic(envelope, 0, true)
return PublishResult{
Matched: 0,
Stored: false,
Dropped: true,
}
}
h.storeLatest(envelope)
h.publishLive(envelope)
h.recordTraffic(envelope, len(matches), false)
for _, subscriber := range matches {
_ = subscriber.Send(model.ServerMessage{
Type: "event",
Envelope: &envelope,
})
}
return PublishResult{
Matched: len(matches),
Stored: true,
Dropped: false,
}
}
func (h *Hub) Snapshot(channelID string, deviceID string) (json.RawMessage, bool) {
h.mu.RLock()
defer h.mu.RUnlock()
envelope, ok := h.latestState[latestStateKey(channelID, deviceID)]
if !ok {
return nil, false
}
data, err := json.Marshal(envelope)
if err != nil {
return nil, false
}
return data, true
}
func (h *Hub) Stats() (subscriberCount int, latestStateCount int) {
h.mu.RLock()
defer h.mu.RUnlock()
return len(h.subscribers), len(h.latestState)
}
func (h *Hub) LatestStates() []model.Envelope {
h.mu.RLock()
defer h.mu.RUnlock()
items := make([]model.Envelope, 0, len(h.latestState))
for _, envelope := range h.latestState {
items = append(items, envelope)
}
return items
}
func (h *Hub) TrafficSnapshot() TrafficSnapshot {
h.mu.RLock()
defer h.mu.RUnlock()
topics := make([]TopicTrafficItem, 0, len(h.stats.Topics))
for topic, counter := range h.stats.Topics {
topics = append(topics, TopicTrafficItem{
Topic: topic,
Published: counter.Published,
Dropped: counter.Dropped,
Fanout: counter.Fanout,
})
}
channels := make([]ChannelTrafficItem, 0, len(h.stats.Channels))
for channelID, counter := range h.stats.Channels {
channels = append(channels, ChannelTrafficItem{
ChannelID: channelID,
Published: counter.Published,
Dropped: counter.Dropped,
Fanout: counter.Fanout,
})
}
return TrafficSnapshot{
Published: h.stats.Published,
Dropped: h.stats.Dropped,
Fanout: h.stats.Fanout,
Topics: topics,
Channels: channels,
}
}
func (h *Hub) SubscribeLive(buffer int) (uint64, <-chan model.Envelope) {
if buffer <= 0 {
buffer = 32
}
h.mu.Lock()
defer h.mu.Unlock()
h.nextLiveID += 1
id := h.nextLiveID
ch := make(chan model.Envelope, buffer)
h.liveFeeds[id] = ch
return id, ch
}
func (h *Hub) UnsubscribeLive(id uint64) {
h.mu.Lock()
defer h.mu.Unlock()
ch, ok := h.liveFeeds[id]
if !ok {
return
}
delete(h.liveFeeds, id)
close(ch)
}
func (h *Hub) storeLatest(envelope model.Envelope) {
if envelope.Target.DeviceID == "" {
return
}
h.mu.Lock()
defer h.mu.Unlock()
if len(h.latestState) >= h.maxLatest {
for key := range h.latestState {
delete(h.latestState, key)
break
}
}
h.latestState[latestStateKey(envelope.Target.ChannelID, envelope.Target.DeviceID)] = envelope
}
func (h *Hub) publishLive(envelope model.Envelope) {
h.mu.RLock()
feeds := make([]chan model.Envelope, 0, len(h.liveFeeds))
for _, ch := range h.liveFeeds {
feeds = append(feeds, ch)
}
h.mu.RUnlock()
for _, ch := range feeds {
select {
case ch <- envelope:
default:
}
}
}
func (h *Hub) recordTraffic(envelope model.Envelope, matched int, dropped bool) {
h.mu.Lock()
defer h.mu.Unlock()
h.stats.Published += 1
h.stats.Fanout += uint64(matched)
if dropped {
h.stats.Dropped += 1
}
topicKey := envelope.Topic
if topicKey == "" {
topicKey = "--"
}
topicCounter := h.stats.Topics[topicKey]
if topicCounter == nil {
topicCounter = &trafficCounter{}
h.stats.Topics[topicKey] = topicCounter
}
topicCounter.Published += 1
topicCounter.Fanout += uint64(matched)
if dropped {
topicCounter.Dropped += 1
}
channelKey := envelope.Target.ChannelID
if channelKey == "" {
channelKey = "--"
}
channelCounter := h.stats.Channels[channelKey]
if channelCounter == nil {
channelCounter = &trafficCounter{}
h.stats.Channels[channelKey] = channelCounter
}
channelCounter.Published += 1
channelCounter.Fanout += uint64(matched)
if dropped {
channelCounter.Dropped += 1
}
}
func matchesAny(envelope model.Envelope, subscriptions []model.Subscription) bool {
if len(subscriptions) == 0 {
return false
}
for _, subscription := range subscriptions {
if matches(envelope, subscription) {
return true
}
}
return false
}
func matches(envelope model.Envelope, subscription model.Subscription) bool {
if subscription.ChannelID != "" && subscription.ChannelID != envelope.Target.ChannelID {
return false
}
if subscription.DeviceID != "" && subscription.DeviceID != envelope.Target.DeviceID {
return false
}
if subscription.GroupID != "" && subscription.GroupID != envelope.Target.GroupID {
return false
}
if subscription.Topic != "" && subscription.Topic != envelope.Topic {
return false
}
return true
}
func latestStateKey(channelID string, deviceID string) string {
if channelID == "" {
return deviceID
}
return channelID + "::" + deviceID
}