+1
.gitignore
+1
.gitignore
···
1
+
__pycache__/
+59
-29
main.py
+59
-29
main.py
···
1
+
import argparse
2
+
import asyncio
1
3
from collections import Counter
4
+
from typing import List
2
5
3
-
import requests
4
-
from atproto import AtUri, Client
6
+
from atproto import AtUri, Client, DidDocument
5
7
from atproto_identity.did.resolver import DidResolver
8
+
from microcosmClient import AsyncMicrocosmClient
6
9
7
10
client = Client(base_url="https://bsky.social")
8
11
resolver = DidResolver(plc_url="https://plc.directory")
9
12
10
13
11
-
def getRecs(uri: AtUri, n=100):
12
-
# get the users that liked that item
13
-
# go get the n most recent liked items for each of those users
14
-
# return the most common items from that list
14
+
async def getRecs(uri: AtUri, n: int = 100, concurrency: int = 10) -> List[tuple]:
15
+
"""Return top liked subject URIs as (uri, count) tuples for likers of the given target.
15
16
16
-
response = requests.get(
17
-
f"https://constellation.microcosm.blue/links/distinct-dids?target={uri}&collection=app.bsky.feed.like&path=.subject.uri&limit={n}", headers={'Accept': 'application/json'})
17
+
- Uses Microcosm to find distinct likers of `uri`.
18
+
- For each liker (repo), resolves the DID and calls the blocking AtProto repo.list_records in a thread.
19
+
"""
20
+
async with AsyncMicrocosmClient() as micro:
21
+
data = await micro.links_distinct_dids(uri, "app.bsky.feed.like", ".subject.uri")
18
22
19
-
if response.status_code == 200:
20
-
data = response.json()
21
-
likers = data['linking_dids']
23
+
likers = data.get("linking_dids", [])
24
+
if not likers:
25
+
return []
22
26
23
-
records = []
24
-
for user in likers:
25
-
didDoc = resolver.resolve(user)
26
-
client.update_base_url(didDoc.service[0].service_endpoint)
27
-
likes = client.com.atproto.repo.list_records(params={"collection": "app.bsky.feed.like",
28
-
"repo": user,
29
-
"limit": n})
30
-
for record in likes.records:
31
-
records.append(record.value.subject.uri)
27
+
semaphore = asyncio.Semaphore(concurrency)
32
28
33
-
c = Counter([r for r in records if r != str(uri)])
34
-
for item, count in c.most_common(10):
35
-
print(
36
-
f"https://bsky.app/profile/{item.split('at://')[1].split('/')[0] + '/post/' + item.split('/')[-1]}: {count}")
37
-
else:
38
-
print("Error fetching data:", response.status_code)
29
+
async def fetch(repo: str):
30
+
async with semaphore:
31
+
try:
32
+
did: DidDocument | None = await asyncio.to_thread(resolver.resolve, repo)
33
+
if not did or not did.service:
34
+
return []
35
+
await asyncio.to_thread(client.update_base_url, did.service[0].service_endpoint)
36
+
likes = await asyncio.to_thread(lambda: client.com.atproto.repo.list_records(params={"collection": "app.bsky.feed.like", "repo": repo, "limit": n}))
37
+
return [r["value"]["subject"]["uri"] for r in likes["records"]]
38
+
except Exception:
39
+
return []
40
+
41
+
tasks = [asyncio.create_task(fetch(u)) for u in likers]
42
+
records: List[str] = []
43
+
for t in asyncio.as_completed(tasks):
44
+
records.extend(await t)
45
+
46
+
records = [r for r in records if r and r != str(uri)]
47
+
c = Counter(records)
48
+
return c.most_common(5)
39
49
40
50
41
51
def fromPost(post: str) -> AtUri:
42
52
handle, rkey = post.split("/post/")
43
53
handle = handle.split("https://bsky.app/profile/")[1]
44
-
did = client.com.atproto.identity.resolve_handle(
45
-
params={"handle": handle})
54
+
did = client.com.atproto.identity.resolve_handle(params={"handle": handle})
46
55
return AtUri.from_str(f"at://{did.did}/app.bsky.feed.post/{rkey}")
47
56
48
57
49
-
getRecs(fromPost("https://bsky.app/profile/beverage2000.bsky.social/post/3lzpn5gewbc2e"))
58
+
def toPost(uri) -> str:
59
+
did = uri.split('//')[1].split('/')[0]
60
+
rkey = uri.split('.post/')[1]
61
+
return f"https://bsky.app/profile/{did}/post/{rkey}"
62
+
63
+
64
+
async def main(post: str):
65
+
uri = fromPost(post)
66
+
recs = await getRecs(uri)
67
+
if not recs:
68
+
print("No recommendations found.")
69
+
return
70
+
for item, count in recs:
71
+
print(f"{toPost(item)}: {count}")
72
+
73
+
74
+
if __name__ == "__main__":
75
+
parser = argparse.ArgumentParser()
76
+
parser.add_argument(
77
+
"post", nargs="?", default="https://bsky.app/profile/beverage2000.bsky.social/post/3lzpn5gewbc2e")
78
+
args = parser.parse_args()
79
+
asyncio.run(main(args.post))