Add backend foundation and config-driven workbench
This commit is contained in:
323
backend/internal/store/postgres/config_store.go
Normal file
323
backend/internal/store/postgres/config_store.go
Normal file
@@ -0,0 +1,323 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type EventConfigSource struct {
|
||||
ID string
|
||||
EventID string
|
||||
SourceVersionNo int
|
||||
SourceKind string
|
||||
SchemaID string
|
||||
SchemaVersion string
|
||||
Status string
|
||||
SourceJSON string
|
||||
Notes *string
|
||||
}
|
||||
|
||||
type EventConfigBuild struct {
|
||||
ID string
|
||||
EventID string
|
||||
SourceID string
|
||||
BuildNo int
|
||||
BuildStatus string
|
||||
BuildLog *string
|
||||
ManifestJSON string
|
||||
AssetIndexJSON string
|
||||
}
|
||||
|
||||
type EventReleaseAsset struct {
|
||||
ID string
|
||||
EventReleaseID string
|
||||
AssetType string
|
||||
AssetKey string
|
||||
AssetPath *string
|
||||
AssetURL string
|
||||
Checksum *string
|
||||
SizeBytes *int64
|
||||
MetaJSON string
|
||||
}
|
||||
|
||||
type UpsertEventConfigSourceParams struct {
|
||||
EventID string
|
||||
SourceVersionNo int
|
||||
SourceKind string
|
||||
SchemaID string
|
||||
SchemaVersion string
|
||||
Status string
|
||||
Source map[string]any
|
||||
Notes *string
|
||||
}
|
||||
|
||||
type UpsertEventConfigBuildParams struct {
|
||||
EventID string
|
||||
SourceID string
|
||||
BuildNo int
|
||||
BuildStatus string
|
||||
BuildLog *string
|
||||
Manifest map[string]any
|
||||
AssetIndex []map[string]any
|
||||
}
|
||||
|
||||
type UpsertEventReleaseAssetParams struct {
|
||||
EventReleaseID string
|
||||
AssetType string
|
||||
AssetKey string
|
||||
AssetPath *string
|
||||
AssetURL string
|
||||
Checksum *string
|
||||
SizeBytes *int64
|
||||
Meta map[string]any
|
||||
}
|
||||
|
||||
func (s *Store) UpsertEventConfigSource(ctx context.Context, tx Tx, params UpsertEventConfigSourceParams) (*EventConfigSource, error) {
|
||||
sourceJSON, err := json.Marshal(params.Source)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal event config source: %w", err)
|
||||
}
|
||||
|
||||
row := tx.QueryRow(ctx, `
|
||||
INSERT INTO event_config_sources (
|
||||
event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb, notes
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8)
|
||||
ON CONFLICT (event_id, source_version_no) DO UPDATE SET
|
||||
source_kind = EXCLUDED.source_kind,
|
||||
schema_id = EXCLUDED.schema_id,
|
||||
schema_version = EXCLUDED.schema_version,
|
||||
status = EXCLUDED.status,
|
||||
source_jsonb = EXCLUDED.source_jsonb,
|
||||
notes = EXCLUDED.notes
|
||||
RETURNING id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
|
||||
`, params.EventID, params.SourceVersionNo, params.SourceKind, params.SchemaID, params.SchemaVersion, params.Status, string(sourceJSON), params.Notes)
|
||||
|
||||
var item EventConfigSource
|
||||
if err := row.Scan(
|
||||
&item.ID,
|
||||
&item.EventID,
|
||||
&item.SourceVersionNo,
|
||||
&item.SourceKind,
|
||||
&item.SchemaID,
|
||||
&item.SchemaVersion,
|
||||
&item.Status,
|
||||
&item.SourceJSON,
|
||||
&item.Notes,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("upsert event config source: %w", err)
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func (s *Store) UpsertEventConfigBuild(ctx context.Context, tx Tx, params UpsertEventConfigBuildParams) (*EventConfigBuild, error) {
|
||||
manifestJSON, err := json.Marshal(params.Manifest)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal event config manifest: %w", err)
|
||||
}
|
||||
assetIndexJSON, err := json.Marshal(params.AssetIndex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal event config asset index: %w", err)
|
||||
}
|
||||
|
||||
row := tx.QueryRow(ctx, `
|
||||
INSERT INTO event_config_builds (
|
||||
event_id, source_id, build_no, build_status, build_log, manifest_jsonb, asset_index_jsonb
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb)
|
||||
ON CONFLICT (event_id, build_no) DO UPDATE SET
|
||||
source_id = EXCLUDED.source_id,
|
||||
build_status = EXCLUDED.build_status,
|
||||
build_log = EXCLUDED.build_log,
|
||||
manifest_jsonb = EXCLUDED.manifest_jsonb,
|
||||
asset_index_jsonb = EXCLUDED.asset_index_jsonb
|
||||
RETURNING id, event_id, source_id, build_no, build_status, build_log, manifest_jsonb::text, asset_index_jsonb::text
|
||||
`, params.EventID, params.SourceID, params.BuildNo, params.BuildStatus, params.BuildLog, string(manifestJSON), string(assetIndexJSON))
|
||||
|
||||
var item EventConfigBuild
|
||||
if err := row.Scan(
|
||||
&item.ID,
|
||||
&item.EventID,
|
||||
&item.SourceID,
|
||||
&item.BuildNo,
|
||||
&item.BuildStatus,
|
||||
&item.BuildLog,
|
||||
&item.ManifestJSON,
|
||||
&item.AssetIndexJSON,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("upsert event config build: %w", err)
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func (s *Store) AttachBuildToRelease(ctx context.Context, tx Tx, releaseID, buildID string) error {
|
||||
if _, err := tx.Exec(ctx, `
|
||||
UPDATE event_releases
|
||||
SET build_id = $2
|
||||
WHERE id = $1
|
||||
`, releaseID, buildID); err != nil {
|
||||
return fmt.Errorf("attach build to release: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) ReplaceEventReleaseAssets(ctx context.Context, tx Tx, eventReleaseID string, assets []UpsertEventReleaseAssetParams) error {
|
||||
if _, err := tx.Exec(ctx, `DELETE FROM event_release_assets WHERE event_release_id = $1`, eventReleaseID); err != nil {
|
||||
return fmt.Errorf("clear event release assets: %w", err)
|
||||
}
|
||||
|
||||
for _, asset := range assets {
|
||||
metaJSON, err := json.Marshal(asset.Meta)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal event release asset meta: %w", err)
|
||||
}
|
||||
if _, err := tx.Exec(ctx, `
|
||||
INSERT INTO event_release_assets (
|
||||
event_release_id, asset_type, asset_key, asset_path, asset_url, checksum, size_bytes, meta_jsonb
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
|
||||
`, eventReleaseID, asset.AssetType, asset.AssetKey, asset.AssetPath, asset.AssetURL, asset.Checksum, asset.SizeBytes, string(metaJSON)); err != nil {
|
||||
return fmt.Errorf("insert event release asset: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) NextEventConfigSourceVersion(ctx context.Context, eventID string) (int, error) {
|
||||
var next int
|
||||
if err := s.pool.QueryRow(ctx, `
|
||||
SELECT COALESCE(MAX(source_version_no), 0) + 1
|
||||
FROM event_config_sources
|
||||
WHERE event_id = $1
|
||||
`, eventID).Scan(&next); err != nil {
|
||||
return 0, fmt.Errorf("next event config source version: %w", err)
|
||||
}
|
||||
return next, nil
|
||||
}
|
||||
|
||||
func (s *Store) NextEventConfigBuildNo(ctx context.Context, eventID string) (int, error) {
|
||||
var next int
|
||||
if err := s.pool.QueryRow(ctx, `
|
||||
SELECT COALESCE(MAX(build_no), 0) + 1
|
||||
FROM event_config_builds
|
||||
WHERE event_id = $1
|
||||
`, eventID).Scan(&next); err != nil {
|
||||
return 0, fmt.Errorf("next event config build no: %w", err)
|
||||
}
|
||||
return next, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListEventConfigSourcesByEventID(ctx context.Context, eventID string, limit int) ([]EventConfigSource, error) {
|
||||
if limit <= 0 || limit > 100 {
|
||||
limit = 20
|
||||
}
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
|
||||
FROM event_config_sources
|
||||
WHERE event_id = $1
|
||||
ORDER BY source_version_no DESC
|
||||
LIMIT $2
|
||||
`, eventID, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list event config sources: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var items []EventConfigSource
|
||||
for rows.Next() {
|
||||
item, err := scanEventConfigSourceFromRows(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, *item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate event config sources: %w", err)
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (s *Store) GetEventConfigSourceByID(ctx context.Context, sourceID string) (*EventConfigSource, error) {
|
||||
row := s.pool.QueryRow(ctx, `
|
||||
SELECT id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
|
||||
FROM event_config_sources
|
||||
WHERE id = $1
|
||||
LIMIT 1
|
||||
`, sourceID)
|
||||
return scanEventConfigSource(row)
|
||||
}
|
||||
|
||||
func (s *Store) GetEventConfigBuildByID(ctx context.Context, buildID string) (*EventConfigBuild, error) {
|
||||
row := s.pool.QueryRow(ctx, `
|
||||
SELECT id, event_id, source_id, build_no, build_status, build_log, manifest_jsonb::text, asset_index_jsonb::text
|
||||
FROM event_config_builds
|
||||
WHERE id = $1
|
||||
LIMIT 1
|
||||
`, buildID)
|
||||
return scanEventConfigBuild(row)
|
||||
}
|
||||
|
||||
func scanEventConfigSource(row pgx.Row) (*EventConfigSource, error) {
|
||||
var item EventConfigSource
|
||||
err := row.Scan(
|
||||
&item.ID,
|
||||
&item.EventID,
|
||||
&item.SourceVersionNo,
|
||||
&item.SourceKind,
|
||||
&item.SchemaID,
|
||||
&item.SchemaVersion,
|
||||
&item.Status,
|
||||
&item.SourceJSON,
|
||||
&item.Notes,
|
||||
)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan event config source: %w", err)
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func scanEventConfigSourceFromRows(rows pgx.Rows) (*EventConfigSource, error) {
|
||||
var item EventConfigSource
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.EventID,
|
||||
&item.SourceVersionNo,
|
||||
&item.SourceKind,
|
||||
&item.SchemaID,
|
||||
&item.SchemaVersion,
|
||||
&item.Status,
|
||||
&item.SourceJSON,
|
||||
&item.Notes,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("scan event config source row: %w", err)
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
|
||||
func scanEventConfigBuild(row pgx.Row) (*EventConfigBuild, error) {
|
||||
var item EventConfigBuild
|
||||
err := row.Scan(
|
||||
&item.ID,
|
||||
&item.EventID,
|
||||
&item.SourceID,
|
||||
&item.BuildNo,
|
||||
&item.BuildStatus,
|
||||
&item.BuildLog,
|
||||
&item.ManifestJSON,
|
||||
&item.AssetIndexJSON,
|
||||
)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scan event config build: %w", err)
|
||||
}
|
||||
return &item, nil
|
||||
}
|
||||
Reference in New Issue
Block a user