229 lines
6.3 KiB
Go
229 lines
6.3 KiB
Go
package gateway
|
|
|
|
import (
|
|
"embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/fs"
|
|
"net/http"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"realtime-gateway/internal/model"
|
|
)
|
|
|
|
//go:embed adminui/*
|
|
var adminUIFiles embed.FS
|
|
|
|
type adminOverview struct {
|
|
Status string `json:"status"`
|
|
StartedAt time.Time `json:"startedAt"`
|
|
Now time.Time `json:"now"`
|
|
UptimeSeconds int64 `json:"uptimeSeconds"`
|
|
HTTPListen string `json:"httpListen"`
|
|
Anonymous bool `json:"anonymousConsumers"`
|
|
Metrics map[string]any `json:"metrics"`
|
|
Auth map[string]any `json:"auth"`
|
|
Endpoints map[string]any `json:"endpoints"`
|
|
}
|
|
|
|
func (s *Server) registerAdminRoutes(mux *http.ServeMux) error {
|
|
sub, err := fs.Sub(adminUIFiles, "adminui")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
fileServer := http.FileServer(http.FS(sub))
|
|
mux.Handle("/assets/", http.StripPrefix("/assets/", noStoreHandler(fileServer)))
|
|
mux.HandleFunc("/", s.handleAdminIndex)
|
|
mux.HandleFunc("/admin", s.handleAdminIndex)
|
|
mux.HandleFunc("/api/admin/overview", s.handleAdminOverview)
|
|
mux.HandleFunc("/api/admin/sessions", s.handleAdminSessions)
|
|
mux.HandleFunc("/api/admin/latest", s.handleAdminLatest)
|
|
mux.HandleFunc("/api/admin/traffic", s.handleAdminTraffic)
|
|
mux.HandleFunc("/api/admin/live", s.handleAdminLive)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) handleAdminIndex(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/" && r.URL.Path != "/admin" {
|
|
http.NotFound(w, r)
|
|
return
|
|
}
|
|
|
|
data, err := adminUIFiles.ReadFile("adminui/index.html")
|
|
if err != nil {
|
|
http.Error(w, "admin ui unavailable", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Header().Set("Cache-Control", "no-store")
|
|
_, _ = w.Write(data)
|
|
}
|
|
|
|
func noStoreHandler(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Cache-Control", "no-store")
|
|
next.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleAdminOverview(w http.ResponseWriter, _ *http.Request) {
|
|
subscriberCount, latestStateCount := s.hub.Stats()
|
|
traffic := s.hub.TrafficSnapshot()
|
|
now := time.Now()
|
|
writeJSON(w, http.StatusOK, adminOverview{
|
|
Status: "ok",
|
|
StartedAt: s.startedAt,
|
|
Now: now,
|
|
UptimeSeconds: int64(now.Sub(s.startedAt).Seconds()),
|
|
HTTPListen: s.cfg.Server.HTTPListen,
|
|
Anonymous: s.cfg.Auth.AllowAnonymousConsumers,
|
|
Metrics: map[string]any{
|
|
"sessions": s.sessions.Count(),
|
|
"subscribers": subscriberCount,
|
|
"latestState": latestStateCount,
|
|
"channels": len(s.channels.List()),
|
|
"pluginHandlers": s.plugins.HandlerCount(),
|
|
"published": traffic.Published,
|
|
"dropped": traffic.Dropped,
|
|
"fanout": traffic.Fanout,
|
|
},
|
|
Auth: map[string]any{
|
|
"producerTokens": len(s.cfg.Auth.ProducerTokens),
|
|
"consumerTokens": len(s.cfg.Auth.ConsumerTokens),
|
|
"controllerTokens": len(s.cfg.Auth.ControllerTokens),
|
|
},
|
|
Endpoints: map[string]any{
|
|
"websocket": "/ws",
|
|
"health": "/healthz",
|
|
"metrics": "/metrics",
|
|
"createChannel": "/api/channel/create",
|
|
"channels": "/api/admin/channels",
|
|
"traffic": "/api/admin/traffic",
|
|
"admin": "/admin",
|
|
},
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleAdminSessions(w http.ResponseWriter, _ *http.Request) {
|
|
snapshots := s.sessions.List()
|
|
sort.Slice(snapshots, func(i int, j int) bool {
|
|
return snapshots[i].CreatedAt.After(snapshots[j].CreatedAt)
|
|
})
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"items": snapshots,
|
|
"count": len(snapshots),
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleAdminLatest(w http.ResponseWriter, r *http.Request) {
|
|
envelopes := s.hub.LatestStates()
|
|
sort.Slice(envelopes, func(i int, j int) bool {
|
|
return envelopes[i].Timestamp > envelopes[j].Timestamp
|
|
})
|
|
|
|
query := strings.TrimSpace(r.URL.Query().Get("topic"))
|
|
if query != "" {
|
|
filtered := make([]any, 0, len(envelopes))
|
|
for _, envelope := range envelopes {
|
|
if envelope.Topic != query {
|
|
continue
|
|
}
|
|
filtered = append(filtered, adminLatestItem(envelope))
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"items": filtered,
|
|
"count": len(filtered),
|
|
})
|
|
return
|
|
}
|
|
|
|
items := make([]any, 0, len(envelopes))
|
|
for _, envelope := range envelopes {
|
|
items = append(items, adminLatestItem(envelope))
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{
|
|
"items": items,
|
|
"count": len(items),
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleAdminLive(w http.ResponseWriter, r *http.Request) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
topic := strings.TrimSpace(r.URL.Query().Get("topic"))
|
|
channelID := strings.TrimSpace(r.URL.Query().Get("channelId"))
|
|
deviceID := strings.TrimSpace(r.URL.Query().Get("deviceId"))
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
|
|
w.Header().Set("Cache-Control", "no-store")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no")
|
|
|
|
id, stream := s.hub.SubscribeLive(64)
|
|
defer s.hub.UnsubscribeLive(id)
|
|
|
|
fmt.Fprint(w, ": live stream ready\n\n")
|
|
flusher.Flush()
|
|
|
|
ping := time.NewTicker(15 * time.Second)
|
|
defer ping.Stop()
|
|
|
|
ctx := r.Context()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ping.C:
|
|
fmt.Fprint(w, ": ping\n\n")
|
|
flusher.Flush()
|
|
case envelope, ok := <-stream:
|
|
if !ok {
|
|
return
|
|
}
|
|
if topic != "" && envelope.Topic != topic {
|
|
continue
|
|
}
|
|
if channelID != "" && envelope.Target.ChannelID != channelID {
|
|
continue
|
|
}
|
|
if deviceID != "" && envelope.Target.DeviceID != deviceID {
|
|
continue
|
|
}
|
|
|
|
data, err := json.Marshal(adminLatestItem(envelope))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
fmt.Fprintf(w, "event: envelope\ndata: %s\n\n", data)
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleAdminTraffic(w http.ResponseWriter, _ *http.Request) {
|
|
writeJSON(w, http.StatusOK, s.hub.TrafficSnapshot())
|
|
}
|
|
|
|
func adminLatestItem(envelope model.Envelope) map[string]any {
|
|
payload := map[string]any{}
|
|
_ = json.Unmarshal(envelope.Payload, &payload)
|
|
|
|
return map[string]any{
|
|
"timestamp": envelope.Timestamp,
|
|
"topic": envelope.Topic,
|
|
"channelId": envelope.Target.ChannelID,
|
|
"deviceId": envelope.Target.DeviceID,
|
|
"groupId": envelope.Target.GroupID,
|
|
"sourceId": envelope.Source.ID,
|
|
"mode": envelope.Source.Mode,
|
|
"payload": payload,
|
|
}
|
|
}
|