Add realtime gateway and simulator bridge
This commit is contained in:
337
realtime-gateway/internal/router/hub.go
Normal file
337
realtime-gateway/internal/router/hub.go
Normal file
@@ -0,0 +1,337 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"realtime-gateway/internal/model"
|
||||
)
|
||||
|
||||
type Subscriber interface {
|
||||
ID() string
|
||||
Send(message model.ServerMessage) error
|
||||
}
|
||||
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
subscribers map[string]Subscriber
|
||||
filters map[string][]model.Subscription
|
||||
latestState map[string]model.Envelope
|
||||
liveFeeds map[uint64]chan model.Envelope
|
||||
nextLiveID uint64
|
||||
stats trafficStats
|
||||
maxLatest int
|
||||
}
|
||||
|
||||
type TrafficSnapshot struct {
|
||||
Published uint64 `json:"published"`
|
||||
Dropped uint64 `json:"dropped"`
|
||||
Fanout uint64 `json:"fanout"`
|
||||
Topics []TopicTrafficItem `json:"topics"`
|
||||
Channels []ChannelTrafficItem `json:"channels"`
|
||||
}
|
||||
|
||||
type TopicTrafficItem struct {
|
||||
Topic string `json:"topic"`
|
||||
Published uint64 `json:"published"`
|
||||
Dropped uint64 `json:"dropped"`
|
||||
Fanout uint64 `json:"fanout"`
|
||||
}
|
||||
|
||||
type ChannelTrafficItem struct {
|
||||
ChannelID string `json:"channelId"`
|
||||
Published uint64 `json:"published"`
|
||||
Dropped uint64 `json:"dropped"`
|
||||
Fanout uint64 `json:"fanout"`
|
||||
}
|
||||
|
||||
type trafficStats struct {
|
||||
Published uint64
|
||||
Dropped uint64
|
||||
Fanout uint64
|
||||
Topics map[string]*trafficCounter
|
||||
Channels map[string]*trafficCounter
|
||||
}
|
||||
|
||||
type trafficCounter struct {
|
||||
Published uint64
|
||||
Dropped uint64
|
||||
Fanout uint64
|
||||
}
|
||||
|
||||
type PublishResult struct {
|
||||
Matched int `json:"matched"`
|
||||
Stored bool `json:"stored"`
|
||||
Dropped bool `json:"dropped"`
|
||||
}
|
||||
|
||||
func NewHub(maxLatest int) *Hub {
|
||||
return &Hub{
|
||||
subscribers: make(map[string]Subscriber),
|
||||
filters: make(map[string][]model.Subscription),
|
||||
latestState: make(map[string]model.Envelope),
|
||||
liveFeeds: make(map[uint64]chan model.Envelope),
|
||||
stats: trafficStats{
|
||||
Topics: make(map[string]*trafficCounter),
|
||||
Channels: make(map[string]*trafficCounter),
|
||||
},
|
||||
maxLatest: maxLatest,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Register(subscriber Subscriber, subscriptions []model.Subscription) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.subscribers[subscriber.ID()] = subscriber
|
||||
h.filters[subscriber.ID()] = subscriptions
|
||||
}
|
||||
|
||||
func (h *Hub) Unregister(subscriberID string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
delete(h.subscribers, subscriberID)
|
||||
delete(h.filters, subscriberID)
|
||||
}
|
||||
|
||||
func (h *Hub) UpdateSubscriptions(subscriberID string, subscriptions []model.Subscription) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.filters[subscriberID] = subscriptions
|
||||
}
|
||||
|
||||
func (h *Hub) Publish(envelope model.Envelope, deliveryMode string) PublishResult {
|
||||
h.mu.RLock()
|
||||
matches := make([]Subscriber, 0, len(h.subscribers))
|
||||
for subscriberID, subscriber := range h.subscribers {
|
||||
subscriptions := h.filters[subscriberID]
|
||||
if !matchesAny(envelope, subscriptions) {
|
||||
continue
|
||||
}
|
||||
matches = append(matches, subscriber)
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
|
||||
if deliveryMode == "drop_if_no_consumer" && len(matches) == 0 {
|
||||
h.recordTraffic(envelope, 0, true)
|
||||
return PublishResult{
|
||||
Matched: 0,
|
||||
Stored: false,
|
||||
Dropped: true,
|
||||
}
|
||||
}
|
||||
|
||||
h.storeLatest(envelope)
|
||||
h.publishLive(envelope)
|
||||
h.recordTraffic(envelope, len(matches), false)
|
||||
for _, subscriber := range matches {
|
||||
_ = subscriber.Send(model.ServerMessage{
|
||||
Type: "event",
|
||||
Envelope: &envelope,
|
||||
})
|
||||
}
|
||||
return PublishResult{
|
||||
Matched: len(matches),
|
||||
Stored: true,
|
||||
Dropped: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) Snapshot(channelID string, deviceID string) (json.RawMessage, bool) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
envelope, ok := h.latestState[latestStateKey(channelID, deviceID)]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
data, err := json.Marshal(envelope)
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
return data, true
|
||||
}
|
||||
|
||||
func (h *Hub) Stats() (subscriberCount int, latestStateCount int) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.subscribers), len(h.latestState)
|
||||
}
|
||||
|
||||
func (h *Hub) LatestStates() []model.Envelope {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
items := make([]model.Envelope, 0, len(h.latestState))
|
||||
for _, envelope := range h.latestState {
|
||||
items = append(items, envelope)
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
func (h *Hub) TrafficSnapshot() TrafficSnapshot {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
topics := make([]TopicTrafficItem, 0, len(h.stats.Topics))
|
||||
for topic, counter := range h.stats.Topics {
|
||||
topics = append(topics, TopicTrafficItem{
|
||||
Topic: topic,
|
||||
Published: counter.Published,
|
||||
Dropped: counter.Dropped,
|
||||
Fanout: counter.Fanout,
|
||||
})
|
||||
}
|
||||
|
||||
channels := make([]ChannelTrafficItem, 0, len(h.stats.Channels))
|
||||
for channelID, counter := range h.stats.Channels {
|
||||
channels = append(channels, ChannelTrafficItem{
|
||||
ChannelID: channelID,
|
||||
Published: counter.Published,
|
||||
Dropped: counter.Dropped,
|
||||
Fanout: counter.Fanout,
|
||||
})
|
||||
}
|
||||
|
||||
return TrafficSnapshot{
|
||||
Published: h.stats.Published,
|
||||
Dropped: h.stats.Dropped,
|
||||
Fanout: h.stats.Fanout,
|
||||
Topics: topics,
|
||||
Channels: channels,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) SubscribeLive(buffer int) (uint64, <-chan model.Envelope) {
|
||||
if buffer <= 0 {
|
||||
buffer = 32
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.nextLiveID += 1
|
||||
id := h.nextLiveID
|
||||
ch := make(chan model.Envelope, buffer)
|
||||
h.liveFeeds[id] = ch
|
||||
return id, ch
|
||||
}
|
||||
|
||||
func (h *Hub) UnsubscribeLive(id uint64) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
ch, ok := h.liveFeeds[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
delete(h.liveFeeds, id)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func (h *Hub) storeLatest(envelope model.Envelope) {
|
||||
if envelope.Target.DeviceID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if len(h.latestState) >= h.maxLatest {
|
||||
for key := range h.latestState {
|
||||
delete(h.latestState, key)
|
||||
break
|
||||
}
|
||||
}
|
||||
h.latestState[latestStateKey(envelope.Target.ChannelID, envelope.Target.DeviceID)] = envelope
|
||||
}
|
||||
|
||||
func (h *Hub) publishLive(envelope model.Envelope) {
|
||||
h.mu.RLock()
|
||||
feeds := make([]chan model.Envelope, 0, len(h.liveFeeds))
|
||||
for _, ch := range h.liveFeeds {
|
||||
feeds = append(feeds, ch)
|
||||
}
|
||||
h.mu.RUnlock()
|
||||
|
||||
for _, ch := range feeds {
|
||||
select {
|
||||
case ch <- envelope:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) recordTraffic(envelope model.Envelope, matched int, dropped bool) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.stats.Published += 1
|
||||
h.stats.Fanout += uint64(matched)
|
||||
if dropped {
|
||||
h.stats.Dropped += 1
|
||||
}
|
||||
|
||||
topicKey := envelope.Topic
|
||||
if topicKey == "" {
|
||||
topicKey = "--"
|
||||
}
|
||||
topicCounter := h.stats.Topics[topicKey]
|
||||
if topicCounter == nil {
|
||||
topicCounter = &trafficCounter{}
|
||||
h.stats.Topics[topicKey] = topicCounter
|
||||
}
|
||||
topicCounter.Published += 1
|
||||
topicCounter.Fanout += uint64(matched)
|
||||
if dropped {
|
||||
topicCounter.Dropped += 1
|
||||
}
|
||||
|
||||
channelKey := envelope.Target.ChannelID
|
||||
if channelKey == "" {
|
||||
channelKey = "--"
|
||||
}
|
||||
channelCounter := h.stats.Channels[channelKey]
|
||||
if channelCounter == nil {
|
||||
channelCounter = &trafficCounter{}
|
||||
h.stats.Channels[channelKey] = channelCounter
|
||||
}
|
||||
channelCounter.Published += 1
|
||||
channelCounter.Fanout += uint64(matched)
|
||||
if dropped {
|
||||
channelCounter.Dropped += 1
|
||||
}
|
||||
}
|
||||
|
||||
func matchesAny(envelope model.Envelope, subscriptions []model.Subscription) bool {
|
||||
if len(subscriptions) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, subscription := range subscriptions {
|
||||
if matches(envelope, subscription) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func matches(envelope model.Envelope, subscription model.Subscription) bool {
|
||||
if subscription.ChannelID != "" && subscription.ChannelID != envelope.Target.ChannelID {
|
||||
return false
|
||||
}
|
||||
if subscription.DeviceID != "" && subscription.DeviceID != envelope.Target.DeviceID {
|
||||
return false
|
||||
}
|
||||
if subscription.GroupID != "" && subscription.GroupID != envelope.Target.GroupID {
|
||||
return false
|
||||
}
|
||||
if subscription.Topic != "" && subscription.Topic != envelope.Topic {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func latestStateKey(channelID string, deviceID string) string {
|
||||
if channelID == "" {
|
||||
return deviceID
|
||||
}
|
||||
return channelID + "::" + deviceID
|
||||
}
|
||||
Reference in New Issue
Block a user