Files
cmr-mini/backend/internal/store/postgres/config_store.go

372 lines
10 KiB
Go

package postgres
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
)
// EventConfigSource is a row of event_config_sources as returned by the
// queries in this file.
type EventConfigSource struct {
	// ID is the row id; EventID is the event_id column.
	ID, EventID string
	// SourceVersionNo is the source_version_no column.
	SourceVersionNo int
	// SourceKind, SchemaID, SchemaVersion and Status hold the same-named columns.
	SourceKind, SchemaID, SchemaVersion, Status string
	// SourceJSON is the source_jsonb column cast to text.
	SourceJSON string
	// Notes is the notes column; the pointer allows it to be absent.
	Notes *string
}
// EventConfigBuild is a row of event_config_builds as returned by the queries
// in this file.
type EventConfigBuild struct {
	// ID is the row id; EventID and SourceID hold event_id and source_id.
	ID, EventID, SourceID string
	// BuildNo is the build_no column.
	BuildNo int
	// BuildStatus is the build_status column.
	BuildStatus string
	// BuildLog is the build_log column; the pointer allows it to be absent.
	BuildLog *string
	// ManifestJSON and AssetIndexJSON are manifest_jsonb and asset_index_jsonb
	// cast to text.
	ManifestJSON, AssetIndexJSON string
}
// EventReleaseAsset describes one asset attached to an event release.
//
// NOTE(review): nothing in this part of the file reads or fills this type; the
// field names line up with the event_release_assets columns written by
// ReplaceEventReleaseAssets, so it presumably mirrors a row of that table —
// confirm against the reader that uses it.
type EventReleaseAsset struct {
	// ID identifies the asset; EventReleaseID, AssetType and AssetKey are plain
	// string attributes of it.
	ID, EventReleaseID, AssetType, AssetKey string
	// AssetPath is optional.
	AssetPath *string
	// AssetURL is always present.
	AssetURL string
	// Checksum is optional.
	Checksum *string
	// SizeBytes is optional.
	SizeBytes *int64
	// MetaJSON is a string; presumably the asset's metadata as JSON text.
	MetaJSON string
}
// UpsertEventConfigSourceParams is the input to Store.UpsertEventConfigSource.
type UpsertEventConfigSourceParams struct {
	// EventID and SourceVersionNo together form the conflict key of the upsert.
	EventID string
	// SourceVersionNo is written to source_version_no.
	SourceVersionNo int
	// SourceKind, SchemaID, SchemaVersion and Status are written to the
	// same-named columns.
	SourceKind, SchemaID, SchemaVersion, Status string
	// Source is JSON-encoded and written to source_jsonb.
	Source map[string]any
	// Notes is written to the notes column as-is, nil included.
	Notes *string
}
// UpsertEventConfigBuildParams is the input to Store.UpsertEventConfigBuild.
type UpsertEventConfigBuildParams struct {
	// EventID (with BuildNo) is the conflict key of the upsert; SourceID is
	// written to source_id.
	EventID, SourceID string
	// BuildNo is written to build_no.
	BuildNo int
	// BuildStatus is written to build_status.
	BuildStatus string
	// BuildLog is written to build_log as-is, nil included.
	BuildLog *string
	// Manifest is JSON-encoded and written to manifest_jsonb.
	Manifest map[string]any
	// AssetIndex is JSON-encoded and written to asset_index_jsonb.
	AssetIndex []map[string]any
}
// UpsertEventReleaseAssetParams describes one asset row to insert through
// Store.ReplaceEventReleaseAssets.
type UpsertEventReleaseAssetParams struct {
	// EventReleaseID is not read by ReplaceEventReleaseAssets, which uses its
	// own eventReleaseID argument for every inserted row. AssetType and
	// AssetKey are written to asset_type and asset_key.
	EventReleaseID, AssetType, AssetKey string
	// AssetPath is written to asset_path as-is, nil included.
	AssetPath *string
	// AssetURL is written to asset_url.
	AssetURL string
	// Checksum is written to checksum as-is, nil included.
	Checksum *string
	// SizeBytes is written to size_bytes as-is, nil included.
	SizeBytes *int64
	// Meta is JSON-encoded and written to meta_jsonb.
	Meta map[string]any
}
// UpsertEventConfigSource inserts an event config source through tx and returns
// the row as the database reports it back.
//
// params.Source is JSON-encoded and sent as the source_jsonb value; a nil map
// therefore encodes as the JSON literal null. If a row with the same
// (event_id, source_version_no) already exists, its source_kind, schema_id,
// schema_version, status, source_jsonb and notes are overwritten with the
// incoming values. All database work goes through tx; the receiver's own
// connection is not used.
//
// It returns an error if the payload cannot be encoded or the statement fails.
func (s *Store) UpsertEventConfigSource(ctx context.Context, tx Tx, params UpsertEventConfigSourceParams) (*EventConfigSource, error) {
	const upsertSQL = `
INSERT INTO event_config_sources (
event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb, notes
)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8)
ON CONFLICT (event_id, source_version_no) DO UPDATE SET
source_kind = EXCLUDED.source_kind,
schema_id = EXCLUDED.schema_id,
schema_version = EXCLUDED.schema_version,
status = EXCLUDED.status,
source_jsonb = EXCLUDED.source_jsonb,
notes = EXCLUDED.notes
RETURNING id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
`

	payload, err := json.Marshal(params.Source)
	if err != nil {
		return nil, fmt.Errorf("marshal event config source: %w", err)
	}

	stored := new(EventConfigSource)
	err = tx.QueryRow(ctx, upsertSQL,
		params.EventID,
		params.SourceVersionNo,
		params.SourceKind,
		params.SchemaID,
		params.SchemaVersion,
		params.Status,
		string(payload), // sent as text and cast to jsonb by the statement
		params.Notes,
	).Scan(
		&stored.ID,
		&stored.EventID,
		&stored.SourceVersionNo,
		&stored.SourceKind,
		&stored.SchemaID,
		&stored.SchemaVersion,
		&stored.Status,
		&stored.SourceJSON,
		&stored.Notes,
	)
	if err != nil {
		return nil, fmt.Errorf("upsert event config source: %w", err)
	}
	return stored, nil
}
// UpsertEventConfigBuild inserts an event config build through tx and returns
// the row as the database reports it back.
//
// params.Manifest and params.AssetIndex are JSON-encoded and sent as the
// manifest_jsonb and asset_index_jsonb values; a nil map or nil slice encodes
// as the JSON literal null. If a row with the same (event_id, build_no) already
// exists, its source_id, build_status, build_log and both JSON columns are
// overwritten with the incoming values. All database work goes through tx.
//
// It returns an error if either payload cannot be encoded or the statement
// fails.
func (s *Store) UpsertEventConfigBuild(ctx context.Context, tx Tx, params UpsertEventConfigBuildParams) (*EventConfigBuild, error) {
	const upsertSQL = `
INSERT INTO event_config_builds (
event_id, source_id, build_no, build_status, build_log, manifest_jsonb, asset_index_jsonb
)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::jsonb)
ON CONFLICT (event_id, build_no) DO UPDATE SET
source_id = EXCLUDED.source_id,
build_status = EXCLUDED.build_status,
build_log = EXCLUDED.build_log,
manifest_jsonb = EXCLUDED.manifest_jsonb,
asset_index_jsonb = EXCLUDED.asset_index_jsonb
RETURNING id, event_id, source_id, build_no, build_status, build_log, manifest_jsonb::text, asset_index_jsonb::text
`

	manifest, err := json.Marshal(params.Manifest)
	if err != nil {
		return nil, fmt.Errorf("marshal event config manifest: %w", err)
	}
	assetIndex, err := json.Marshal(params.AssetIndex)
	if err != nil {
		return nil, fmt.Errorf("marshal event config asset index: %w", err)
	}

	stored := new(EventConfigBuild)
	err = tx.QueryRow(ctx, upsertSQL,
		params.EventID,
		params.SourceID,
		params.BuildNo,
		params.BuildStatus,
		params.BuildLog,
		string(manifest),
		string(assetIndex),
	).Scan(
		&stored.ID,
		&stored.EventID,
		&stored.SourceID,
		&stored.BuildNo,
		&stored.BuildStatus,
		&stored.BuildLog,
		&stored.ManifestJSON,
		&stored.AssetIndexJSON,
	)
	if err != nil {
		return nil, fmt.Errorf("upsert event config build: %w", err)
	}
	return stored, nil
}
// AttachBuildToRelease sets build_id to buildID on the event_releases row whose
// id is releaseID, running the update through tx.
//
// Only a failure of the statement itself is reported. The result of Exec is
// discarded, so an update that matches no row (for example an unknown
// releaseID) still returns nil.
func (s *Store) AttachBuildToRelease(ctx context.Context, tx Tx, releaseID, buildID string) error {
	const attachSQL = `
UPDATE event_releases
SET build_id = $2
WHERE id = $1
`

	_, err := tx.Exec(ctx, attachSQL, releaseID, buildID)
	if err == nil {
		return nil
	}
	return fmt.Errorf("attach build to release: %w", err)
}
// ReplaceEventReleaseAssets swaps the asset rows of one event release: it
// deletes every event_release_assets row for eventReleaseID and then inserts
// one row per element of assets, in slice order, all through tx.
//
// Every inserted row uses the eventReleaseID argument; the EventReleaseID field
// of the individual params is not read. Each asset's Meta is JSON-encoded just
// before its own insert, so a failure part-way through returns with the earlier
// statements already issued on tx. Nothing here commits or rolls back tx.
func (s *Store) ReplaceEventReleaseAssets(ctx context.Context, tx Tx, eventReleaseID string, assets []UpsertEventReleaseAssetParams) error {
	const clearSQL = `DELETE FROM event_release_assets WHERE event_release_id = $1`
	const insertSQL = `
INSERT INTO event_release_assets (
event_release_id, asset_type, asset_key, asset_path, asset_url, checksum, size_bytes, meta_jsonb
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb)
`

	_, err := tx.Exec(ctx, clearSQL, eventReleaseID)
	if err != nil {
		return fmt.Errorf("clear event release assets: %w", err)
	}

	for i := range assets {
		asset := &assets[i]

		meta, err := json.Marshal(asset.Meta)
		if err != nil {
			return fmt.Errorf("marshal event release asset meta: %w", err)
		}

		_, err = tx.Exec(ctx, insertSQL,
			eventReleaseID,
			asset.AssetType,
			asset.AssetKey,
			asset.AssetPath,
			asset.AssetURL,
			asset.Checksum,
			asset.SizeBytes,
			string(meta),
		)
		if err != nil {
			return fmt.Errorf("insert event release asset: %w", err)
		}
	}
	return nil
}
// NextEventConfigSourceVersion returns one more than the highest
// source_version_no stored for eventID, or 1 when the event has no sources.
//
// NOTE(review): the query runs on the store's pool, not on a caller-supplied
// transaction, and takes no lock. Two callers may therefore be handed the same
// number, and UpsertEventConfigSource would then update the existing row on
// conflict instead of failing — confirm callers serialize this elsewhere.
func (s *Store) NextEventConfigSourceVersion(ctx context.Context, eventID string) (int, error) {
	const nextSQL = `
SELECT COALESCE(MAX(source_version_no), 0) + 1
FROM event_config_sources
WHERE event_id = $1
`

	version := 0
	err := s.pool.QueryRow(ctx, nextSQL, eventID).Scan(&version)
	if err != nil {
		return 0, fmt.Errorf("next event config source version: %w", err)
	}
	return version, nil
}
// NextEventConfigBuildNo returns one more than the highest build_no stored for
// eventID, or 1 when the event has no builds.
//
// NOTE(review): the query runs on the store's pool, not on a caller-supplied
// transaction, and takes no lock. Two callers may therefore be handed the same
// number, and UpsertEventConfigBuild would then update the existing row on
// conflict instead of failing — confirm callers serialize this elsewhere.
func (s *Store) NextEventConfigBuildNo(ctx context.Context, eventID string) (int, error) {
	const nextSQL = `
SELECT COALESCE(MAX(build_no), 0) + 1
FROM event_config_builds
WHERE event_id = $1
`

	buildNo := 0
	err := s.pool.QueryRow(ctx, nextSQL, eventID).Scan(&buildNo)
	if err != nil {
		return 0, fmt.Errorf("next event config build no: %w", err)
	}
	return buildNo, nil
}
// ListEventConfigSourcesByEventID returns the config sources stored for
// eventID, highest source_version_no first.
//
// limit caps the number of rows. Any value outside 1..100 — zero, negative, or
// above 100 — is replaced by 20 rather than clamped.
//
// On success the result is never nil: an event without sources yields an empty
// slice. This matches ListEventConfigBuildsByEventID, so both listings encode
// to JSON as [] when empty instead of one of them encoding as null. On error
// the slice is nil.
func (s *Store) ListEventConfigSourcesByEventID(ctx context.Context, eventID string, limit int) ([]EventConfigSource, error) {
	if limit <= 0 || limit > 100 {
		limit = 20
	}
	rows, err := s.pool.Query(ctx, `
SELECT id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
FROM event_config_sources
WHERE event_id = $1
ORDER BY source_version_no DESC
LIMIT $2
`, eventID, limit)
	if err != nil {
		return nil, fmt.Errorf("list event config sources: %w", err)
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so that "no rows" is reported the same
	// way as in the builds listing.
	items := []EventConfigSource{}
	for rows.Next() {
		item, err := scanEventConfigSourceFromRows(rows)
		if err != nil {
			return nil, err
		}
		items = append(items, *item)
	}
	// rows.Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate event config sources: %w", err)
	}
	return items, nil
}
// GetEventConfigSourceByID loads the event_config_sources row whose id is
// sourceID, reading through the store's pool.
//
// Decoding is delegated to scanEventConfigSource, which reports a missing row
// as (nil, nil); callers must check the pointer as well as the error.
func (s *Store) GetEventConfigSourceByID(ctx context.Context, sourceID string) (*EventConfigSource, error) {
	const byIDSQL = `
SELECT id, event_id, source_version_no, source_kind, schema_id, schema_version, status, source_jsonb::text, notes
FROM event_config_sources
WHERE id = $1
LIMIT 1
`

	return scanEventConfigSource(s.pool.QueryRow(ctx, byIDSQL, sourceID))
}
// GetEventConfigBuildByID loads the event_config_builds row whose id is
// buildID, reading through the store's pool.
//
// Decoding is delegated to scanEventConfigBuild, which reports a missing row as
// (nil, nil); callers must check the pointer as well as the error.
func (s *Store) GetEventConfigBuildByID(ctx context.Context, buildID string) (*EventConfigBuild, error) {
	const byIDSQL = `
SELECT id, event_id, source_id, build_no, build_status, build_log, manifest_jsonb::text, asset_index_jsonb::text
FROM event_config_builds
WHERE id = $1
LIMIT 1
`

	return scanEventConfigBuild(s.pool.QueryRow(ctx, byIDSQL, buildID))
}
// ListEventConfigBuildsByEventID returns the config builds stored for eventID,
// highest build_no first.
//
// limit caps the number of rows. Any value outside 1..100 — zero, negative, or
// above 100 — is replaced by 20 rather than clamped.
//
// On success the result is never nil: an event without builds yields an empty
// slice. On error the slice is nil.
func (s *Store) ListEventConfigBuildsByEventID(ctx context.Context, eventID string, limit int) ([]EventConfigBuild, error) {
	const listSQL = `
SELECT id, event_id, source_id, build_no, build_status, build_log, manifest_jsonb::text, asset_index_jsonb::text
FROM event_config_builds
WHERE event_id = $1
ORDER BY build_no DESC
LIMIT $2
`

	if limit < 1 || 100 < limit {
		limit = 20
	}

	rows, err := s.pool.Query(ctx, listSQL, eventID, limit)
	if err != nil {
		return nil, fmt.Errorf("list event config builds: %w", err)
	}
	defer rows.Close()

	builds := make([]EventConfigBuild, 0)
	for rows.Next() {
		build, scanErr := scanEventConfigBuildFromRows(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		builds = append(builds, *build)
	}
	// rows.Next returning false can also mean the iteration failed.
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate event config builds: %w", err)
	}
	return builds, nil
}
// scanEventConfigSource decodes a single-row result into an EventConfigSource.
//
// The row must expose, in order: id, event_id, source_version_no, source_kind,
// schema_id, schema_version, status, source_jsonb as text, notes.
//
// A pgx.ErrNoRows result is not treated as a failure: it yields (nil, nil).
// Any other scan error is wrapped and returned.
func scanEventConfigSource(row pgx.Row) (*EventConfigSource, error) {
	found := new(EventConfigSource)
	err := row.Scan(
		&found.ID,
		&found.EventID,
		&found.SourceVersionNo,
		&found.SourceKind,
		&found.SchemaID,
		&found.SchemaVersion,
		&found.Status,
		&found.SourceJSON,
		&found.Notes,
	)
	switch {
	case err == nil:
		return found, nil
	case errors.Is(err, pgx.ErrNoRows):
		return nil, nil
	default:
		return nil, fmt.Errorf("scan event config source: %w", err)
	}
}
// scanEventConfigSourceFromRows decodes the current row of a multi-row result
// into an EventConfigSource.
//
// The row must expose, in order: id, event_id, source_version_no, source_kind,
// schema_id, schema_version, status, source_jsonb as text, notes. Every scan
// error is wrapped and returned; there is no special case for an empty result,
// since the caller only invokes this after rows.Next has reported a row.
func scanEventConfigSourceFromRows(rows pgx.Rows) (*EventConfigSource, error) {
	current := new(EventConfigSource)
	err := rows.Scan(
		&current.ID,
		&current.EventID,
		&current.SourceVersionNo,
		&current.SourceKind,
		&current.SchemaID,
		&current.SchemaVersion,
		&current.Status,
		&current.SourceJSON,
		&current.Notes,
	)
	if err != nil {
		return nil, fmt.Errorf("scan event config source row: %w", err)
	}
	return current, nil
}
// scanEventConfigBuild decodes a single-row result into an EventConfigBuild.
//
// The row must expose, in order: id, event_id, source_id, build_no,
// build_status, build_log, manifest_jsonb as text, asset_index_jsonb as text.
//
// A pgx.ErrNoRows result is not treated as a failure: it yields (nil, nil).
// Any other scan error is wrapped and returned.
func scanEventConfigBuild(row pgx.Row) (*EventConfigBuild, error) {
	found := new(EventConfigBuild)
	err := row.Scan(
		&found.ID,
		&found.EventID,
		&found.SourceID,
		&found.BuildNo,
		&found.BuildStatus,
		&found.BuildLog,
		&found.ManifestJSON,
		&found.AssetIndexJSON,
	)
	switch {
	case err == nil:
		return found, nil
	case errors.Is(err, pgx.ErrNoRows):
		return nil, nil
	default:
		return nil, fmt.Errorf("scan event config build: %w", err)
	}
}
// scanEventConfigBuildFromRows decodes the current row of a multi-row result
// into an EventConfigBuild.
//
// The row must expose, in order: id, event_id, source_id, build_no,
// build_status, build_log, manifest_jsonb as text, asset_index_jsonb as text.
// Every scan error is wrapped and returned; there is no special case for an
// empty result, since the caller only invokes this after rows.Next has reported
// a row.
func scanEventConfigBuildFromRows(rows pgx.Rows) (*EventConfigBuild, error) {
	current := new(EventConfigBuild)
	if scanErr := rows.Scan(
		&current.ID,
		&current.EventID,
		&current.SourceID,
		&current.BuildNo,
		&current.BuildStatus,
		&current.BuildLog,
		&current.ManifestJSON,
		&current.AssetIndexJSON,
	); scanErr != nil {
		return nil, fmt.Errorf("scan event config build row: %w", scanErr)
	}
	return current, nil
}