Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
at dev 7.9 kB view raw
1<?php 2 3declare(strict_types=1); 4 5namespace SocialDept\AtpSignals\Tests\Integration; 6 7use Orchestra\Testbench\TestCase; 8use SocialDept\AtpSignals\Core\CAR; 9use SocialDept\AtpSignals\Core\CBOR; 10use SocialDept\AtpSignals\Core\CID; 11 12class FirehoseConsumerTest extends TestCase 13{ 14 public function test_cbor_can_decode_firehose_message_header(): void 15 { 16 // Simulate a Firehose message header 17 // Map with 't' => '#commit', 'op' => 1 18 $header = [ 19 't' => '#commit', 20 'op' => 1, 21 ]; 22 23 // Encode it manually for testing 24 $cbor = "\xA2"; // Map with 2 items 25 $cbor .= "\x61t"; // Text string 't' 26 $cbor .= "\x67#commit"; // Text string '#commit' 27 $cbor .= "\x62op"; // Text string 'op' 28 $cbor .= "\x01"; // Integer 1 29 30 [$decoded, $remainder] = CBOR::decodeFirst($cbor); 31 32 $this->assertIsArray($decoded); 33 $this->assertArrayHasKey('t', $decoded); 34 $this->assertArrayHasKey('op', $decoded); 35 $this->assertSame('#commit', $decoded['t']); 36 $this->assertSame(1, $decoded['op']); 37 } 38 39 public function test_cbor_can_decode_commit_payload(): void 40 { 41 // Simplified commit payload structure 42 $payload = [ 43 'repo' => 'did:plc:test123', 44 'rev' => 'test-rev', 45 'seq' => 12345, 46 'time' => '2024-01-01T00:00:00Z', 47 'ops' => [], 48 ]; 49 50 // Encode a simple payload 51 $cbor = "\xA5"; // Map with 5 items 52 53 // 'repo' key 54 $cbor .= "\x64repo"; // Text string 'repo' 55 $cbor .= "\x6Fdid:plc:test123"; // Text string 'did:plc:test123' 56 57 // 'rev' key 58 $cbor .= "\x63rev"; // Text string 'rev' 59 $cbor .= "\x68test-rev"; // Text string 'test-rev' 60 61 // 'seq' key 62 $cbor .= "\x63seq"; // Text string 'seq' 63 $cbor .= "\x19\x30\x39"; // Integer 12345 64 65 // 'time' key 66 $cbor .= "\x64time"; // Text string 'time' 67 $cbor .= "\x74" . "2024-01-01T00:00:00Z"; // Text string (length 20) 68 69 // 'ops' key 70 $cbor .= "\x63ops"; // Text string 'ops' 71 $cbor .= "\x80"; // Empty array 72 73 $decoded = CBOR::decode($cbor); 74 75 $this->assertIsArray($decoded); 76 $this->assertSame('did:plc:test123', $decoded['repo']); 77 $this->assertSame('test-rev', $decoded['rev']); 78 $this->assertSame(12345, $decoded['seq']); 79 } 80 81 public function test_cid_can_be_decoded_from_cbor_tag(): void 82 { 83 // Create a CID and encode it as CBOR tag 42 84 $hash = hash('sha256', 'test-content', true); 85 $cidBinary = "\x01\x71\x12\x20" . $hash; // CIDv1, dag-cbor, sha256 86 $cidBytes = "\x00" . $cidBinary; // Add 0x00 prefix 87 88 // CBOR tag 42 + byte string 89 $length = strlen($cidBytes); 90 $cbor = "\xD8\x2A\x58" . chr($length) . $cidBytes; 91 92 $decoded = CBOR::decode($cbor); 93 94 $this->assertInstanceOf(CID::class, $decoded); 95 $this->assertSame(1, $decoded->version); 96 $this->assertSame(0x71, $decoded->codec); 97 } 98 99 public function test_car_can_extract_blocks(): void 100 { 101 // Create a minimal CAR with header and one block 102 $car = ''; 103 104 // CAR header (minimal) - {version: 1} 105 $headerCbor = "\xA1\x67version\x01"; 106 $headerLength = strlen($headerCbor); 107 $car .= chr($headerLength) . $headerCbor; 108 109 // Create a block with CID and data 110 // Block data: {test: "value"} 111 $blockData = "\xA1\x64test\x65value"; 112 113 // Create CID: version 1, codec 0x71 (dag-cbor), sha256 hash 114 $cid = CID::fromBinary("\x01\x71\x12\x20" . str_repeat("\x00", 32)); 115 $cidBinary = $cid->toBinary(); 116 117 // In CAR format: varint(cid_length + data_length), CID bytes, data bytes 118 $totalLength = strlen($cidBinary) + strlen($blockData); 119 $car .= chr($totalLength) . $cidBinary . $blockData; 120 121 // Parse blocks 122 $blocks = CAR::blockMap($car, 'did:plc:test'); 123 124 $this->assertIsArray($blocks); 125 $this->assertNotEmpty($blocks); 126 } 127 128 public function test_firehose_consumer_message_structure(): void 129 { 130 // Test the exact structure FirehoseConsumer expects 131 132 // 1. Create CBOR header 133 $headerMap = [ 134 't' => '#commit', 135 'op' => 1, 136 ]; 137 138 $header = "\xA2"; // Map with 2 items 139 $header .= "\x61t\x67#commit"; // 't' => '#commit' 140 $header .= "\x62op\x01"; // 'op' => 1 141 142 // 2. Create CBOR payload 143 $payload = "\xA6"; // Map with 6 items 144 $payload .= "\x63seq\x19\x30\x39"; // 'seq' => 12345 145 $payload .= "\x66rebase\xF4"; // 'rebase' => false 146 $payload .= "\x64repo\x6Fdid:plc:test123"; // 'repo' => 'did:plc:test123' 147 $payload .= "\x66commit\xA0"; // 'commit' => {} 148 $payload .= "\x63rev\x68test-rev"; // 'rev' => 'test-rev' 149 $payload .= "\x65since\x66origin"; // 'since' => 'origin' 150 151 // Add required fields 152 $payload .= "\x66blocks\x40"; // 'blocks' => empty byte string 153 $payload .= "\x63ops\x80"; // 'ops' => [] 154 $payload .= "\x64time\x74" . "2024-01-01T00:00:00Z"; // 'time' => timestamp 155 156 // Combine header + payload 157 $message = $header . $payload; 158 159 // Test decoding header 160 [$decodedHeader, $remainder] = CBOR::decodeFirst($message); 161 162 $this->assertIsArray($decodedHeader); 163 $this->assertSame('#commit', $decodedHeader['t']); 164 $this->assertSame(1, $decodedHeader['op']); 165 166 // Test decoding payload 167 $decodedPayload = CBOR::decode($remainder); 168 169 $this->assertIsArray($decodedPayload); 170 $this->assertArrayHasKey('seq', $decodedPayload); 171 $this->assertArrayHasKey('repo', $decodedPayload); 172 $this->assertArrayHasKey('rev', $decodedPayload); 173 } 174 175 public function test_complete_firehose_message_flow(): void 176 { 177 // This test simulates the complete flow that FirehoseConsumer::handleMessage() uses 178 179 // Step 1: CBOR header 180 $header = "\xA2\x61t\x67#commit\x62op\x01"; 181 182 // Step 2: CBOR payload with all required fields 183 $payload = "\xA9"; // Map with 9 items 184 $payload .= "\x63seq\x19\x30\x39"; // seq: 12345 185 $payload .= "\x66rebase\xF4"; // rebase: false 186 $payload .= "\x64repo\x6Fdid:plc:test123"; // repo: "did:plc:test123" 187 $payload .= "\x66commit\xA0"; // commit: {} 188 $payload .= "\x63rev\x68test-rev"; // rev: "test-rev" 189 $payload .= "\x65since\x66origin"; // since: "origin" 190 $payload .= "\x66blocks\x40"; // blocks: b'' 191 $payload .= "\x63ops\x80"; // ops: [] 192 $payload .= "\x64time\x74" . "2024-01-01T00:00:00Z"; // time: "2024-01-01T00:00:00Z" 193 194 $message = $header . $payload; 195 196 // Simulate FirehoseConsumer::handleMessage() logic 197 198 // 1. Decode CBOR header 199 [$decodedHeader, $remainder] = CBOR::decodeFirst($message); 200 201 $this->assertArrayHasKey('t', $decodedHeader); 202 $this->assertArrayHasKey('op', $decodedHeader); 203 204 // 2. Check operation 205 $this->assertSame(1, $decodedHeader['op']); 206 207 // 3. Decode payload 208 $decodedPayload = CBOR::decode($remainder); 209 210 // 4. Verify required fields exist 211 $requiredFields = ['seq', 'rebase', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'time']; 212 foreach ($requiredFields as $field) { 213 $this->assertArrayHasKey($field, $decodedPayload); 214 } 215 216 // 5. Verify data types 217 $this->assertIsInt($decodedPayload['seq']); 218 $this->assertIsBool($decodedPayload['rebase']); 219 $this->assertIsString($decodedPayload['repo']); 220 $this->assertIsArray($decodedPayload['ops']); 221 222 // Success! The message structure is valid 223 $this->assertTrue(true); 224 } 225}