+49
-1
main.py
+49
-1
main.py
···
1
-
.
1
+
from collections import Counter
2
+
3
+
import requests
4
+
from atproto import AtUri, Client
5
+
from atproto_identity.did.resolver import DidResolver
6
+
7
+
client = Client(base_url="https://bsky.social")
8
+
resolver = DidResolver(plc_url="https://plc.directory")
9
+
10
+
11
+
def getRecs(uri: AtUri, n=100):
12
+
# get the users that liked that item
13
+
# go get the n most recent liked items for each of those users
14
+
# return the most common items from that list
15
+
16
+
response = requests.get(
17
+
f"https://constellation.microcosm.blue/links/distinct-dids?target={uri}&collection=app.bsky.feed.like&path=.subject.uri&limit={n}", headers={'Accept': 'application/json'})
18
+
19
+
if response.status_code == 200:
20
+
data = response.json()
21
+
likers = data['linking_dids']
22
+
23
+
records = []
24
+
for user in likers:
25
+
didDoc = resolver.resolve(user)
26
+
client.update_base_url(didDoc.service[0].service_endpoint)
27
+
likes = client.com.atproto.repo.list_records(params={"collection": "app.bsky.feed.like",
28
+
"repo": user,
29
+
"limit": n})
30
+
for record in likes.records:
31
+
records.append(record.value.subject.uri)
32
+
33
+
c = Counter([r for r in records if r != str(uri)])
34
+
for item, count in c.most_common(10):
35
+
print(
36
+
f"https://bsky.app/profile/{item.split('at://')[1].split('/')[0] + '/post/' + item.split('/')[-1]}: {count}")
37
+
else:
38
+
print("Error fetching data:", response.status_code)
39
+
40
+
41
+
def fromPost(post: str) -> AtUri:
42
+
handle, rkey = post.split("/post/")
43
+
handle = handle.split("https://bsky.app/profile/")[1]
44
+
did = client.com.atproto.identity.resolve_handle(
45
+
params={"handle": handle})
46
+
return AtUri.from_str(f"at://{did.did}/app.bsky.feed.post/{rkey}")
47
+
48
+
49
+
getRecs(fromPost("https://bsky.app/profile/beverage2000.bsky.social/post/3lzpn5gewbc2e"))