Files
cmr-mini/realtime-gateway/internal/gateway/admin_ui.go

229 lines
6.3 KiB
Go

package gateway
import (
"embed"
"encoding/json"
"fmt"
"io/fs"
"net/http"
"sort"
"strings"
"time"
"realtime-gateway/internal/model"
)
//go:embed adminui/*
var adminUIFiles embed.FS
type adminOverview struct {
Status string `json:"status"`
StartedAt time.Time `json:"startedAt"`
Now time.Time `json:"now"`
UptimeSeconds int64 `json:"uptimeSeconds"`
HTTPListen string `json:"httpListen"`
Anonymous bool `json:"anonymousConsumers"`
Metrics map[string]any `json:"metrics"`
Auth map[string]any `json:"auth"`
Endpoints map[string]any `json:"endpoints"`
}
func (s *Server) registerAdminRoutes(mux *http.ServeMux) error {
sub, err := fs.Sub(adminUIFiles, "adminui")
if err != nil {
return err
}
fileServer := http.FileServer(http.FS(sub))
mux.Handle("/assets/", http.StripPrefix("/assets/", noStoreHandler(fileServer)))
mux.HandleFunc("/", s.handleAdminIndex)
mux.HandleFunc("/admin", s.handleAdminIndex)
mux.HandleFunc("/api/admin/overview", s.handleAdminOverview)
mux.HandleFunc("/api/admin/sessions", s.handleAdminSessions)
mux.HandleFunc("/api/admin/latest", s.handleAdminLatest)
mux.HandleFunc("/api/admin/traffic", s.handleAdminTraffic)
mux.HandleFunc("/api/admin/live", s.handleAdminLive)
return nil
}
func (s *Server) handleAdminIndex(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" && r.URL.Path != "/admin" {
http.NotFound(w, r)
return
}
data, err := adminUIFiles.ReadFile("adminui/index.html")
if err != nil {
http.Error(w, "admin ui unavailable", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
_, _ = w.Write(data)
}
func noStoreHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-store")
next.ServeHTTP(w, r)
})
}
func (s *Server) handleAdminOverview(w http.ResponseWriter, _ *http.Request) {
subscriberCount, latestStateCount := s.hub.Stats()
traffic := s.hub.TrafficSnapshot()
now := time.Now()
writeJSON(w, http.StatusOK, adminOverview{
Status: "ok",
StartedAt: s.startedAt,
Now: now,
UptimeSeconds: int64(now.Sub(s.startedAt).Seconds()),
HTTPListen: s.cfg.Server.HTTPListen,
Anonymous: s.cfg.Auth.AllowAnonymousConsumers,
Metrics: map[string]any{
"sessions": s.sessions.Count(),
"subscribers": subscriberCount,
"latestState": latestStateCount,
"channels": len(s.channels.List()),
"pluginHandlers": s.plugins.HandlerCount(),
"published": traffic.Published,
"dropped": traffic.Dropped,
"fanout": traffic.Fanout,
},
Auth: map[string]any{
"producerTokens": len(s.cfg.Auth.ProducerTokens),
"consumerTokens": len(s.cfg.Auth.ConsumerTokens),
"controllerTokens": len(s.cfg.Auth.ControllerTokens),
},
Endpoints: map[string]any{
"websocket": "/ws",
"health": "/healthz",
"metrics": "/metrics",
"createChannel": "/api/channel/create",
"channels": "/api/admin/channels",
"traffic": "/api/admin/traffic",
"admin": "/admin",
},
})
}
func (s *Server) handleAdminSessions(w http.ResponseWriter, _ *http.Request) {
snapshots := s.sessions.List()
sort.Slice(snapshots, func(i int, j int) bool {
return snapshots[i].CreatedAt.After(snapshots[j].CreatedAt)
})
writeJSON(w, http.StatusOK, map[string]any{
"items": snapshots,
"count": len(snapshots),
})
}
func (s *Server) handleAdminLatest(w http.ResponseWriter, r *http.Request) {
envelopes := s.hub.LatestStates()
sort.Slice(envelopes, func(i int, j int) bool {
return envelopes[i].Timestamp > envelopes[j].Timestamp
})
query := strings.TrimSpace(r.URL.Query().Get("topic"))
if query != "" {
filtered := make([]any, 0, len(envelopes))
for _, envelope := range envelopes {
if envelope.Topic != query {
continue
}
filtered = append(filtered, adminLatestItem(envelope))
}
writeJSON(w, http.StatusOK, map[string]any{
"items": filtered,
"count": len(filtered),
})
return
}
items := make([]any, 0, len(envelopes))
for _, envelope := range envelopes {
items = append(items, adminLatestItem(envelope))
}
writeJSON(w, http.StatusOK, map[string]any{
"items": items,
"count": len(items),
})
}
func (s *Server) handleAdminLive(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
topic := strings.TrimSpace(r.URL.Query().Get("topic"))
channelID := strings.TrimSpace(r.URL.Query().Get("channelId"))
deviceID := strings.TrimSpace(r.URL.Query().Get("deviceId"))
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
id, stream := s.hub.SubscribeLive(64)
defer s.hub.UnsubscribeLive(id)
fmt.Fprint(w, ": live stream ready\n\n")
flusher.Flush()
ping := time.NewTicker(15 * time.Second)
defer ping.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case <-ping.C:
fmt.Fprint(w, ": ping\n\n")
flusher.Flush()
case envelope, ok := <-stream:
if !ok {
return
}
if topic != "" && envelope.Topic != topic {
continue
}
if channelID != "" && envelope.Target.ChannelID != channelID {
continue
}
if deviceID != "" && envelope.Target.DeviceID != deviceID {
continue
}
data, err := json.Marshal(adminLatestItem(envelope))
if err != nil {
continue
}
fmt.Fprintf(w, "event: envelope\ndata: %s\n\n", data)
flusher.Flush()
}
}
}
func (s *Server) handleAdminTraffic(w http.ResponseWriter, _ *http.Request) {
writeJSON(w, http.StatusOK, s.hub.TrafficSnapshot())
}
func adminLatestItem(envelope model.Envelope) map[string]any {
payload := map[string]any{}
_ = json.Unmarshal(envelope.Payload, &payload)
return map[string]any{
"timestamp": envelope.Timestamp,
"topic": envelope.Topic,
"channelId": envelope.Target.ChannelID,
"deviceId": envelope.Target.DeviceID,
"groupId": envelope.Target.GroupID,
"sourceId": envelope.Source.ID,
"mode": envelope.Source.Mode,
"payload": payload,
}
}