Build Reactive Signals for Bluesky's AT Protocol Firehose in Laravel
1<?php
2
3declare(strict_types=1);
4
5namespace SocialDept\AtpSignals\Tests\Integration;
6
7use Orchestra\Testbench\TestCase;
8use SocialDept\AtpSignals\Core\CAR;
9use SocialDept\AtpSignals\Core\CBOR;
10use SocialDept\AtpSignals\Core\CID;
11
/**
 * Decoding checks for the binary pieces of a firehose frame: the CBOR header,
 * the CBOR commit payload, CBOR-tagged CIDs and CAR block extraction.
 *
 * Every fixture is hand-encoded inside the test, so no network connection is
 * involved. The frame fixtures are built by the two private helpers below so
 * that the declared CBOR map sizes and the encoded pairs stay in one place.
 */
class FirehoseConsumerTest extends TestCase
{
    /**
     * Hand-encoded CBOR for the frame header {"t": "#commit", "op": 1}.
     */
    private static function commitHeaderBytes(): string
    {
        return "\xA2"           // Map with 2 items
            . "\x61t"           // Text string 't'
            . "\x67#commit"     // Text string '#commit'
            . "\x62op"          // Text string 'op'
            . "\x01";           // Integer 1
    }

    /**
     * Hand-encoded CBOR for a commit payload carrying nine fields.
     *
     * The leading map byte must declare exactly as many pairs as follow it:
     * 0xA0 + 9 = 0xA9. Declaring fewer makes the map end early and leaves the
     * remaining pairs as trailing bytes.
     */
    private static function commitPayloadBytes(): string
    {
        return "\xA9"                                   // Map with 9 items
            . "\x63seq\x19\x30\x39"                     // 'seq' => 12345
            . "\x66rebase\xF4"                          // 'rebase' => false
            . "\x64repo\x6Fdid:plc:test123"             // 'repo' => 'did:plc:test123'
            . "\x66commit\xA0"                          // 'commit' => {}
            . "\x63rev\x68test-rev"                     // 'rev' => 'test-rev'
            . "\x65since\x66origin"                     // 'since' => 'origin'
            . "\x66blocks\x40"                          // 'blocks' => empty byte string
            . "\x63ops\x80"                             // 'ops' => []
            . "\x64time\x74" . '2024-01-01T00:00:00Z';  // 'time' => 20-character timestamp
    }

    /**
     * A standalone header map decodes to its two expected entries.
     */
    public function test_cbor_can_decode_firehose_message_header(): void
    {
        // Only the decoded value matters here; the remainder slot is ignored.
        [$decoded] = CBOR::decodeFirst(self::commitHeaderBytes());

        $this->assertIsArray($decoded);
        $this->assertArrayHasKey('t', $decoded);
        $this->assertArrayHasKey('op', $decoded);
        $this->assertSame('#commit', $decoded['t']);
        $this->assertSame(1, $decoded['op']);
    }

    /**
     * A simplified five-field commit payload decodes with its scalar values intact.
     */
    public function test_cbor_can_decode_commit_payload(): void
    {
        $cbor = "\xA5"; // Map with 5 items

        // 'repo' => 'did:plc:test123'
        $cbor .= "\x64repo";
        $cbor .= "\x6Fdid:plc:test123"; // Text string, length 15

        // 'rev' => 'test-rev'
        $cbor .= "\x63rev";
        $cbor .= "\x68test-rev"; // Text string, length 8

        // 'seq' => 12345
        $cbor .= "\x63seq";
        $cbor .= "\x19\x30\x39"; // Unsigned 16-bit integer 0x3039

        // 'time' => '2024-01-01T00:00:00Z'
        $cbor .= "\x64time";
        $cbor .= "\x74" . '2024-01-01T00:00:00Z'; // Text string, length 20

        // 'ops' => []
        $cbor .= "\x63ops";
        $cbor .= "\x80"; // Empty array

        $decoded = CBOR::decode($cbor);

        $this->assertIsArray($decoded);
        $this->assertSame('did:plc:test123', $decoded['repo']);
        $this->assertSame('test-rev', $decoded['rev']);
        $this->assertSame(12345, $decoded['seq']);

        // The last two pairs must survive decoding as well.
        $this->assertArrayHasKey('time', $decoded);
        $this->assertArrayHasKey('ops', $decoded);
    }

