Files
cmr-mini/realtime-gateway/internal/plugin/bus.go

54 lines
939 B
Go

package plugin
import (
"context"
"log/slog"
"sync"
"realtime-gateway/internal/model"
)
// Handler is a plugin that consumes envelopes published on a Bus.
//
// Publish invokes Handle on its own goroutine for every envelope, so an
// implementation may be called concurrently with itself and with other
// handlers.
type Handler interface {
	// Name returns the identifier the Bus attaches to log records about this
	// handler.
	Name() string

	// Handle processes a single envelope. A non-nil error is logged by the
	// Bus as a warning; it is not returned to the publisher.
	Handle(ctx context.Context, envelope model.Envelope) error
}
// Bus fans published envelopes out to the registered plugin handlers.
//
// A Bus embeds a mutex and therefore must not be copied after first use;
// share it by pointer (NewBus returns *Bus).
type Bus struct {
	// logger receives a warning whenever a handler returns an error.
	logger *slog.Logger

	// mu guards handlers.
	mu sync.RWMutex
	// handlers holds the registered handlers in registration order.
	handlers []Handler
}
// NewBus returns an empty Bus that reports handler failures to logger.
//
// A nil logger is replaced with slog.Default(), so that failure reporting
// never dereferences a nil *slog.Logger.
func NewBus(logger *slog.Logger) *Bus {
	if logger == nil {
		logger = slog.Default()
	}
	return &Bus{logger: logger}
}
// Register adds handler to the end of the bus's handler list.
//
// It takes the write lock, so it is safe to call concurrently with Publish
// and HandlerCount. A Publish call that has already snapshotted the handler
// list will not deliver its envelope to the newly registered handler.
func (b *Bus) Register(handler Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.handlers = append(b.handlers, handler)
}
// Publish delivers envelope to every handler registered at the time of the
// call. Each handler runs on its own goroutine, so Publish returns without
// waiting for handlers to finish and gives no ordering guarantee between
// them.
//
// Handlers are invoked with context.Background(), i.e. a context that is
// never cancelled. A handler that returns an error is logged as a warning; a
// handler that panics is recovered and logged as an error so that a single
// faulty plugin cannot terminate the process. Nil handlers are skipped.
func (b *Bus) Publish(envelope model.Envelope) {
	// Snapshot the list under the read lock so handlers execute without
	// holding it and later Register calls cannot affect this iteration.
	b.mu.RLock()
	handlers := append([]Handler(nil), b.handlers...)
	b.mu.RUnlock()

	for _, handler := range handlers {
		if handler == nil {
			// Calling a method on a nil interface would panic in the goroutine.
			continue
		}
		handler := handler // per-iteration copy for toolchains before Go 1.22
		go func() {
			// An unrecovered panic in any goroutine aborts the whole program;
			// contain it to this handler and report it instead.
			defer func() {
				if r := recover(); r != nil {
					b.logger.Error("plugin handler panicked", "handler", handler.Name(), "panic", r)
				}
			}()
			if err := handler.Handle(context.Background(), envelope); err != nil {
				b.logger.Warn("plugin handler failed", "handler", handler.Name(), "error", err)
			}
		}()
	}
}
// HandlerCount reports how many handlers are currently registered.
//
// The count is read under the read lock, so it is safe to call concurrently
// with Register and Publish; the value may be stale by the time the caller
// uses it if Register runs in between.
func (b *Bus) HandlerCount() int {
	b.mu.RLock()
	count := len(b.handlers)
	b.mu.RUnlock()
	return count
}