+139
-4
src/lib/at/client.ts
+139
-4
src/lib/at/client.ts
···
5
5
ComAtprotoRepoListRecords
6
6
} from '@atcute/atproto';
7
7
import { Client as AtcuteClient, simpleFetchHandler } from '@atcute/client';
8
-
import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons';
8
+
import { safeParse, type Blob as AtpBlob, type Handle, type InferOutput } from '@atcute/lexicons';
9
9
import {
10
10
isDid,
11
11
parseCanonicalResourceUri,
···
82
82
83
83
const cache = cacheWithRecords;
84
84
85
+
const wrapBlobWithProgress = (
86
+
blob: Blob,
87
+
onProgress: (uploaded: number, total: number) => void
88
+
): ReadableStream<Uint8Array> => {
89
+
const totalSize = blob.size;
90
+
let uploaded = 0;
91
+
92
+
return new ReadableStream({
93
+
start: async (controller) => {
94
+
const reader = blob.stream().getReader();
95
+
96
+
const push = async () => {
97
+
const { done, value } = await reader.read();
98
+
99
+
if (done) {
100
+
controller.close();
101
+
return;
102
+
}
103
+
104
+
uploaded += value.byteLength;
105
+
onProgress(uploaded, totalSize);
106
+
107
+
controller.enqueue(value);
108
+
await push();
109
+
};
110
+
111
+
await push();
112
+
}
113
+
});
114
+
};
115
+
116
+
export type UploadStatus =
117
+
| { stage: 'auth' }
118
+
| { stage: 'uploading'; progress?: number }
119
+
| { stage: 'processing'; progress?: number }
120
+
| { stage: 'complete' };
121
+
85
122
export class AtpClient {
86
123
public atcute: AtcuteClient | null = null;
87
-
public user: { did: Did; handle: Handle } | null = null;
124
+
public user: MiniDoc | null = null;
88
125
89
126
async login(agent: OAuthUserAgent): Promise<Result<null, string>> {
90
127
try {
···
93
130
if (!res.ok) throw res.data.error;
94
131
this.user = {
95
132
did: res.data.did,
96
-
handle: res.data.handle
133
+
handle: res.data.handle,
134
+
pds: agent.session.info.aud as `${string}:${string}`,
135
+
signing_key: ''
97
136
};
98
137
this.atcute = rpc;
99
138
} catch (error) {
···
253
292
254
293
return results;
255
294
}
295
+
296
+
async uploadBlob(
297
+
blob: Blob,
298
+
onProgress?: (progress: number) => void
299
+
): Promise<Result<AtpBlob<string>, string>> {
300
+
if (!this.atcute) return err('not authenticated');
301
+
const input = wrapBlobWithProgress(blob, (uploaded, total) => onProgress?.(uploaded / total));
302
+
const res = await this.atcute.post('com.atproto.repo.uploadBlob', { input });
303
+
if (!res.ok) return err(`upload failed: ${res.data.error}`);
304
+
return ok(res.data.blob);
305
+
}
306
+
307
+
async uploadVideo(
308
+
blob: Blob,
309
+
onStatus?: (status: UploadStatus) => void
310
+
): Promise<Result<AtpBlob<string>, string>> {
311
+
if (!this.atcute || !this.user) return err('not authenticated');
312
+
313
+
onStatus?.({ stage: 'auth' });
314
+
const serviceAuthUrl = new URL(`${this.user.pds}/xrpc/com.atproto.server.getServiceAuth`);
315
+
serviceAuthUrl.searchParams.append('aud', this.user.pds.replace('https://', 'did:web:'));
316
+
serviceAuthUrl.searchParams.append('lxm', 'com.atproto.repo.uploadBlob');
317
+
serviceAuthUrl.searchParams.append('exp', (Math.floor(Date.now() / 1000) + 60 * 30).toString()); // 30 minutes
318
+
319
+
const serviceAuthResponse = await this.atcute.handler(
320
+
`${serviceAuthUrl.pathname}${serviceAuthUrl.search}`,
321
+
{
322
+
method: 'GET'
323
+
}
324
+
);
325
+
if (!serviceAuthResponse.ok) {
326
+
const error = await serviceAuthResponse.text();
327
+
return err(`failed to get service auth: ${error}`);
328
+
}
329
+
330
+
const serviceAuth = await serviceAuthResponse.json();
331
+
const token = serviceAuth.token;
332
+
333
+
onStatus?.({ stage: 'uploading' });
334
+
const uploadUrl = new URL('https://video.bsky.app/xrpc/app.bsky.video.uploadVideo');
335
+
uploadUrl.searchParams.append('did', this.user.did);
336
+
uploadUrl.searchParams.append('name', 'video.mp4');
337
+
338
+
const body = wrapBlobWithProgress(blob, (uploaded, total) =>
339
+
onStatus?.({ stage: 'uploading', progress: uploaded / total })
340
+
);
341
+
const uploadResponse = await fetch(uploadUrl.toString(), {
342
+
method: 'POST',
343
+
headers: {
344
+
Authorization: `Bearer ${token}`,
345
+
'Content-Type': 'video/mp4'
346
+
},
347
+
body
348
+
});
349
+
if (!uploadResponse.ok) {
350
+
const error = await uploadResponse.text();
351
+
return err(`failed to upload video: ${error}`);
352
+
}
353
+
354
+
const jobStatus = await uploadResponse.json();
355
+
let videoBlobRef: AtpBlob<string> = jobStatus.blob;
356
+
357
+
onStatus?.({ stage: 'processing' });
358
+
while (!videoBlobRef) {
359
+
await new Promise((resolve) => setTimeout(resolve, 1000));
360
+
361
+
const statusResponse = await fetch(
362
+
`https://video.bsky.app/xrpc/app.bsky.video.getJobStatus?jobId=${jobStatus.jobId}`
363
+
);
364
+
365
+
if (!statusResponse.ok) {
366
+
const error = await statusResponse.json();
367
+
// reuse blob
368
+
if (error.error === 'already_exists' && error.blob) {
369
+
videoBlobRef = error.blob;
370
+
break;
371
+
}
372
+
return err(`failed to get job status: ${error.message || error.error}`);
373
+
}
374
+
375
+
const status = await statusResponse.json();
376
+
if (status.jobStatus.blob) {
377
+
videoBlobRef = status.jobStatus.blob;
378
+
} else if (status.jobStatus.state === 'JOB_STATE_FAILED') {
379
+
return err(`video processing failed: ${status.jobStatus.error || 'unknown error'}`);
380
+
} else if (status.jobStatus.progress !== undefined) {
381
+
onStatus?.({
382
+
stage: 'processing',
383
+
progress: status.jobStatus.progress
384
+
});
385
+
}
386
+
}
387
+
388
+
onStatus?.({ stage: 'complete' });
389
+
return ok(videoBlobRef);
390
+
}
256
391
}
257
392
258
393
export const newPublicClient = async (ident: ActorIdentifier): Promise<AtpClient> => {
···
263
398
return atp;
264
399
}
265
400
atp.atcute = new AtcuteClient({ handler: simpleFetchHandler({ service: didDoc.value.pds }) });
266
-
atp.user = { did: didDoc.value.did, handle: didDoc.value.handle };
401
+
atp.user = didDoc.value;
267
402
return atp;
268
403
};
269
404