    /**
     * A byte string wrapped in CBOR tag 42 decodes to a CID instance.
     */
    public function test_cid_can_be_decoded_from_cbor_tag(): void
    {
        // Binary CID: version 1, codec 0x71 (dag-cbor), sha256 (0x12), 32-byte digest.
        $hash = hash('sha256', 'test-content', true);
        $cidBinary = "\x01\x71\x12\x20" . $hash;
        $cidBytes = "\x00" . $cidBinary; // Add 0x00 prefix

        // Tag 42 (0xD8 0x2A) followed by a byte string with a one-byte length (0x58).
        $length = strlen($cidBytes);
        $cbor = "\xD8\x2A\x58" . chr($length) . $cidBytes;

        $decoded = CBOR::decode($cbor);

        $this->assertInstanceOf(CID::class, $decoded);
        $this->assertSame(1, $decoded->version);
        $this->assertSame(0x71, $decoded->codec);
    }

    /**
     * A minimal CAR (header plus one block) yields a non-empty block map.
     */
    public function test_car_can_extract_blocks(): void
    {
        // CAR header (minimal): {version: 1}, prefixed with its length.
        // chr() is a valid varint here because the length is below 128.
        $headerCbor = "\xA1\x67version\x01";
        $car = chr(strlen($headerCbor)) . $headerCbor;

        // Block data: {test: "value"}
        $blockData = "\xA1\x64test\x65value";

        // CID: version 1, codec 0x71 (dag-cbor), sha256 with an all-zero digest.
        $cid = CID::fromBinary("\x01\x71\x12\x20" . str_repeat("\x00", 32));
        $cidBinary = $cid->toBinary();

        // Block section: varint(cid_length + data_length), CID bytes, data bytes.
        $totalLength = strlen($cidBinary) + strlen($blockData);
        $car .= chr($totalLength) . $cidBinary . $blockData;

        $blocks = CAR::blockMap($car, 'did:plc:test');

        $this->assertIsArray($blocks);
        $this->assertNotEmpty($blocks);
    }

    /**
     * A frame is a CBOR header immediately followed by a CBOR payload;
     * decodeFirst() must split them and both halves must decode.
     */
    public function test_firehose_consumer_message_structure(): void
    {
        // Header and payload are concatenated with no separator.
        $message = self::commitHeaderBytes() . self::commitPayloadBytes();

        // Test decoding header
        [$decodedHeader, $remainder] = CBOR::decodeFirst($message);

        $this->assertIsArray($decodedHeader);
        $this->assertSame('#commit', $decodedHeader['t']);
        $this->assertSame(1, $decodedHeader['op']);

        // Test decoding payload
        $decodedPayload = CBOR::decode($remainder);

        $this->assertIsArray($decodedPayload);
        $this->assertArrayHasKey('seq', $decodedPayload);
        $this->assertArrayHasKey('repo', $decodedPayload);
        $this->assertArrayHasKey('rev', $decodedPayload);

        // These are the last pairs in the payload, so they are the ones that
        // would go missing if the declared map size were too small.
        $this->assertArrayHasKey('blocks', $decodedPayload);
        $this->assertArrayHasKey('ops', $decodedPayload);
        $this->assertArrayHasKey('time', $decodedPayload);
    }

    /**
     * Walks the full decode sequence: header, operation check, payload,
     * required fields, then field types. Per the original comments this
     * simulates the flow of FirehoseConsumer::handleMessage().
     */
    public function test_complete_firehose_message_flow(): void
    {
        $message = self::commitHeaderBytes() . self::commitPayloadBytes();

        // 1. Decode CBOR header
        [$decodedHeader, $remainder] = CBOR::decodeFirst($message);

        $this->assertArrayHasKey('t', $decodedHeader);
        $this->assertArrayHasKey('op', $decodedHeader);

        // 2. Check operation
        $this->assertSame(1, $decodedHeader['op']);

        // 3. Decode payload
        $decodedPayload = CBOR::decode($remainder);

        // 4. Verify required fields exist
        $requiredFields = ['seq', 'rebase', 'repo', 'commit', 'rev', 'since', 'blocks', 'ops', 'time'];
        foreach ($requiredFields as $field) {
            $this->assertArrayHasKey($field, $decodedPayload);
        }

        // 5. Verify data types
        $this->assertIsInt($decodedPayload['seq']);
        $this->assertIsBool($decodedPayload['rebase']);
        $this->assertIsString($decodedPayload['repo']);
        $this->assertIsArray($decodedPayload['ops']);
    }
}
225}