From ce2cef8c05012eb904996577dbd1946504ed4240 Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Sat, 16 May 2026 23:46:38 -0500 Subject: [PATCH] Re-validate pending sol_purchases when blocknumber catches up MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A sol_purchases row is inserted with is_valid = NULL ("pending") when its valid_after_blocknumber hasn't been indexed yet at insert time. Until now there was no mechanism to flip pending rows to a final verdict — they stayed NULL indefinitely even after the indexer caught up. This adds a trigger on tracks/playlists that emits NOTIFY 'pending_purchase_revalidation' when blocknumber advances past any pending row's valid_after_blocknumber, plus a Go listener in the Solana indexer that consumes the notification and re-runs the existing validatePurchase. A 5-minute sweep + startup sweep covers cases where NOTIFY drops (no connected listener) or rows that pre-date the trigger. The trigger does no validation work itself — it only signals which content_id changed, so the math stays in validatePurchase as the single source of truth. No SQL port, no parity risk with config-driven values like NetworkTakeRate. --- .../notify_pending_purchase_revalidation.sql | 70 +++ solana/indexer/program/indexer.go | 7 +- .../indexer/program/purchase_revalidator.go | 298 ++++++++++ .../program/purchase_revalidator_test.go | 259 ++++++++ solana/indexer/solana_indexer.go | 3 +- sql/01_schema.sql | 555 +++++++++++++----- 6 files changed, 1059 insertions(+), 133 deletions(-) create mode 100644 ddl/functions/notify_pending_purchase_revalidation.sql create mode 100644 solana/indexer/program/purchase_revalidator.go create mode 100644 solana/indexer/program/purchase_revalidator_test.go diff --git a/ddl/functions/notify_pending_purchase_revalidation.sql b/ddl/functions/notify_pending_purchase_revalidation.sql new file mode 100644 index 00000000..ac5bb613 --- /dev/null +++ b/ddl/functions/notify_pending_purchase_revalidation.sql @@ -0,0 +1,70 @@ +-- Notifies Go to re-run validatePurchase for sol_purchases rows that became +-- eligible because a tracks/playlists row's blocknumber advanced past their +-- valid_after_blocknumber. The trigger does no validation itself — it just +-- signals which content_id changed; the Go indexer LISTENs on +-- 'pending_purchase_revalidation' and re-runs the existing validator so the +-- math stays in one place. A periodic sweep in Go catches anything missed +-- (NOTIFY is dropped if no listener is connected at the moment). +create or replace function notify_pending_purchase_revalidation() returns trigger as $$ +declare + v_content_type text; + v_content_id int; + v_blocknumber int; +begin + if tg_table_name = 'tracks' then + v_content_type := 'track'; + v_content_id := new.track_id; + elsif tg_table_name = 'playlists' then + v_content_type := 'album'; + v_content_id := new.playlist_id; + else + return null; + end if; + + v_blocknumber := new.blocknumber; + if v_blocknumber is null then + return null; + end if; + + -- Cheap EXISTS guard: tracks/playlists update churn dwarfs the pending + -- purchase set, so it's almost always a no-op. Uses sol_purchases_valid_idx + -- + sol_purchases_content_idx. + if exists ( + select 1 from sol_purchases sp + where sp.content_id = v_content_id + and sp.content_type = v_content_type + and sp.is_valid is null + and sp.valid_after_blocknumber <= v_blocknumber + ) then + perform pg_notify( + 'pending_purchase_revalidation', + v_content_type || ':' || v_content_id::text + ); + end if; + + return null; +exception + when others then + -- Never let a notify failure break a tracks/playlists write. The sweep + -- will catch any pending rows that don't get notified. + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + + +do $$ begin + create trigger on_track_notify_pending_purchase_revalidation + after insert or update of blocknumber on tracks + for each row execute procedure notify_pending_purchase_revalidation(); +exception + when others then null; +end $$; + +do $$ begin + create trigger on_playlist_notify_pending_purchase_revalidation + after insert or update of blocknumber on playlists + for each row execute procedure notify_pending_purchase_revalidation(); +exception + when others then null; +end $$; diff --git a/solana/indexer/program/indexer.go b/solana/indexer/program/indexer.go index 058ac363..7da940bf 100644 --- a/solana/indexer/program/indexer.go +++ b/solana/indexer/program/indexer.go @@ -26,6 +26,7 @@ type Indexer struct { rpcClient common.RpcClient config config.Config transactionCache *otter.Cache[solana.Signature, *rpc.GetTransactionResult] + revalidator *Revalidator logger *zap.Logger } @@ -37,17 +38,21 @@ func New( transactionCache *otter.Cache[solana.Signature, *rpc.GetTransactionResult], logger *zap.Logger, ) *Indexer { + namedLogger := logger.Named(NAME) return &Indexer{ pool: pool, grpcConfig: grpcConfig, rpcClient: rpcClient, config: config, transactionCache: transactionCache, - logger: logger.Named(NAME), + revalidator: NewRevalidator(pool, config, namedLogger), + logger: namedLogger, } } func (d *Indexer) Start(ctx context.Context) { + d.revalidator.Start(ctx) + client, err := d.subscribe(ctx) if err != nil { d.logger.Fatal("failed to start subscription", zap.Error(err)) diff --git a/solana/indexer/program/purchase_revalidator.go b/solana/indexer/program/purchase_revalidator.go new file mode 100644 index 00000000..adf0e5cf --- /dev/null +++ b/solana/indexer/program/purchase_revalidator.go @@ -0,0 +1,298 @@ +package program + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/solana/spl/programs/payment_router" + "github.com/gagliardetto/solana-go" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +const ( + purchaseRevalidationChannel = "pending_purchase_revalidation" + sweepInterval = 5 * time.Minute + listenerReconnectDelay = 5 * time.Second +) + +// Revalidator resolves sol_purchases rows whose is_valid was left NULL at +// insert time because their valid_after_blocknumber hadn't been indexed yet. +// Triggered by notify_pending_purchase_revalidation when tracks/playlists +// blocknumber advances, plus a periodic sweep for safety. +type Revalidator struct { + pool database.DbPool + config config.Config + logger *zap.Logger +} + +func NewRevalidator(pool database.DbPool, cfg config.Config, logger *zap.Logger) *Revalidator { + return &Revalidator{ + pool: pool, + config: cfg, + logger: logger.Named("PurchaseRevalidator"), + } +} + +func (r *Revalidator) Start(ctx context.Context) { + go r.runListener(ctx) + go r.runSweep(ctx) +} + +func (r *Revalidator) runListener(ctx context.Context) { + for { + if ctx.Err() != nil { + return + } + if err := r.listenLoop(ctx); err != nil && !errors.Is(err, context.Canceled) { + r.logger.Error("listener loop ended, reconnecting", zap.Error(err)) + } + select { + case <-ctx.Done(): + return + case <-time.After(listenerReconnectDelay): + } + } +} + +func (r *Revalidator) listenLoop(ctx context.Context) error { + conn, err := r.pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("acquire listener conn: %w", err) + } + defer conn.Release() + + if _, err := conn.Exec(ctx, "LISTEN "+purchaseRevalidationChannel); err != nil { + return fmt.Errorf("LISTEN: %w", err) + } + r.logger.Info("listening for purchase revalidation notifications") + + for { + n, err := conn.Conn().WaitForNotification(ctx) + if err != nil { + return err + } + contentType, contentId, ok := parseRevalidationPayload(n.Payload) + if !ok { + r.logger.Warn("malformed revalidation payload", zap.String("payload", n.Payload)) + continue + } + if err := r.revalidateContent(ctx, contentType, contentId); err != nil { + r.logger.Error("revalidate content failed", + zap.Error(err), + zap.String("content_type", contentType), + zap.Int32("content_id", contentId), + ) + } + } +} + +func (r *Revalidator) runSweep(ctx context.Context) { + // Sweep once on startup so rows that went pending while the indexer was + // down (NOTIFY drops if no listener is connected) get picked up. + r.sweep(ctx) + + ticker := time.NewTicker(sweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.sweep(ctx) + } + } +} + +func (r *Revalidator) sweep(ctx context.Context) { + // MAX(blocks.number) is the same gating predicate validatePurchase uses; + // matching it here avoids waking up rows the validator would just put back + // to pending. + sql := ` + SELECT DISTINCT content_type, content_id + FROM sol_purchases + WHERE is_valid IS NULL + AND valid_after_blocknumber <= (SELECT COALESCE(MAX(number), 0) FROM blocks) + ` + rows, err := r.pool.Query(ctx, sql) + if err != nil { + r.logger.Error("sweep query failed", zap.Error(err)) + return + } + type contentRef struct { + ContentType string + ContentID int32 + } + refs, err := pgx.CollectRows(rows, pgx.RowToStructByPos[contentRef]) + if err != nil { + r.logger.Error("sweep collect failed", zap.Error(err)) + return + } + if len(refs) == 0 { + return + } + r.logger.Info("revalidation sweep", zap.Int("eligible_content_count", len(refs))) + for _, ref := range refs { + if ctx.Err() != nil { + return + } + if err := r.revalidateContent(ctx, ref.ContentType, ref.ContentID); err != nil { + r.logger.Error("sweep revalidate failed", + zap.Error(err), + zap.String("content_type", ref.ContentType), + zap.Int32("content_id", ref.ContentID), + ) + } + } +} + +type pendingRow struct { + Signature string + InstructionIndex int32 + ContentType string + ContentID int32 + BuyerUserID int32 + AccessType string + ValidAfterBlocknumber int64 + PurchaseTime time.Time +} + +func (r *Revalidator) revalidateContent(ctx context.Context, contentType string, contentId int32) error { + // created_at is within seconds of block time for Go-indexer writes — the + // only rows that can be pending. Used as the timestamp for historical + // price + payout-wallet lookups in validatePurchase. + sql := ` + SELECT signature, instruction_index, content_type, content_id, + buyer_user_id, access_type, valid_after_blocknumber, + COALESCE(created_at, NOW()) AS purchase_time + FROM sol_purchases + WHERE content_type = $1 + AND content_id = $2 + AND is_valid IS NULL + ` + rows, err := r.pool.Query(ctx, sql, contentType, contentId) + if err != nil { + return fmt.Errorf("query pending rows: %w", err) + } + pending, err := pgx.CollectRows(rows, pgx.RowToStructByPos[pendingRow]) + if err != nil { + return fmt.Errorf("collect pending rows: %w", err) + } + for _, p := range pending { + if ctx.Err() != nil { + return ctx.Err() + } + if err := r.revalidateRow(ctx, p); err != nil { + r.logger.Error("revalidate row failed", + zap.Error(err), + zap.String("signature", p.Signature), + zap.Int32("instruction_index", p.InstructionIndex), + ) + } + } + return nil +} + +func (r *Revalidator) revalidateRow(ctx context.Context, p pendingRow) error { + routes, err := r.loadRoutes(ctx, p.Signature, int(p.InstructionIndex)) + if err != nil { + return fmt.Errorf("load routes: %w", err) + } + if len(routes) == 0 { + return fmt.Errorf("no sol_payments rows for purchase") + } + + // Reconstruct just enough Route for validatePurchase — only GetRouteMap is + // consumed downstream, so the sender/owner/bump are zero-valued. + inst := payment_router.NewRouteInstruction( + solana.PublicKey{}, + solana.PublicKey{}, + 0, + routes, + ) + + memo := parsedPurchaseMemo{ + ContentType: p.ContentType, + ContentId: int(p.ContentID), + BuyerUserId: int(p.BuyerUserID), + ValidAfterBlocknumber: int(p.ValidAfterBlocknumber), + AccessType: p.AccessType, + } + + isValid, err := validatePurchase(ctx, r.config, r.pool, inst, memo, p.PurchaseTime) + if err != nil { + // validatePurchase returns an error alongside a non-nil false isValid + // when payments don't match expected splits. That's a final verdict; + // fall through and write it. + r.logger.Debug("revalidation determined invalid", + zap.Error(err), + zap.String("signature", p.Signature), + ) + } + if isValid == nil { + // Still pending — blocks table hasn't actually caught up, or another + // edge case. Leave for the next notify/sweep. + r.logger.Warn("revalidation triggered but not ready", + zap.String("signature", p.Signature), + ) + return nil + } + + // Guarded UPDATE: only set if still pending. Prevents racing with another + // notification handler that finished a moment earlier. + _, err = r.pool.Exec(ctx, ` + UPDATE sol_purchases + SET is_valid = $1 + WHERE signature = $2 + AND instruction_index = $3 + AND is_valid IS NULL + `, *isValid, p.Signature, p.InstructionIndex) + if err != nil { + return fmt.Errorf("update is_valid: %w", err) + } + return nil +} + +func (r *Revalidator) loadRoutes(ctx context.Context, signature string, instructionIndex int) (map[solana.PublicKey]uint64, error) { + rows, err := r.pool.Query(ctx, + `SELECT to_account, amount FROM sol_payments WHERE signature = $1 AND instruction_index = $2`, + signature, instructionIndex, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + routes := make(map[solana.PublicKey]uint64) + for rows.Next() { + var account string + var amount int64 + if err := rows.Scan(&account, &amount); err != nil { + return nil, err + } + pk, err := solana.PublicKeyFromBase58(account) + if err != nil { + return nil, fmt.Errorf("invalid to_account %q: %w", account, err) + } + routes[pk] = uint64(amount) + } + return routes, rows.Err() +} + +func parseRevalidationPayload(payload string) (contentType string, contentId int32, ok bool) { + parts := strings.SplitN(payload, ":", 2) + if len(parts) != 2 { + return "", 0, false + } + id, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return "", 0, false + } + return parts[0], int32(id), true +} diff --git a/solana/indexer/program/purchase_revalidator_test.go b/solana/indexer/program/purchase_revalidator_test.go new file mode 100644 index 00000000..4a3f2e51 --- /dev/null +++ b/solana/indexer/program/purchase_revalidator_test.go @@ -0,0 +1,259 @@ +package program + +import ( + "context" + "strconv" + "testing" + "time" + + "api.audius.co/config" + "api.audius.co/database" + "github.com/gagliardetto/solana-go" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/test-go/testify/assert" + "github.com/test-go/testify/require" + "go.uber.org/zap" +) + +func TestParseRevalidationPayload(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + payload string + wantType string + wantID int32 + wantOK bool + }{ + {"track", "track:1234", "track", 1234, true}, + {"album", "album:42", "album", 42, true}, + {"empty", "", "", 0, false}, + {"no separator", "track1234", "", 0, false}, + {"non numeric id", "track:abc", "", 0, false}, + {"id with extra colon", "track:12:34", "", 0, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + gotType, gotID, gotOK := parseRevalidationPayload(tc.payload) + assert.Equal(t, tc.wantOK, gotOK) + if tc.wantOK { + assert.Equal(t, tc.wantType, gotType) + assert.Equal(t, tc.wantID, gotID) + } + }) + } +} + +// Direct call to revalidateContent: pending row with matching payments and a +// caught-up blocks table should flip to valid. +func TestRevalidatorRevalidateContent(t *testing.T) { + ctx := t.Context() + pool := database.CreateTestDatabase(t, "test_solana_indexer_program") + + const ( + sellerUserId = 1 + trackId = 42 + priceCents = 100 + validAfterBlocknumber = 200 + signature = "test-sig-direct" + ) + priceUsdc := float64(priceCents * 10000) + payoutWallet := solana.NewWallet().PublicKey() + networkSplit := int64(priceUsdc * config.Cfg.NetworkTakeRate / 100.0) + payoutSplit := int64(priceUsdc) - networkSplit + + // FK constraint: tracks.blocknumber references blocks.number. Insert the + // target block first so the tracks fixture can reference it. + insertBlock(t, pool, validAfterBlocknumber) + seedRevalidatorFixtures(pool, sellerUserId, trackId, priceCents, payoutWallet, validAfterBlocknumber, validAfterBlocknumber) + insertPendingPurchase(t, pool, signature, trackId, validAfterBlocknumber) + insertPaymentRows(t, pool, signature, map[string]int64{ + payoutWallet.String(): payoutSplit, + config.Cfg.SolanaConfig.StakingBridgeUsdcTokenAccount.String(): networkSplit, + }) + + rev := NewRevalidator(pool, config.Cfg, zap.NewNop()) + require.NoError(t, rev.revalidateContent(ctx, "track", trackId)) + + isValid := scanIsValid(t, pool, signature) + require.NotNil(t, isValid) + assert.True(t, *isValid, "fully-paid purchase should be marked valid") +} + +// End-to-end: install the trigger, start the revalidator, then UPDATE the +// tracks blocknumber. The trigger should fire NOTIFY, the listener should +// consume it, and is_valid should flip. +func TestRevalidatorEndToEnd(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + pool := database.CreateTestDatabase(t, "test_solana_indexer_program") + + const ( + sellerUserId = 1 + trackId = 99 + priceCents = 100 + validAfterBlocknumber = 300 + signature = "test-sig-e2e" + ) + priceUsdc := float64(priceCents * 10000) + payoutWallet := solana.NewWallet().PublicKey() + networkSplit := int64(priceUsdc * config.Cfg.NetworkTakeRate / 100.0) + payoutSplit := int64(priceUsdc) - networkSplit + + // Insert the target block first (FK from tracks.blocknumber). The Seed + // helper auto-inserts block 101, so start the track there — it's below + // validAfterBlocknumber, leaving room to bump it up later and drive the + // trigger path rather than the startup sweep. + insertBlock(t, pool, validAfterBlocknumber) + seedRevalidatorFixtures(pool, sellerUserId, trackId, priceCents, payoutWallet, 101, validAfterBlocknumber) + insertPendingPurchase(t, pool, signature, trackId, validAfterBlocknumber) + insertPaymentRows(t, pool, signature, map[string]int64{ + payoutWallet.String(): payoutSplit, + config.Cfg.SolanaConfig.StakingBridgeUsdcTokenAccount.String(): networkSplit, + }) + + rev := NewRevalidator(pool, config.Cfg, zap.NewNop()) + rev.Start(ctx) + + // Wait for the startup sweep to settle. It would resolve this row itself + // (blocks is already past validAfterBlocknumber), so re-pend after the + // sweep finishes to make sure the assertion is driven purely by the + // trigger. + time.Sleep(300 * time.Millisecond) + requireExec(t, pool, + `UPDATE sol_purchases SET is_valid = NULL WHERE signature = $1`, + signature, + ) + + requireExec(t, pool, + `UPDATE tracks SET blocknumber = $1 WHERE track_id = $2`, + validAfterBlocknumber, trackId, + ) + + pollUntil(t, 5*time.Second, 25*time.Millisecond, func() bool { + isValid := scanIsValid(t, pool, signature) + return isValid != nil && *isValid + }, "is_valid should flip via trigger+listener") +} + +// --- helpers --- + +func seedRevalidatorFixtures( + pool *pgxpool.Pool, + sellerUserId, trackId, priceCents int, + payoutWallet solana.PublicKey, + trackBlocknumber int, + priceBlocknumber int, +) { + database.Seed(pool, database.FixtureMap{ + "users": { + {"user_id": sellerUserId}, + }, + "user_payout_wallet_history": { + { + "user_id": sellerUserId, + "spl_usdc_payout_wallet": payoutWallet.String(), + }, + }, + "tracks": { + { + "track_id": trackId, + "owner_id": sellerUserId, + "blocknumber": trackBlocknumber, + }, + }, + "track_price_history": { + { + "track_id": trackId, + "splits": `[{"user_id": ` + strconv.Itoa(sellerUserId) + `, "percentage": 100}]`, + "total_price_cents": priceCents, + // Must be >= the purchase's valid_after_blocknumber so + // getRelevantPrice's `blocknumber >= @blocknumber` predicate + // matches this row. + "blocknumber": priceBlocknumber, + }, + }, + }) +} + +func insertBlock(t *testing.T, pool *pgxpool.Pool, number int) { + t.Helper() + // is_current is left false to avoid colliding with the auto-inserted block + // 101 that database.Seed adds (a partial unique index allows only one row + // with is_current=true). validatePurchase reads MAX(number), not by + // is_current, so this is fine for these tests. + _, err := pool.Exec(t.Context(), ` + INSERT INTO blocks (blockhash, parenthash, is_current, number) + VALUES ('test-block-' || $1::integer::text, NULL, false, $1::integer) + ON CONFLICT DO NOTHING + `, number) + require.NoError(t, err) +} + +func insertPendingPurchase(t *testing.T, pool *pgxpool.Pool, signature string, contentId, validAfterBlocknumber int) { + t.Helper() + _, err := pool.Exec(t.Context(), ` + INSERT INTO sol_purchases ( + signature, instruction_index, amount, slot, + from_account, content_type, content_id, buyer_user_id, + access_type, valid_after_blocknumber, is_valid + ) VALUES ( + $1, 0, 1000000, 1, + 'from-account', 'track', $2, 2, + 'stream', $3, NULL + ) + `, signature, contentId, validAfterBlocknumber) + require.NoError(t, err) +} + +func insertPaymentRows(t *testing.T, pool *pgxpool.Pool, signature string, routes map[string]int64) { + t.Helper() + routeIndex := 0 + for account, amount := range routes { + _, err := pool.Exec(t.Context(), ` + INSERT INTO sol_payments (signature, instruction_index, route_index, to_account, amount, slot) + VALUES ($1, 0, $2, $3, $4, 1) + `, signature, routeIndex, account, amount) + require.NoError(t, err) + routeIndex++ + } +} + +func scanIsValid(t *testing.T, pool *pgxpool.Pool, signature string) *bool { + t.Helper() + var isValid *bool + err := pool.QueryRow(t.Context(), + `SELECT is_valid FROM sol_purchases WHERE signature = $1 AND instruction_index = 0`, + signature, + ).Scan(&isValid) + if err == pgx.ErrNoRows { + return nil + } + require.NoError(t, err) + return isValid +} + +func requireExec(t *testing.T, pool *pgxpool.Pool, sql string, args ...any) { + t.Helper() + _, err := pool.Exec(t.Context(), sql, args...) + require.NoError(t, err) +} + +// pollUntil polls cond until it returns true or timeout elapses. Replacement +// for testify Eventually (not in this fork of testify). +func pollUntil(t *testing.T, timeout, interval time.Duration, cond func() bool, msg string) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(interval) + } + t.Fatalf("condition not met within %s: %s", timeout, msg) +} + diff --git a/solana/indexer/solana_indexer.go b/solana/indexer/solana_indexer.go index 8c6ffb38..5a95c1d3 100644 --- a/solana/indexer/solana_indexer.go +++ b/solana/indexer/solana_indexer.go @@ -55,9 +55,10 @@ func New(config config.Config) *SolanaIndexer { // The min write pool size is set to the number of workers // plus 1 for the connection that listens for artist_coins changes, + // plus 1 for the purchase revalidator's LISTEN connection, // and add 10 as a buffer. workerCount := int32(config.SolanaIndexerWorkers) - connConfig.MaxConns = workerCount + 1 + 10 + connConfig.MaxConns = workerCount + 2 + 10 pool, err := pgxpool.NewWithConfig(context.Background(), connConfig) if err != nil { diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 305ca77b..cbd5dced 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -3,8 +3,8 @@ -- --- Dumped from database version 17.9 (Debian 17.9-1.pgdg13+1) --- Dumped by pg_dump version 17.9 (Debian 17.9-1.pgdg13+1) +-- Dumped from database version 17.10 (Debian 17.10-1.pgdg13+1) +-- Dumped by pg_dump version 17.10 (Debian 17.10-1.pgdg13+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2398,8 +2398,7 @@ begin -- Only create notifications if the track is public if track_is_public then - -- For each follower of the event creator and each user who favorited the track - -- Using UNION to ensure we don't get duplicate user_ids + -- Followers of the host, track savers, and users who follow the contest (event subscribers) for notified_user_id in select distinct user_id from ( @@ -2417,6 +2416,13 @@ begin and s.save_type = 'track' and s.is_current = true and s.is_delete = false + union + select sub.subscriber_id as user_id + from subscriptions sub + where sub.entity_type = 'Event' + and sub.user_id = new.event_id + and sub.is_current = true + and sub.is_delete = false ) as users_to_notify loop -- Create a notification for this user @@ -3669,6 +3675,159 @@ END; $$; +-- +-- Name: handle_sol_purchase(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_sol_purchase() RETURNS trigger + LANGUAGE plpgsql + AS $$ +declare + resolved_seller_user_id integer; +begin + if new.is_valid is not true then + return null; + end if; + + if new.content_type = 'track' then + select owner_id into resolved_seller_user_id + from tracks + where track_id = new.content_id + and is_current = true + limit 1; + else + select playlist_owner_id into resolved_seller_user_id + from playlists + where playlist_id = new.content_id + and is_current = true + limit 1; + end if; + + if resolved_seller_user_id is null then + return null; + end if; + + insert into notification + (slot, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.slot, + ARRAY [resolved_seller_user_id], + new.created_at, + 'usdc_purchase_seller', + new.buyer_user_id, + 'usdc_purchase_seller:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type, + json_build_object( + 'content_type', new.content_type, + 'buyer_user_id', new.buyer_user_id, + 'seller_user_id', resolved_seller_user_id, + 'amount', new.amount, + 'extra_amount', null, + 'content_id', new.content_id, + 'vendor', null + ) + ), + ( + new.slot, + ARRAY [new.buyer_user_id], + new.created_at, + 'usdc_purchase_buyer', + new.buyer_user_id, + 'usdc_purchase_buyer:' || 'seller_user_id:' || resolved_seller_user_id || ':buyer_user_id:' || new.buyer_user_id || ':content_id:' || new.content_id || ':content_type:' || new.content_type, + json_build_object( + 'content_type', new.content_type, + 'buyer_user_id', new.buyer_user_id, + 'seller_user_id', resolved_seller_user_id, + 'amount', new.amount, + 'extra_amount', null, + 'content_id', new.content_id, + 'vendor', null + ) + ) + on conflict do nothing; + + return null; + exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$; + + +-- +-- Name: handle_sol_reward_disbursement(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_sol_reward_disbursement() RETURNS trigger + LANGUAGE plpgsql + AS $$ +declare + resolved_user_id integer; + existing_notification integer; + reward_code_exists boolean; +begin + select users.user_id + into resolved_user_id + from users + where users.wallet = new.recipient_eth_address + and users.is_current = true + limit 1; + + if resolved_user_id is null then + return null; + end if; + + select exists(select 1 from reward_codes where code = new.challenge_id) into reward_code_exists; + + if not reward_code_exists then + select id into existing_notification + from notification + where type = 'challenge_reward' + and resolved_user_id = any(user_ids) + and timestamp >= (new.created_at - interval '1 hour') + limit 1; + + if existing_notification is null then + insert into notification + (slot, user_ids, timestamp, type, group_id, specifier, data) + values + ( + new.slot, + ARRAY [resolved_user_id], + new.created_at, + 'challenge_reward', + 'challenge_reward:' || resolved_user_id || ':challenge:' || new.challenge_id || ':specifier:' || new.specifier, + resolved_user_id, + json_build_object('specifier', new.specifier, 'challenge_id', new.challenge_id, 'amount', new.amount) + ) + on conflict do nothing; + end if; + end if; + + perform pg_notify( + 'challenge_disbursed', + json_build_object( + 'user_id', resolved_user_id, + 'challenge_id', new.challenge_id, + 'specifier', new.specifier, + 'amount', new.amount, + 'signature', new.signature, + 'slot', new.slot + )::text + ); + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; + +end; +$$; + + -- -- Name: handle_sol_token_balance_change(); Type: FUNCTION; Schema: public; Owner: - -- @@ -3807,8 +3966,8 @@ begin count(*) from tracks t where t.is_current is true - and t.is_delete is false - and t.is_available is true + and t.is_delete = false + and t.is_available = true and t.stem_of is null and t.access_authorities is null and t.owner_id = new.owner_id @@ -3873,7 +4032,7 @@ begin raise warning 'An error occurred in %: %', tg_name, sqlerrm; end; - -- If new remix is a submission to an active remix contest, check for milestone notifications + -- If new remix is a submission to an active remix contest, milestones and follower alerts begin if track_should_notify(OLD, new, TG_OP) AND new.remix_of is not null THEN declare @@ -3882,14 +4041,15 @@ begin submission_count int; milestone int; parent_track_id int := (new.remix_of->'tracks'->0->>'parent_track_id')::int; + contest_follower int; begin - select event_id, user_id + select e.event_id, e.user_id into contest_event_id, contest_creator_id - from events - where event_type = 'remix_contest' - and is_deleted = false - and end_date > now() - and entity_id = parent_track_id + from events e + where e.event_type = 'remix_contest' + and e.is_deleted = false + and (e.end_date is null or e.end_date > now()) + and e.entity_id = parent_track_id limit 1; if contest_event_id is not null then @@ -3899,6 +4059,7 @@ begin join events e on e.event_type = 'remix_contest' and e.is_deleted = false and e.entity_id = parent_track_id + and e.event_id = contest_event_id where t.is_current = true and t.is_delete = false and t.remix_of is not null @@ -3927,6 +4088,44 @@ begin on conflict do nothing; END IF; END LOOP; + + -- Notify everyone following the contest (and the host) of a new submission, + -- excluding the submitter. The host is included even if they are not a + -- subscriber so they get a per-submission alert in addition to the + -- existing milestone-based artist_remix_contest_submissions notification. + for contest_follower in + select user_id from ( + select s.subscriber_id as user_id + from subscriptions s + where s.entity_type = 'Event' + and s.user_id = contest_event_id + and s.is_current = true + and s.is_delete = false + union + select contest_creator_id as user_id + ) recipients + where user_id != new.owner_id + loop + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.blocknumber, + ARRAY [contest_follower], + new.updated_at, + 'fan_remix_contest_submission', + contest_follower, + 'fan_remix_contest_submission:' || contest_event_id || ':submission:' || new.track_id, + json_build_object( + 'event_id', contest_event_id, + 'entity_id', parent_track_id, + 'entity_user_id', contest_creator_id, + 'submission_track_id', new.track_id, + 'submitter_user_id', new.owner_id + ) + ) + on conflict do nothing; + end loop; end if; end; end if; @@ -3937,8 +4136,7 @@ begin -- If a track with an active remix contest transitions from unlisted to public, -- create fan_remix_contest_started notifications for the contest creator's - -- followers and the track's savers. Mirrors handle_event.sql for the case - -- where the contest was created while the track was still unlisted. + -- followers, the track's savers, and contest followers. begin if TG_OP = 'UPDATE' and OLD.is_unlisted = true and new.is_unlisted = false then insert into notification @@ -3968,10 +4166,17 @@ begin and s.save_type = 'track' and s.is_current = true and s.is_delete = false + union + select sub.subscriber_id as user_id + from subscriptions sub + where sub.entity_type = 'Event' + and sub.user_id = e.event_id + and sub.is_current = true + and sub.is_delete = false ) u on true where e.event_type = 'remix_contest' and e.is_deleted = false - and e.end_date > now() + and (e.end_date is null or e.end_date > now()) and e.entity_id = new.track_id on conflict do nothing; end if; @@ -3983,9 +4188,9 @@ begin return null; exception - when others then - raise warning 'An error occurred in %: %', tg_name, sqlerrm; - raise; + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + raise; end; $$; @@ -4722,6 +4927,60 @@ END; $$; +-- +-- Name: notify_pending_purchase_revalidation(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.notify_pending_purchase_revalidation() RETURNS trigger + LANGUAGE plpgsql + AS $$ +declare + v_content_type text; + v_content_id int; + v_blocknumber int; +begin + if tg_table_name = 'tracks' then + v_content_type := 'track'; + v_content_id := new.track_id; + elsif tg_table_name = 'playlists' then + v_content_type := 'album'; + v_content_id := new.playlist_id; + else + return null; + end if; + + v_blocknumber := new.blocknumber; + if v_blocknumber is null then + return null; + end if; + + -- Cheap EXISTS guard: tracks/playlists update churn dwarfs the pending + -- purchase set, so it's almost always a no-op. Uses sol_purchases_valid_idx + -- + sol_purchases_content_idx. + if exists ( + select 1 from sol_purchases sp + where sp.content_id = v_content_id + and sp.content_type = v_content_type + and sp.is_valid is null + and sp.valid_after_blocknumber <= v_blocknumber + ) then + perform pg_notify( + 'pending_purchase_revalidation', + v_content_type || ':' || v_content_id::text + ); + end if; + + return null; +exception + when others then + -- Never let a notify failure break a tracks/playlists write. The sweep + -- will catch any pending rows that don't get notified. + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$; + + -- -- Name: on_new_row(); Type: FUNCTION; Schema: public; Owner: - -- @@ -9331,6 +9590,83 @@ CREATE TABLE public.user_tips ( ); +-- +-- Name: v_challenge_disbursements; Type: VIEW; Schema: public; Owner: - +-- + +CREATE VIEW public.v_challenge_disbursements AS + SELECT rd.challenge_id, + rd.specifier, + (rd.amount)::text AS amount, + rd.signature, + rd.slot, + rd.created_at, + users.user_id + FROM (public.sol_reward_disbursements rd + JOIN public.users ON ((((users.wallet)::text = rd.recipient_eth_address) AND (users.is_current = true)))); + + +-- +-- Name: VIEW v_challenge_disbursements; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON VIEW public.v_challenge_disbursements IS 'Compatibility view that exposes sol_reward_disbursements in the column shape the API routes used to read from challenge_disbursements. Resolves user_id via the indexer-populated recipient_eth_address (see migration 0172).'; + + +-- +-- Name: v_usdc_purchases; Type: VIEW; Schema: public; Owner: - +-- + +CREATE VIEW public.v_usdc_purchases AS + SELECT sp.signature, + sp.slot, + sp.buyer_user_id, + CASE sp.content_type + WHEN 'track'::text THEN t.owner_id + WHEN 'album'::text THEN p.playlist_owner_id + WHEN 'playlist'::text THEN p.playlist_owner_id + ELSE NULL::integer + END AS seller_user_id, + sp.amount, + (sp.content_type)::public.usdc_purchase_content_type AS content_type, + sp.content_id, + sp.created_at, + GREATEST((sp.amount - COALESCE( + CASE sp.content_type + WHEN 'track'::text THEN ( SELECT (tph.total_price_cents * 10000) + FROM public.track_price_history tph + WHERE ((tph.track_id = sp.content_id) AND (tph.block_timestamp <= sp.created_at)) + ORDER BY tph.block_timestamp DESC + LIMIT 1) + ELSE ( SELECT (aph.total_price_cents * 10000) + FROM public.album_price_history aph + WHERE ((aph.playlist_id = sp.content_id) AND (aph.block_timestamp <= sp.created_at)) + ORDER BY aph.block_timestamp DESC + LIMIT 1) + END, (0)::bigint)), (0)::bigint) AS extra_amount, + (sp.access_type)::public.usdc_purchase_access_type AS access, + sp.city, + sp.region, + sp.country, + ( SELECT COALESCE(jsonb_agg(jsonb_build_object('user_id', COALESCE(u_payout.user_id, u_sca.user_id), 'payout_wallet', pay.to_account, 'amount', pay.amount, 'percentage', (((pay.amount)::numeric * 100.0) / (NULLIF(sp.amount, 0))::numeric)) ORDER BY pay.route_index), '[]'::jsonb) AS "coalesce" + FROM (((public.sol_payments pay + LEFT JOIN public.users u_payout ON ((((u_payout.spl_usdc_payout_wallet)::text = (pay.to_account)::text) AND (u_payout.is_current = true)))) + LEFT JOIN public.sol_claimable_accounts sca ON ((((sca.account)::text = (pay.to_account)::text) AND ((sca.mint)::text = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v'::text)))) + LEFT JOIN public.users u_sca ON ((((u_sca.wallet)::text = (sca.ethereum_address)::text) AND (u_sca.is_current = true)))) + WHERE (((pay.signature)::text = (sp.signature)::text) AND (pay.instruction_index = sp.instruction_index))) AS splits + FROM ((public.sol_purchases sp + LEFT JOIN public.tracks t ON ((((sp.content_type)::text = 'track'::text) AND (t.track_id = sp.content_id) AND (t.is_current = true)))) + LEFT JOIN public.playlists p ON ((((sp.content_type)::text = ANY ((ARRAY['album'::character varying, 'playlist'::character varying])::text[])) AND (p.playlist_id = sp.content_id) AND (p.is_current = true)))) + WHERE (sp.is_valid IS TRUE); + + +-- +-- Name: VIEW v_usdc_purchases; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON VIEW public.v_usdc_purchases IS 'Compatibility view exposing sol_purchases + sol_payments in the column shape API routes used to read from usdc_purchases. seller_user_id is the current content owner (not snapshotted at purchase time). extra_amount is amount paid minus base price from price history. vendor is intentionally dropped.'; + + -- -- Name: volume_leader_exclusions; Type: TABLE; Schema: public; Owner: - -- @@ -11256,6 +11592,20 @@ CREATE INDEX idx_playlist_status ON public.playlists USING btree (playlist_id, i CREATE INDEX idx_playlist_tracks_track_id ON public.playlist_tracks USING btree (track_id, created_at); +-- +-- Name: idx_playlists_albums_published; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_playlists_albums_published ON public.playlists USING btree (playlist_id) WHERE ((is_album = true) AND (is_delete = false) AND (is_current = true)); + + +-- +-- Name: INDEX idx_playlists_albums_published; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.idx_playlists_albums_published IS 'Partial index for GetTracks album_backlink subquery; lets non-album lookups skip the heap entirely.'; + + -- -- Name: idx_reward_manager_txs_slot; Type: INDEX; Schema: public; Owner: - -- @@ -11536,13 +11886,6 @@ CREATE INDEX ix_subscriptions_blocknumber ON public.subscriptions USING btree (b CREATE INDEX ix_subscriptions_user_id ON public.subscriptions USING btree (user_id); --- --- Name: subscriptions_entity_type_entity_id_idx; Type: INDEX; Schema: public; Owner: - --- - -CREATE INDEX subscriptions_entity_type_entity_id_idx ON public.subscriptions USING btree (entity_type, entity_id) WHERE ((is_current = true) AND (is_delete = false)); - - -- -- Name: ix_supporter_rank_ups_receiver_user_id; Type: INDEX; Schema: public; Owner: - -- @@ -11943,24 +12286,24 @@ COMMENT ON INDEX public.sol_purchases_content_idx IS 'Used for getting sales of -- --- Name: sol_purchases_from_account_idx; Type: INDEX; Schema: public; Owner: - +-- Name: sol_purchases_created_at_idx; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX sol_purchases_from_account_idx ON public.sol_purchases USING btree (from_account, is_valid); +CREATE INDEX sol_purchases_created_at_idx ON public.sol_purchases USING btree (created_at); -- --- Name: INDEX sol_purchases_from_account_idx; Type: COMMENT; Schema: public; Owner: - +-- Name: sol_purchases_from_account_idx; Type: INDEX; Schema: public; Owner: - -- -COMMENT ON INDEX public.sol_purchases_from_account_idx IS 'Used for getting purchases by a user via their account.'; +CREATE INDEX sol_purchases_from_account_idx ON public.sol_purchases USING btree (from_account, is_valid); -- --- Name: sol_purchases_created_at_idx; Type: INDEX; Schema: public; Owner: - +-- Name: INDEX sol_purchases_from_account_idx; Type: COMMENT; Schema: public; Owner: - -- -CREATE INDEX sol_purchases_created_at_idx ON public.sol_purchases USING btree (created_at); +COMMENT ON INDEX public.sol_purchases_from_account_idx IS 'Used for getting purchases by a user via their account.'; -- @@ -11991,6 +12334,20 @@ CREATE INDEX sol_reward_disbursements_challenge_idx ON public.sol_reward_disburs COMMENT ON INDEX public.sol_reward_disbursements_challenge_idx IS 'Used for getting reward disbursements for a specific challenge type or claim.'; +-- +-- Name: sol_reward_disbursements_created_at_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX sol_reward_disbursements_created_at_idx ON public.sol_reward_disbursements USING btree (created_at); + + +-- +-- Name: sol_reward_disbursements_recipient_eth_address_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX sol_reward_disbursements_recipient_eth_address_idx ON public.sol_reward_disbursements USING btree (recipient_eth_address); + + -- -- Name: sol_reward_disbursements_user_bank_idx; Type: INDEX; Schema: public; Owner: - -- @@ -12110,6 +12467,13 @@ CREATE INDEX sol_user_balances_mint_user_id_idx ON public.sol_user_balances USIN COMMENT ON INDEX public.sol_user_balances_mint_user_id_idx IS 'Index for quick access to user balances by mint and user ID.'; +-- +-- Name: subscriptions_entity_type_entity_id_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX subscriptions_entity_type_entity_id_idx ON public.subscriptions USING btree (entity_type, entity_id) WHERE ((is_current = true) AND (is_delete = false)); + + -- -- Name: tag_track_user_idx; Type: INDEX; Schema: public; Owner: - -- @@ -12390,6 +12754,13 @@ CREATE TRIGGER on_play AFTER INSERT ON public.plays FOR EACH ROW EXECUTE FUNCTIO CREATE TRIGGER on_playlist AFTER INSERT ON public.playlists FOR EACH ROW EXECUTE FUNCTION public.handle_playlist(); +-- +-- Name: playlists on_playlist_notify_pending_purchase_revalidation; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_playlist_notify_pending_purchase_revalidation AFTER INSERT OR UPDATE OF blocknumber ON public.playlists FOR EACH ROW EXECUTE FUNCTION public.notify_pending_purchase_revalidation(); + + -- -- Name: playlist_tracks on_playlist_track; Type: TRIGGER; Schema: public; Owner: - -- @@ -12446,6 +12817,20 @@ CREATE TRIGGER on_sol_claimable_accounts AFTER INSERT ON public.sol_claimable_ac COMMENT ON TRIGGER on_sol_claimable_accounts ON public.sol_claimable_accounts IS 'Updates sol_user_balances whenever a sol_claimable_account is inserted.'; +-- +-- Name: sol_purchases on_sol_purchase; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_sol_purchase AFTER INSERT ON public.sol_purchases FOR EACH ROW EXECUTE FUNCTION public.handle_sol_purchase(); + + +-- +-- Name: sol_reward_disbursements on_sol_reward_disbursement; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_sol_reward_disbursement AFTER INSERT ON public.sol_reward_disbursements FOR EACH ROW EXECUTE FUNCTION public.handle_sol_reward_disbursement(); + + -- -- Name: sol_token_account_balance_changes on_sol_token_account_balance_changes; Type: TRIGGER; Schema: public; Owner: - -- @@ -12474,6 +12859,13 @@ CREATE TRIGGER on_supporter_rank_up AFTER INSERT ON public.supporter_rank_ups FO CREATE TRIGGER on_track AFTER INSERT OR UPDATE ON public.tracks FOR EACH ROW EXECUTE FUNCTION public.handle_track(); +-- +-- Name: tracks on_track_notify_pending_purchase_revalidation; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_track_notify_pending_purchase_revalidation AFTER INSERT OR UPDATE OF blocknumber ON public.tracks FOR EACH ROW EXECUTE FUNCTION public.notify_pending_purchase_revalidation(); + + -- -- Name: usdc_purchases on_usdc_purchase; Type: TRIGGER; Schema: public; Owner: - -- @@ -12880,105 +13272,6 @@ ALTER TABLE ONLY public.users ADD CONSTRAINT users_blocknumber_fkey FOREIGN KEY (blocknumber) REFERENCES public.blocks(number) ON DELETE CASCADE; --- --- Name: v_challenge_disbursements; Type: VIEW; Schema: public; Owner: - --- - -CREATE VIEW public.v_challenge_disbursements AS - SELECT rd.challenge_id, - rd.specifier, - (rd.amount)::text AS amount, - rd.signature, - rd.slot, - rd.created_at, - users.user_id - FROM (public.sol_reward_disbursements rd - JOIN public.users ON (((users.wallet = rd.recipient_eth_address) AND (users.is_current = true)))); - - --- --- Name: v_usdc_purchases; Type: VIEW; Schema: public; Owner: - --- - -CREATE VIEW public.v_usdc_purchases AS - SELECT sp.signature, - sp.slot, - sp.buyer_user_id, - CASE sp.content_type - WHEN 'track'::text THEN t.owner_id - WHEN 'album'::text THEN p.playlist_owner_id - WHEN 'playlist'::text THEN p.playlist_owner_id - END AS seller_user_id, - sp.amount, - (sp.content_type)::public.usdc_purchase_content_type AS content_type, - sp.content_id, - sp.created_at, - GREATEST( - sp.amount - COALESCE( - CASE sp.content_type - WHEN 'track'::text THEN ( - SELECT (tph.total_price_cents * 10000) - FROM public.track_price_history tph - WHERE tph.track_id = sp.content_id - AND tph.block_timestamp <= sp.created_at - ORDER BY tph.block_timestamp DESC - LIMIT 1 - ) - ELSE ( - SELECT (aph.total_price_cents * 10000) - FROM public.album_price_history aph - WHERE aph.playlist_id = sp.content_id - AND aph.block_timestamp <= sp.created_at - ORDER BY aph.block_timestamp DESC - LIMIT 1 - ) - END, - 0 - ), - 0 - ) AS extra_amount, - (sp.access_type)::public.usdc_purchase_access_type AS access, - sp.city, - sp.region, - sp.country, - ( - SELECT COALESCE( - jsonb_agg( - jsonb_build_object( - 'user_id', COALESCE(u_payout.user_id, u_sca.user_id), - 'payout_wallet', pay.to_account, - 'amount', pay.amount, - 'percentage', ((pay.amount * 100.0) / NULLIF(sp.amount, 0)) - ) - ORDER BY pay.route_index - ), - '[]'::jsonb - ) - FROM public.sol_payments pay - LEFT JOIN public.users u_payout - ON u_payout.spl_usdc_payout_wallet = pay.to_account - AND u_payout.is_current = true - LEFT JOIN public.sol_claimable_accounts sca - ON sca.account = pay.to_account - AND sca.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v'::text - LEFT JOIN public.users u_sca - ON u_sca.wallet = sca.ethereum_address - AND u_sca.is_current = true - WHERE pay.signature = sp.signature - AND pay.instruction_index = sp.instruction_index - ) AS splits - FROM public.sol_purchases sp - LEFT JOIN public.tracks t - ON sp.content_type = 'track'::text - AND t.track_id = sp.content_id - AND t.is_current = true - LEFT JOIN public.playlists p - ON sp.content_type IN ('album'::text, 'playlist'::text) - AND p.playlist_id = sp.content_id - AND p.is_current = true - WHERE sp.is_valid IS TRUE; - - -- -- PostgreSQL database dump complete --