# at main 4.5 kB view raw
"""Prefect flow that summarises an AT Protocol account's profile and recent posts.

The flow logs in with credentials taken from ``ATPROTO_``-prefixed settings,
fetches a profile and its author feed, computes simple engagement statistics
and publishes the result as a Prefect markdown artifact.
"""

from datetime import datetime
from typing import Any

from atproto import Client
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from prefect.cache_policies import INPUTS
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

# Maximum number of characters of the most-engaged post kept in the insights.
_EXCERPT_LENGTH = 100


class ATProtoSettings(BaseSettings):
    """Login credentials, loaded from ``.env`` / ``ATPROTO_*`` environment variables."""

    model_config = SettingsConfigDict(env_file=".env", env_prefix="ATPROTO_")

    username: str = Field(..., description="The username to authenticate as")
    password: str = Field(..., description="The password to authenticate with")


@task(name="authenticate-atproto")
def authenticate_client(settings: ATProtoSettings) -> Client:
    """Create a ``Client`` and log it in with the configured username and password."""
    client = Client()
    client.login(settings.username, settings.password)
    return client


# ``client`` is subtracted from the INPUTS cache policy, leaving ``handle`` as
# the only input considered for caching.
@task(name="fetch-profile", cache_policy=INPUTS - "client")
def fetch_user_profile(client: Client, handle: str) -> dict[str, Any]:
    """Fetch the profile for ``handle`` and return the fields used by the report.

    Values are passed through exactly as the SDK returns them.
    NOTE(review): the SDK profile model appears to declare several of these
    fields as optional, so consumers should tolerate ``None`` - confirm.
    """
    profile = client.get_profile(handle)
    return {
        "display_name": profile.display_name,
        "description": profile.description,
        "followers_count": profile.followers_count,
        "follows_count": profile.follows_count,
        "posts_count": profile.posts_count,
        "handle": profile.handle,
        "created_at": profile.created_at,
    }


def _is_repost(post_view: Any) -> bool:
    """Return True when a feed item carries a repost reason.

    The comparison is case-insensitive: the SDK's type string for a repost
    reason is ``app.bsky.feed.defs#reasonRepost`` (capital ``R``), which a
    plain ``"repost" in ...`` check never matches.
    """
    reason = post_view.reason
    if not reason:
        return False
    reason_type = getattr(reason, "py_type", None) or ""
    return "repost" in reason_type.lower()


@task(name="fetch-recent-posts", cache_policy=INPUTS - "client")
def fetch_recent_posts(client: Client, handle: str, limit: int = 5) -> list[dict[str, Any]]:
    """Return the original (non-repost) posts among the latest ``limit`` feed items.

    ``limit`` is applied to the feed request, before reposts are dropped, so
    fewer than ``limit`` posts may be returned. Missing counters become ``0``.
    """
    feed = client.get_author_feed(actor=handle, limit=limit)
    return [
        {
            "text": item.post.record.text,
            "created_at": item.post.record.created_at,
            "reply_count": item.post.reply_count or 0,
            "repost_count": item.post.repost_count or 0,
            "like_count": item.post.like_count or 0,
        }
        for item in feed.feed
        # Skip reposts entirely - only include original content
        if not _is_repost(item)
    ]


def _engagement(post: dict[str, Any]) -> int:
    """Return replies + reposts + likes for one post dict."""
    return post["reply_count"] + post["repost_count"] + post["like_count"]


@task(name="analyze-activity")
def analyze_user_activity(profile: dict[str, Any], posts: list[dict[str, Any]]) -> dict[str, Any]:
    """Compute engagement statistics over ``posts``.

    Returns ``{"insight": ...}`` when ``posts`` is empty; otherwise a dict with
    totals, the per-post average (rounded to 2 decimals), an excerpt of the
    most-engaged post and a one-line profile summary.
    """
    if not posts:
        return {"insight": "No recent posts to analyze"}

    total_engagement = sum(_engagement(post) for post in posts)
    most_engaged_post = max(posts, key=_engagement)

    # Only mark the excerpt with an ellipsis when text was actually cut off.
    text = most_engaged_post["text"]
    excerpt = text if len(text) <= _EXCERPT_LENGTH else text[:_EXCERPT_LENGTH] + "..."

    # Fall back to the handle when no display name is set.
    name = profile["display_name"] or profile.get("handle")

    return {
        "total_posts_analyzed": len(posts),
        "total_engagement": total_engagement,
        "average_engagement_per_post": round(total_engagement / len(posts), 2),
        "most_engaged_post_text": excerpt,
        "profile_summary": f"{name} has {profile['followers_count']} followers and {profile['posts_count']} total posts",
    }


def _format_count(value: int | None) -> str:
    """Format a counter with thousands separators, or ``n/a`` when it is missing."""
    return "n/a" if value is None else f"{value:,}"


def _render_markdown(result: dict[str, Any]) -> str:
    """Build the markdown report for a flow result.

    Handles both shapes produced by ``analyze_user_activity`` (full statistics,
    or the ``{"insight": ...}`` placeholder) and profile fields that are ``None``.
    """
    profile = result["profile"]
    insights = result["insights"]

    lines = [
        f"# ATProto User Insights for @{result['analyzed_user']}",
        "",
        "## Profile Overview",
        f"**{profile['display_name'] or profile['handle']}** (@{profile['handle']})",
        "",
        profile["description"] or "",
        "",
        "### Stats",
        f"- **Followers:** {_format_count(profile['followers_count'])}",
        f"- **Following:** {_format_count(profile['follows_count'])}",
        f"- **Total Posts:** {_format_count(profile['posts_count'])}",
        f"- **Joined:** {(profile['created_at'] or '')[:10] or 'unknown'}",
        "",
        "## Engagement Analysis",
    ]

    if "total_posts_analyzed" in insights:
        lines += [
            f"Based on {insights['total_posts_analyzed']} recent posts:",
            "",
            f"- **Total Engagement:** {insights['total_engagement']} interactions",
            f"- **Average per Post:** {insights['average_engagement_per_post']} interactions",
            "",
            "### Most Engaged Post",
            f"> {insights['most_engaged_post_text']}",
        ]
    else:
        # No original posts were available; show the placeholder message
        # instead of indexing statistics that do not exist.
        lines.append(insights["insight"])

    lines += [
        "",
        "---",
        f"*Analysis completed at {result['timestamp'][:19]}*",
        "",
    ]
    return "\n".join(lines)


@flow(name="atproto-user-insights")
def atproto_user_insights_flow(handle: str | None = None) -> dict[str, Any]:
    """Analyze ``handle`` (default: the authenticated user) and publish a report.

    Returns a dict with the ISO timestamp, the analyzed handle, the profile
    fields and the computed insights; the same data is rendered into a
    markdown artifact.
    """
    settings = ATProtoSettings()
    client = authenticate_client(settings)

    handle = handle or settings.username

    profile = fetch_user_profile(client, handle)
    posts = fetch_recent_posts(client, handle, limit=10)
    insights = analyze_user_activity(profile, posts)

    result = {
        "timestamp": datetime.now().isoformat(),
        "analyzed_user": handle,
        "profile": profile,
        "insights": insights,
    }

    create_markdown_artifact(markdown=_render_markdown(result))

    return result


if __name__ == "__main__":
    atproto_user_insights_flow()