+2
-2
consumer/src/backfill/mod.rs
+2
-2
consumer/src/backfill/mod.rs
···
336
336
337
337
#[derive(Debug, Default)]
338
338
struct CopyStore {
339
-
likes: Vec<(String, records::StrongRef, DateTime<Utc>)>,
339
+
likes: Vec<(String, records::StrongRef, Option<records::StrongRef>, DateTime<Utc>)>,
340
340
posts: Vec<(String, Cid, records::AppBskyFeedPost)>,
341
-
reposts: Vec<(String, records::StrongRef, DateTime<Utc>)>,
341
+
reposts: Vec<(String, records::StrongRef, Option<records::StrongRef>, DateTime<Utc>)>,
342
342
blocks: Vec<(String, String, DateTime<Utc>)>,
343
343
follows: Vec<(String, String, DateTime<Utc>)>,
344
344
list_items: Vec<(String, records::AppBskyGraphListItem)>,
+2
-2
consumer/src/backfill/repo.rs
+2
-2
consumer/src/backfill/repo.rs
···
131
131
deltas.incr(&rec.subject.uri, AggregateType::Like).await;
132
132
133
133
copies.push_record(&at_uri, cid);
134
-
copies.likes.push((at_uri, rec.subject, rec.created_at));
134
+
copies.likes.push((at_uri, rec.subject, rec.via, rec.created_at));
135
135
}
136
136
RecordTypes::AppBskyFeedPost(rec) => {
137
137
let maybe_reply = rec.reply.as_ref().map(|v| v.parent.uri.clone());
···
167
167
deltas.incr(&rec.subject.uri, AggregateType::Repost).await;
168
168
169
169
copies.push_record(&at_uri, cid);
170
-
copies.reposts.push((at_uri, rec.subject, rec.created_at));
170
+
copies.reposts.push((at_uri, rec.subject, rec.via, rec.created_at));
171
171
}
172
172
RecordTypes::AppBskyGraphBlock(rec) => {
173
173
copies.push_record(&at_uri, cid);
+15
-5
consumer/src/db/copy.rs
+15
-5
consumer/src/db/copy.rs
···
14
14
Type::TEXT,
15
15
Type::TEXT,
16
16
Type::TEXT,
17
+
Type::TEXT,
18
+
Type::TEXT,
17
19
Type::TIMESTAMP,
18
20
];
19
-
type StrongRefRow = (String, records::StrongRef, DateTime<Utc>);
21
+
type StrongRefRow = (String, records::StrongRef, Option<records::StrongRef>, DateTime<Utc>);
20
22
21
23
// SubjectRefs are used in both blocks and follows
22
24
const SUBJECT_TYPES: &[Type] = &[Type::TEXT, Type::TEXT, Type::TEXT, Type::TIMESTAMP];
···
39
41
40
42
let writer = conn
41
43
.copy_in(
42
-
"COPY likes_tmp (at_uri, did, subject, subject_cid, created_at) FROM STDIN (FORMAT binary)",
44
+
"COPY likes_tmp (at_uri, did, subject, subject_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)",
43
45
)
44
46
.await?;
45
47
let writer = BinaryCopyInWriter::new(writer, STRONGREF_TYPES);
···
47
49
pin_mut!(writer);
48
50
49
51
for row in data {
52
+
let (via_uri, via_cid) = strongref_to_parts(row.2.as_ref());
53
+
50
54
let writer = writer.as_mut();
51
55
writer
52
56
.write(&[
···
54
58
&did,
55
59
&row.1.uri,
56
60
&row.1.cid.to_string(),
57
-
&row.2.naive_utc(),
61
+
&via_uri,
62
+
&via_cid,
63
+
&row.3.naive_utc(),
58
64
])
59
65
.await?;
60
66
}
···
82
88
83
89
let writer = conn
84
90
.copy_in(
85
-
"COPY reposts_tmp (at_uri, did, post, post_cid, created_at) FROM STDIN (FORMAT binary)",
91
+
"COPY reposts_tmp (at_uri, did, post, post_cid, via_uri, via_cid, created_at) FROM STDIN (FORMAT binary)",
86
92
)
87
93
.await?;
88
94
let writer = BinaryCopyInWriter::new(writer, STRONGREF_TYPES);
···
90
96
pin_mut!(writer);
91
97
92
98
for row in data {
99
+
let (via_uri, via_cid) = strongref_to_parts(row.2.as_ref());
100
+
93
101
let writer = writer.as_mut();
94
102
writer
95
103
.write(&[
···
97
105
&did,
98
106
&row.1.uri,
99
107
&row.1.cid.to_string(),
100
-
&row.2.naive_utc(),
108
+
&via_uri,
109
+
&via_cid,
110
+
&row.3.naive_utc(),
101
111
])
102
112
.await?;
103
113
}
+9
-3
consumer/src/db/record.rs
+9
-3
consumer/src/db/record.rs
···
156
156
repo: &str,
157
157
rec: AppBskyFeedLike,
158
158
) -> PgExecResult {
159
+
let (via_uri, via_cid) = strongref_to_parts(rec.via.as_ref());
160
+
159
161
conn.execute(
160
-
"INSERT INTO likes (at_uri, did, subject, subject_cid, created_at) VALUES ($1, $2, $3, $4, $5)",
161
-
&[&at_uri, &repo, &rec.subject.uri, &rec.subject.cid.to_string(), &rec.created_at]
162
+
"INSERT INTO likes (at_uri, did, subject, subject_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)",
163
+
&[&at_uri, &repo, &rec.subject.uri, &rec.subject.cid.to_string(), &via_uri, &via_cid, &rec.created_at]
162
164
).await
163
165
}
164
166
···
523
525
repo: &str,
524
526
rec: AppBskyFeedRepost,
525
527
) -> PgExecResult {
528
+
let (via_uri, via_cid) = strongref_to_parts(rec.via.as_ref());
529
+
526
530
conn.execute(
527
-
"INSERT INTO reposts (at_uri, did, post, post_cid, created_at) VALUES ($1, $2, $3, $4, $5)",
531
+
"INSERT INTO reposts (at_uri, did, post, post_cid, via_uri, via_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7)",
528
532
&[
529
533
&at_uri,
530
534
&repo,
531
535
&rec.subject.uri,
532
536
&rec.subject.cid.to_string(),
537
+
&via_uri,
538
+
&via_cid,
533
539
&rec.created_at,
534
540
],
535
541
)
+2
consumer/src/indexer/records.rs
+2
consumer/src/indexer/records.rs
···
206
206
pub struct AppBskyFeedLike {
207
207
pub subject: StrongRef,
208
208
pub created_at: DateTime<Utc>,
209
+
pub via: Option<StrongRef>,
209
210
}
210
211
211
212
#[derive(Debug, Deserialize, Serialize)]
···
260
261
pub struct AppBskyFeedRepost {
261
262
pub subject: StrongRef,
262
263
pub created_at: DateTime<Utc>,
264
+
pub via: Option<StrongRef>,
263
265
}
264
266
265
267
#[derive(Debug, Deserialize, Serialize)]
+2
migrations/2025-06-18-200415_like-repost-via/down.sql
+2
migrations/2025-06-18-200415_like-repost-via/down.sql
+7
migrations/2025-06-18-200415_like-repost-via/up.sql
+7
migrations/2025-06-18-200415_like-repost-via/up.sql
+4
parakeet-db/src/schema.rs
+4
parakeet-db/src/schema.rs
···
125
125
subject_cid -> Text,
126
126
created_at -> Timestamptz,
127
127
indexed_at -> Timestamp,
128
+
via_uri -> Nullable<Text>,
129
+
via_cid -> Nullable<Text>,
128
130
}
129
131
}
130
132
···
283
285
post_cid -> Text,
284
286
created_at -> Timestamptz,
285
287
indexed_at -> Timestamp,
288
+
via_uri -> Nullable<Text>,
289
+
via_cid -> Nullable<Text>,
286
290
}
287
291
}
288
292