how to use prefect
1from datetime import datetime
2from typing import Any
3
4from atproto import Client
5from prefect import flow, task
6from prefect.artifacts import create_markdown_artifact
7from prefect.cache_policies import INPUTS
8from pydantic import Field
9from pydantic_settings import BaseSettings, SettingsConfigDict
10
11
12class ATProtoSettings(BaseSettings):
13 model_config = SettingsConfigDict(env_file=".env", env_prefix="ATPROTO_")
14
15 username: str = Field(..., description="The username to authenticate as")
16 password: str = Field(..., description="The password to authenticate with")
17
18
19@task(name="authenticate-atproto")
20def authenticate_client(settings: ATProtoSettings) -> Client:
21 client = Client()
22 client.login(settings.username, settings.password)
23 return client
24
25
26@task(name="fetch-profile", cache_policy=INPUTS - "client")
27def fetch_user_profile(client: Client, handle: str) -> dict[str, Any]:
28 profile = client.get_profile(handle)
29 return {
30 "display_name": profile.display_name,
31 "description": profile.description,
32 "followers_count": profile.followers_count,
33 "follows_count": profile.follows_count,
34 "posts_count": profile.posts_count,
35 "handle": profile.handle,
36 "created_at": profile.created_at,
37 }
38
39
40@task(name="fetch-recent-posts", cache_policy=INPUTS - "client")
41def fetch_recent_posts(client: Client, handle: str, limit: int = 5) -> list[dict[str, Any]]:
42 posts = client.get_author_feed(actor=handle, limit=limit)
43 original_posts = []
44
45 for post_view in posts.feed:
46 # Skip reposts entirely - only include original content
47 if post_view.reason and hasattr(post_view.reason, "py_type") and "repost" in post_view.reason.py_type:
48 continue
49
50 original_posts.append(
51 {
52 "text": post_view.post.record.text,
53 "created_at": post_view.post.record.created_at,
54 "reply_count": post_view.post.reply_count or 0,
55 "repost_count": post_view.post.repost_count or 0,
56 "like_count": post_view.post.like_count or 0,
57 }
58 )
59
60 return original_posts
61
62
63@task(name="analyze-activity")
64def analyze_user_activity(profile: dict[str, Any], posts: list[dict[str, Any]]) -> dict[str, Any]:
65 if not posts:
66 return {"insight": "No recent posts to analyze"}
67
68 total_engagement = sum(post["reply_count"] + post["repost_count"] + post["like_count"] for post in posts)
69
70 most_engaged_post = max(posts, key=lambda p: p["reply_count"] + p["repost_count"] + p["like_count"])
71
72 return {
73 "total_posts_analyzed": len(posts),
74 "total_engagement": total_engagement,
75 "average_engagement_per_post": round(total_engagement / len(posts), 2),
76 "most_engaged_post_text": most_engaged_post["text"][:100] + "...",
77 "profile_summary": f"{profile['display_name']} has {profile['followers_count']} followers and {profile['posts_count']} total posts",
78 }
79
80
81@flow(name="atproto-user-insights")
82def atproto_user_insights_flow(handle: str | None = None) -> dict[str, Any]:
83 settings = ATProtoSettings()
84 client = authenticate_client(settings)
85
86 handle = handle or settings.username
87
88 profile = fetch_user_profile(client, handle)
89 posts = fetch_recent_posts(client, handle, limit=10)
90 insights = analyze_user_activity(profile, posts)
91
92 result = {
93 "timestamp": datetime.now().isoformat(),
94 "analyzed_user": handle,
95 "profile": profile,
96 "insights": insights,
97 }
98
99 create_markdown_artifact(
100 markdown=f"""# ATProto User Insights for @{handle}
101
102## Profile Overview
103**{result["profile"]["display_name"]}** (@{result["profile"]["handle"]})
104
105{result["profile"]["description"]}
106
107### Stats
108- **Followers:** {result["profile"]["followers_count"]:,}
109- **Following:** {result["profile"]["follows_count"]:,}
110- **Total Posts:** {result["profile"]["posts_count"]:,}
111- **Joined:** {result["profile"]["created_at"][:10]}
112
113## Engagement Analysis
114Based on {result["insights"]["total_posts_analyzed"]} recent posts:
115
116- **Total Engagement:** {result["insights"]["total_engagement"]} interactions
117- **Average per Post:** {result["insights"]["average_engagement_per_post"]} interactions
118
119### Most Engaged Post
120> {result["insights"]["most_engaged_post_text"]}
121
122---
123*Analysis completed at {result["timestamp"][:19]}*
124"""
125 )
126
127 return result
128
129
130if __name__ == "__main__":
131 atproto_user_insights_flow()