diff --git a/server/attester_cmd.go b/server/attester_cmd.go index 7fcfc227..8f150d1e 100644 --- a/server/attester_cmd.go +++ b/server/attester_cmd.go @@ -1,12 +1,10 @@ package server import ( - "bytes" "context" "encoding/hex" "encoding/json" "fmt" - "io" "net/http" "net/url" "os" @@ -23,7 +21,6 @@ import ( cmttypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" - cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" @@ -129,11 +126,6 @@ func NewAttesterCmd() *cobra.Command { cancel() }() - cmd.Println("Joining attester set...") - if err := joinAttesterSet(ctx, config, valAddr, operatorPrivKey, consensusPrivKey, clientCtx); err != nil { - return fmt.Errorf("join attester set: %w", err) - } - cmd.Println("Starting to watch for new blocks...") if err := pullBlocksAndAttest(ctx, config, valAddr, operatorPrivKey, consensusPrivKey, clientCtx); err != nil { return fmt.Errorf("error watching blocks: %w", err) @@ -152,277 +144,60 @@ func NewAttesterCmd() *cobra.Command { return cmd } -func joinAttesterSet( +func assertRegistered( ctx context.Context, - config *AttesterConfig, - valAddr sdk.ValAddress, - operatorPrivKey *secp256k1.PrivKey, consensusPrivKey *pvm.FilePV, clientCtx client.Context, ) error { - sdkPubKey, err := cryptocodec.FromCmtPubKeyInterface(consensusPrivKey.Key.PubKey) - if err != nil { - return fmt.Errorf("convert public key: %w", err) - } - - authorityAddr, err := clientCtx.InterfaceRegistry.SigningContext().AddressCodec().BytesToString(operatorPrivKey.PubKey().Address()) - if err != nil { - return fmt.Errorf("convert authority address: %w", err) - } - - msg, err := networktypes.NewMsgJoinAttesterSet(authorityAddr, valAddr.String(), sdkPubKey) - if err != nil { - return fmt.Errorf("create join attester set msg: %w", err) - } - - txHash, err := broadcastTx(ctx, config, msg, operatorPrivKey, clientCtx) - if err != nil { - return fmt.Errorf("broadcast join attester set tx: %w", err) - } - - if config.Verbose { - fmt.Printf("šŸ“ Transaction submitted with hash: %s\n", txHash) - } - - time.Sleep(500 * time.Millisecond) - - var txResult *sdk.TxResponse - var retries = 10 - for range retries { - txResult, err = authtx.QueryTx(clientCtx, txHash) - if err == nil { - break - } - time.Sleep(500 * time.Millisecond) - } - + consAddr := sdk.ConsAddress(consensusPrivKey.Key.PubKey.Address()).String() + queryClient := networktypes.NewQueryClient(clientCtx) + resp, err := queryClient.AttesterSet(ctx, &networktypes.QueryAttesterSetRequest{}) if err != nil { - return fmt.Errorf("transaction %s not found after %d attempts: %w", txHash, retries, err) + return fmt.Errorf("query attester set: %w", err) } - - if config.Verbose { - fmt.Printf("šŸ“Š Transaction Result: Code=%d, Height=%d\n", txResult.Code, txResult.Height) - } - - if txResult.Code != 0 { - fmt.Printf("āŒ MsgJoinAttesterSet FAILED with code %d\n", txResult.Code) - fmt.Printf(" Error details: %s\n", txResult.RawLog) - - if txResult.Code == 18 && strings.Contains(txResult.RawLog, "validator already in attester set") { - fmt.Printf("ā„¹ļø Already in attester set, proceeding...\n") + for _, e := range resp.Entries { + if e.ConsensusAddress == consAddr { return nil } - - switch txResult.Code { - case 4: - fmt.Println(" Error: Unauthorized - The address may not be a valid validator") - case 5: - fmt.Println(" Error: Insufficient funds") - case 11: - fmt.Println(" Error: Out of gas") - case 18: - fmt.Println(" Error: Invalid request") - default: - fmt.Printf(" Error code %d\n", txResult.Code) - } - - return fmt.Errorf("MsgJoinAttesterSet failed with code %d: %s", txResult.Code, txResult.RawLog) } - - fmt.Printf("āœ… Successfully joined attester set\n") - time.Sleep(500 * time.Millisecond) - - return nil + return fmt.Errorf("consensus address %s is not in the attester set; must be registered in genesis", consAddr) } func pullBlocksAndAttest( ctx context.Context, config *AttesterConfig, valAddr sdk.ValAddress, - senderKey *secp256k1.PrivKey, - pv *pvm.FilePV, + operatorPrivKey *secp256k1.PrivKey, + consensusPrivKey *pvm.FilePV, clientCtx client.Context, ) error { - parsed, err := url.Parse(config.Node) - if err != nil { - return fmt.Errorf("parse node URL: %w", err) - } - - httpClient := &http.Client{ - Timeout: 10 * time.Second, - } - - resp, err := httpClient.Get(fmt.Sprintf("http://%s/status", parsed.Host)) - if err != nil { - return fmt.Errorf("error querying status: %v", err) - } - - var statusResponse struct { - Result struct { - SyncInfo struct { - LatestBlockHeight string `json:"latest_block_height"` - } `json:"sync_info"` - } `json:"result"` - } - - if err := json.NewDecoder(resp.Body).Decode(&statusResponse); err != nil { - _ = resp.Body.Close() - return fmt.Errorf("error parsing status response: %v", err) + if err := assertRegistered(ctx, consensusPrivKey, clientCtx); err != nil { + return err } - _ = resp.Body.Close() - - currentHeight, err := strconv.ParseInt(statusResponse.Result.SyncInfo.LatestBlockHeight, 10, 64) - if err != nil { - return fmt.Errorf("error parsing current height: %v", err) - } - - fmt.Printf("šŸ“Š Current blockchain height: %d\n", currentHeight) - fmt.Printf("šŸ“ Attesting blocks 1-%d...\n", currentHeight) - failedBlocks := make(map[int64]int) - maxRetries := 3 + var nextHeight int64 = 1 + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() - for height := int64(1); height <= currentHeight; height++ { + for { select { case <-ctx.Done(): - return nil - default: - if height%10 == 0 || height == 1 || height == currentHeight { - fmt.Printf("šŸ“¦ Attesting blocks... %d/%d\n", height, currentHeight) - } - - err = submitAttestation(ctx, config, height, valAddr, senderKey, pv, clientCtx) - if err != nil { - fmt.Printf("āš ļø Error attesting block %d: %v\n", height, err) - failedBlocks[height] = 1 - continue - } - - time.Sleep(time.Millisecond * 100) + return ctx.Err() + case <-ticker.C: } - } - - if len(failedBlocks) > 0 { - fmt.Printf("\nšŸ”„ Retrying %d failed blocks...\n", len(failedBlocks)) - for retryRound := 1; retryRound <= maxRetries && len(failedBlocks) > 0; retryRound++ { - fmt.Printf(" Round %d/%d - %d blocks remaining\n", retryRound, maxRetries, len(failedBlocks)) - blocksToRetry := make([]int64, 0, len(failedBlocks)) - for height := range failedBlocks { - blocksToRetry = append(blocksToRetry, height) - } - - for i := 0; i < len(blocksToRetry); i++ { - for j := i + 1; j < len(blocksToRetry); j++ { - if blocksToRetry[i] > blocksToRetry[j] { - blocksToRetry[i], blocksToRetry[j] = blocksToRetry[j], blocksToRetry[i] - } - } - } - - for _, height := range blocksToRetry { - select { - case <-ctx.Done(): - return nil - default: - fmt.Printf(" šŸ”„ Retrying block %d (attempt %d)...\n", height, failedBlocks[height]+1) - err = submitAttestation(ctx, config, height, valAddr, senderKey, pv, clientCtx) - if err != nil { - failedBlocks[height]++ - if failedBlocks[height] >= maxRetries { - fmt.Printf(" āŒ Block %d failed after %d attempts\n", height, maxRetries) - delete(failedBlocks, height) - } - } else { - fmt.Printf(" āœ… Block %d attested successfully\n", height) - delete(failedBlocks, height) - } - - time.Sleep(time.Millisecond * 300) - } - } - - if len(failedBlocks) > 0 { - time.Sleep(time.Second * 2) - } + currentHeight, err := getLatestHeight(config.Node) + if err != nil { + fmt.Printf("āš ļø status poll failed: %v\n", err) + continue } - - if len(failedBlocks) > 0 { - fmt.Printf("\nāŒ Failed to attest %d blocks after all retries\n", len(failedBlocks)) - for height := range failedBlocks { - fmt.Printf(" - Block %d\n", height) + for h := nextHeight; h <= currentHeight; h++ { + if err := submitAttestation(ctx, config, h, valAddr, operatorPrivKey, consensusPrivKey, clientCtx); err != nil { + // duplicate or transient — log and move on + fmt.Printf("attest h=%d: %v\n", h, err) } } - } - - fmt.Printf("āœ… Finished historical blocks. Watching for new blocks...\n") - - lastAttested := currentHeight - - for { - select { - case <-ctx.Done(): - return nil - default: - resp, err := httpClient.Get(fmt.Sprintf("http://%s/block", parsed.Host)) - if err != nil { - fmt.Printf("Error querying block: %v\n", err) - time.Sleep(time.Second / 10) - continue - } - - var blockResponse struct { - Result struct { - Block struct { - Header struct { - Height string `json:"height"` - AppHash string `json:"app_hash"` - } `json:"header"` - } `json:"block"` - } `json:"result"` - } - - var buf bytes.Buffer - if err := json.NewDecoder(io.TeeReader(resp.Body, &buf)).Decode(&blockResponse); err != nil { - fmt.Printf("Error parsing response: %v: %s\n", err, buf.String()) - _ = resp.Body.Close() - time.Sleep(time.Second / 10) - continue - } - _ = resp.Body.Close() - - heightStr := blockResponse.Result.Block.Header.Height - if heightStr == "" { - if config.Verbose { - fmt.Println("Height field is empty in response, retrying...") - } - time.Sleep(time.Second / 10) - continue - } - height, err := strconv.ParseInt(heightStr, 10, 64) - if err != nil { - fmt.Printf("Error parsing height: %v\n", err) - time.Sleep(time.Second / 10) - continue - } - - if height > lastAttested { - for missedHeight := lastAttested + 1; missedHeight <= height; missedHeight++ { - fmt.Printf("šŸ“¦ New block %d - attesting...\n", missedHeight) - - err = submitAttestation(ctx, config, missedHeight, valAddr, senderKey, pv, clientCtx) - if err != nil { - fmt.Printf("āš ļø Error submitting attestation for block %d: %v\n", missedHeight, err) - continue - } - fmt.Printf("āœ… Attested block %d\n", missedHeight) - } - - lastAttested = height - } - - time.Sleep(50 * time.Millisecond) - } + nextHeight = currentHeight + 1 } } @@ -652,7 +427,6 @@ func submitAttestation( if err != nil { return fmt.Errorf("getting Evolve header: %w", err) } - blockID, err := getOriginalBlockID(ctx, config.Node, height) if err != nil { return fmt.Errorf("getting original block ID: %w", err) @@ -661,60 +435,78 @@ func submitAttestation( vote := cmtproto.Vote{ Type: cmtproto.PrecommitType, Height: height, - BlockID: blockID, Round: 0, + BlockID: blockID, Timestamp: header.Time(), - ValidatorAddress: pv.Key.PrivKey.PubKey().Address(), + ValidatorAddress: pv.Key.PubKey.Address(), ValidatorIndex: 0, } - signBytes := cmttypes.VoteSignBytes(config.ChainID, &vote) - - signature, err := pv.Key.PrivKey.Sign(signBytes) + sig, err := pv.Key.PrivKey.Sign(signBytes) if err != nil { - return fmt.Errorf("signing payload: %w", err) - } - - validatorAddr := pv.Key.Address - - fmt.Printf("šŸ” DEBUG ValidatorAddr used in vote: %X\n", validatorAddr) - fmt.Printf("šŸ” DEBUG pv.GetAddress(): %X\n", pv.GetAddress()) - fmt.Printf("šŸ” DEBUG pubKey.Address(): %X\n", pv.Key.PubKey.Address()) - - attesterVote := &cmtproto.Vote{ - Type: cmtproto.PrecommitType, - ValidatorAddress: validatorAddr, - Height: height, - Round: 0, - BlockID: cmtproto.BlockID{Hash: header.Hash(), PartSetHeader: cmtproto.PartSetHeader{}}, - Timestamp: header.Time(), - Signature: signature, + return fmt.Errorf("sign vote: %w", err) } - - voteBytes, err := proto.Marshal(attesterVote) + vote.Signature = sig + voteBytes, err := proto.Marshal(&vote) if err != nil { return fmt.Errorf("marshal vote: %w", err) } authorityAddr := sdk.AccAddress(senderKey.PubKey().Address()).String() - msg := networktypes.NewMsgAttest( - authorityAddr, - valAddr.String(), - height, - voteBytes, - ) + consensusAddr := sdk.ConsAddress(pv.Key.PubKey.Address()).String() + msg := networktypes.NewMsgAttest(authorityAddr, consensusAddr, height, voteBytes) txHash, err := broadcastTx(ctx, config, msg, senderKey, clientCtx) if err != nil { return fmt.Errorf("broadcast attest tx: %w", err) } - if config.Verbose { fmt.Printf("Attestation submitted for block %d with hash: %s\n", height, txHash) } return nil } +// getLatestHeight returns the latest raw block height the sequencer has +// produced. It cannot use /status in attester mode because /status reports +// the last-attested height there (which is 0 before any attestation is made, +// causing a deadlock: attester waits for blocks to attest, but /status can't +// advance until attestations land). Instead, it hits /block with no height, +// which the RPC resolves to RollkitStore.Height — the real production height. +func getLatestHeight(nodeURL string) (int64, error) { + parsed, err := url.Parse(nodeURL) + if err != nil { + return 0, fmt.Errorf("parse node URL: %w", err) + } + httpClient := &http.Client{Timeout: 10 * time.Second} + resp, err := httpClient.Get(fmt.Sprintf("http://%s/block", parsed.Host)) + if err != nil { + return 0, fmt.Errorf("query block: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + var blockResp struct { + Result struct { + Block struct { + Header struct { + Height string `json:"height"` + } `json:"header"` + } `json:"block"` + } `json:"result"` + } + if err := json.NewDecoder(resp.Body).Decode(&blockResp); err != nil { + return 0, fmt.Errorf("decode block: %w", err) + } + heightStr := blockResp.Result.Block.Header.Height + if heightStr == "" { + return 0, nil + } + h, err := strconv.ParseInt(heightStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("parse height %q: %w", heightStr, err) + } + return h, nil +} + func getEvolveHeader(node string, height int64) (*evolvetypes.Header, error) { parsed, err := url.Parse(node) if err != nil { diff --git a/server/attester_cmd_test.go b/server/attester_cmd_test.go index 6bf0421b..9a98a1d0 100644 --- a/server/attester_cmd_test.go +++ b/server/attester_cmd_test.go @@ -2,6 +2,9 @@ package server import ( "context" + "net/http" + "net/http/httptest" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -48,41 +51,274 @@ func TestPrivateKeyFromMnemonic(t *testing.T) { } func TestGetOriginalBlockID(t *testing.T) { + t.Run("height 0 returns empty block ID without RPC", func(t *testing.T) { + blockID, err := getOriginalBlockID(context.Background(), "tcp://localhost:26657", 0) + require.NoError(t, err) + assert.Empty(t, blockID.Hash) + assert.Empty(t, blockID.PartSetHeader.Hash) + assert.Equal(t, uint32(0), blockID.PartSetHeader.Total) + }) + + t.Run("height 1 returns empty block ID without RPC", func(t *testing.T) { + blockID, err := getOriginalBlockID(context.Background(), "tcp://localhost:26657", 1) + require.NoError(t, err) + assert.Empty(t, blockID.Hash) + assert.Empty(t, blockID.PartSetHeader.Hash) + assert.Equal(t, uint32(0), blockID.PartSetHeader.Total) + }) + + t.Run("height greater than 1 reads block ID from RPC", func(t *testing.T) { + blockHash := strings.Repeat("ab", 32) + partSetHash := strings.Repeat("cd", 32) + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block_id": { + "hash": "`+blockHash+`", + "parts": { + "hash": "`+partSetHash+`", + "total": 2 + } + }, + "block": { + "header": { + "height": "2", + "time": "2026-04-22T12:00:00Z", + "chain_id": "gm" + } + } + } + }`) + + blockID, err := getOriginalBlockID(context.Background(), nodeURL, 2) + require.NoError(t, err) + assert.Equal(t, strings.Repeat("\xab", 32), string(blockID.Hash)) + assert.Equal(t, strings.Repeat("\xcd", 32), string(blockID.PartSetHeader.Hash)) + assert.Equal(t, uint32(2), blockID.PartSetHeader.Total) + }) + + t.Run("invalid node URL returns error", func(t *testing.T) { + _, err := getOriginalBlockID(context.Background(), "%", 2) + require.Error(t, err) + require.Contains(t, err.Error(), "parse node URL") + }) + + t.Run("invalid JSON returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{`) + + _, err := getOriginalBlockID(context.Background(), nodeURL, 2) + require.Error(t, err) + require.Contains(t, err.Error(), "decoding response") + }) + + t.Run("invalid block hash returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block_id": { + "hash": "not-hex", + "parts": { + "hash": "`+strings.Repeat("cd", 32)+`", + "total": 1 + } + } + } + }`) + + _, err := getOriginalBlockID(context.Background(), nodeURL, 2) + require.Error(t, err) + require.Contains(t, err.Error(), "decoding block ID hash") + }) + + t.Run("invalid part set hash returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block_id": { + "hash": "`+strings.Repeat("ab", 32)+`", + "parts": { + "hash": "not-hex", + "total": 1 + } + } + } + }`) + + _, err := getOriginalBlockID(context.Background(), nodeURL, 2) + require.Error(t, err) + require.Contains(t, err.Error(), "decoding part set header hash") + }) +} + +func TestGetLatestHeight(t *testing.T) { tests := []struct { - name string - height int64 - expectEmpty bool + name string + response string + want int64 + wantErr string }{ { - name: "height 0", - height: 0, - expectEmpty: true, + name: "valid height", + response: `{ + "result": { + "block": { + "header": { + "height": "42" + } + } + } + }`, + want: 42, }, { - name: "height 1", - height: 1, - expectEmpty: true, + name: "empty height returns zero", + response: `{ + "result": { + "block": { + "header": {} + } + } + }`, + want: 0, }, { - name: "height 2 - requires RPC call", - height: 2, - expectEmpty: false, + name: "invalid height returns error", + response: `{ + "result": { + "block": { + "header": { + "height": "not-a-number" + } + } + } + }`, + wantErr: "parse height", + }, + { + name: "invalid JSON returns error", + response: `{`, + wantErr: "decode block", }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.expectEmpty { - blockID, err := getOriginalBlockID(context.Background(), "tcp://localhost:26657", tt.height) - require.NoError(t, err) - assert.Empty(t, blockID.Hash) - assert.Empty(t, blockID.PartSetHeader.Hash) - assert.Equal(t, uint32(0), blockID.PartSetHeader.Total) - } else { - // For height > 1, we would need a running node to test - // This test would fail without a node, so we skip the actual call - t.Skip("Requires running node for integration test") + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, tc.response) + + got, err := getLatestHeight(nodeURL) + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + return } + require.NoError(t, err) + assert.Equal(t, tc.want, got) }) } + + t.Run("invalid node URL returns error", func(t *testing.T) { + _, err := getLatestHeight("%") + require.Error(t, err) + require.Contains(t, err.Error(), "parse node URL") + }) +} + +func TestGetEvolveHeader(t *testing.T) { + t.Run("valid response builds Evolve header", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block": { + "header": { + "version": { + "block": "1", + "app": "7" + }, + "height": "42", + "time": "2026-04-22T12:00:00Z", + "last_block_id": { + "hash": "`+strings.Repeat("11", 32)+`" + }, + "last_commit_hash": "`+strings.Repeat("22", 32)+`", + "data_hash": "`+strings.Repeat("33", 32)+`", + "validators_hash": "`+strings.Repeat("44", 32)+`", + "next_validators_hash": "`+strings.Repeat("55", 32)+`", + "consensus_hash": "`+strings.Repeat("66", 32)+`", + "app_hash": "`+strings.Repeat("77", 32)+`", + "last_results_hash": "`+strings.Repeat("88", 32)+`", + "evidence_hash": "`+strings.Repeat("99", 32)+`", + "proposer_address": "`+strings.Repeat("aa", 20)+`", + "chain_id": "gm" + } + } + } + }`) + + header, err := getEvolveHeader(nodeURL, 42) + require.NoError(t, err) + require.NotNil(t, header) + assert.Equal(t, uint64(42), header.Height()) + assert.Equal(t, "gm", header.ChainID()) + assert.Equal(t, uint64(7), header.Version.App) + assert.Equal(t, strings.Repeat("\x33", 32), string(header.DataHash)) + assert.Equal(t, strings.Repeat("\x44", 32), string(header.ValidatorHash)) + assert.Equal(t, strings.Repeat("\x77", 32), string(header.AppHash)) + assert.Equal(t, strings.Repeat("\xaa", 20), string(header.ProposerAddress)) + }) + + t.Run("invalid node URL returns error", func(t *testing.T) { + _, err := getEvolveHeader("%", 42) + require.Error(t, err) + require.Contains(t, err.Error(), "parse node URL") + }) + + t.Run("invalid JSON returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{`) + + _, err := getEvolveHeader(nodeURL, 42) + require.Error(t, err) + require.Contains(t, err.Error(), "decoding response") + }) + + t.Run("invalid height returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block": { + "header": { + "height": "not-a-number", + "time": "2026-04-22T12:00:00Z" + } + } + } + }`) + + _, err := getEvolveHeader(nodeURL, 42) + require.Error(t, err) + require.Contains(t, err.Error(), "parsing height") + }) + + t.Run("invalid timestamp returns error", func(t *testing.T) { + nodeURL := newAttesterRPCTestServer(t, `{ + "result": { + "block": { + "header": { + "height": "42", + "time": "not-a-time" + } + } + } + }`) + + _, err := getEvolveHeader(nodeURL, 42) + require.Error(t, err) + require.Contains(t, err.Error(), "parsing time") + }) +} + +func newAttesterRPCTestServer(t *testing.T, response string) string { + t.Helper() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(response)) + })) + t.Cleanup(server.Close) + + return "tcp://" + strings.TrimPrefix(server.URL, "http://") }