recommendation algorithm tests
at main 2.8 kB view raw
1import argparse 2import asyncio 3from collections import Counter 4from typing import List 5 6from atproto import AtUri, Client, DidDocument 7from atproto_identity.did.resolver import DidResolver 8from microcosmClient import AsyncMicrocosmClient 9 10client = Client(base_url="https://bsky.social") 11resolver = DidResolver(plc_url="https://plc.directory") 12 13 14async def getRecs(uri: AtUri, n: int = 100, concurrency: int = 10) -> List[tuple]: 15 """Return top liked subject URIs as (uri, count) tuples for likers of the given target. 16 17 - Uses Microcosm to find distinct likers of `uri`. 18 - For each liker (repo), resolves the DID and calls the blocking AtProto repo.list_records in a thread. 19 """ 20 async with AsyncMicrocosmClient() as micro: 21 data = await micro.links_distinct_dids(uri, "app.bsky.feed.like", ".subject.uri") 22 23 likers = data.get("linking_dids", []) 24 if not likers: 25 return [] 26 27 semaphore = asyncio.Semaphore(concurrency) 28 29 async def fetch(repo: str): 30 async with semaphore: 31 try: 32 did: DidDocument | None = await asyncio.to_thread(resolver.resolve, repo) 33 if not did or not did.service: 34 return [] 35 await asyncio.to_thread(client.update_base_url, did.service[0].service_endpoint) 36 likes = await asyncio.to_thread(lambda: client.com.atproto.repo.list_records(params={"collection": "app.bsky.feed.like", "repo": repo, "limit": n})) 37 return [r["value"]["subject"]["uri"] for r in likes["records"]] 38 except Exception: 39 return [] 40 41 tasks = [asyncio.create_task(fetch(u)) for u in likers] 42 records: List[str] = [] 43 for t in asyncio.as_completed(tasks): 44 records.extend(await t) 45 46 records = [r for r in records if r and r != str(uri)] 47 c = Counter(records) 48 return c.most_common(5) 49 50 51def fromPost(post: str) -> AtUri: 52 handle, rkey = post.split("/post/") 53 handle = handle.split("https://bsky.app/profile/")[1] 54 did = client.com.atproto.identity.resolve_handle(params={"handle": handle}) 55 return AtUri.from_str(f"at://{did.did}/app.bsky.feed.post/{rkey}") 56 57 58def toPost(uri) -> str: 59 did = uri.split('//')[1].split('/')[0] 60 rkey = uri.split('.post/')[1] 61 return f"https://bsky.app/profile/{did}/post/{rkey}" 62 63 64async def main(post: str): 65 uri = fromPost(post) 66 recs = await getRecs(uri) 67 if not recs: 68 print("No recommendations found.") 69 return 70 for item, count in recs: 71 print(f"{toPost(item)}: {count}") 72 73 74if __name__ == "__main__": 75 parser = argparse.ArgumentParser() 76 parser.add_argument( 77 "post", nargs="?", default="https://bsky.app/profile/beverage2000.bsky.social/post/3lzpn5gewbc2e") 78 args = parser.parse_args() 79 asyncio.run(main(args.post))