Template of a custom feed generator service for the Bluesky network in Ruby
1require 'json'
2require 'sinatra/activerecord'
3require 'skyfall'
4
5require_relative 'config'
6require_relative 'models/feed_post'
7require_relative 'models/post'
8require_relative 'models/subscription'
9
# Consumes the Bluesky firehose through a Skyfall stream, checks every newly
# created post against the feeds returned by BlueFactory and stores matching
# posts / feed entries through the Post, FeedPost and Subscription models.
#
# Options (defaults depend on APP_ENV / RACK_ENV, see #initialize):
#   show_progress - print a "." after each handled new post (unless log_posts is :all)
#   log_status    - print connection status changes and stream errors
#   log_posts     - :all prints every new post's text, any other truthy value
#                   prints only posts that matched a feed, false prints nothing
#   save_posts    - any truthy value creates a FeedPost for every match,
#                   :all additionally saves every post
#   replay_events - when true, #start passes the saved cursor to the stream
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  # Picks the environment from APP_ENV, then RACK_ENV, falling back to
  # :development, and derives the default options from it.
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = 'bsky.social'

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    # only feeds that report themselves as updating are matched against posts
    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Creates the Skyfall stream, registers the callbacks and connects.
  # Does nothing if a stream has already been created and not stopped.
  def start
    return if @sky

    # Called even when replay is disabled: it also creates the subscription
    # row that save_cursor later updates.
    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    @sky = Skyfall::Stream.new(@service, :subscribe_repos, cursor)
    @sky.on_message { |m| handle_message(m) }

    if @log_status
      @sky.on_connecting { |u| puts "Connecting to #{u}..." }
      @sky.on_connect do
        # makes handle_message print a one-time notice on the first commit
        @replaying = !!cursor
        puts "Connected #{Time.now} ✓"
      end
      @sky.on_disconnect { puts; puts "Disconnected #{Time.now}" }
      @sky.on_reconnect { puts "Connection lost, reconnecting..." }
      @sky.on_error { |e| puts "ERROR: #{Time.now} #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects the stream (if there is one) and forgets it, so that #start
  # can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the service, or nil when there is no usable
  # position yet. When no subscription row exists, one is created with the
  # placeholder cursor 0.
  def load_or_init_cursor
    sub = Subscription.find_by(service: @service)

    if sub
      # 0 is the placeholder written below, not a real position: treat it as
      # "no cursor" so a restart behaves like the first run instead of
      # handing 0 to the stream as a resume point.
      # NOTE(review): a 0 cursor presumably asks the relay for its whole
      # retained backlog - confirm against the firehose documentation.
      stored = sub.cursor
      stored.to_i.positive? ? stored : nil
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores the given cursor on the subscription row(s) of the service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles one message from the stream: logs info / unknown messages and
  # dispatches the operations of :commit messages by record type.
  def handle_message(msg)
    case msg
    when Skyfall::InfoMessage
      # AtProto error, the only one right now is "OutdatedCursor"
      puts "InfoMessage: #{msg}"
    when Skyfall::HandleMessage
      # use these events if you want to track handle changes:
      # puts "Handle change: #{msg.repo} => #{msg.handle}"
    when Skyfall::UnknownMessage
      puts "Unknown message type: #{msg.type}"
    end

    return unless msg.type == :commit

    if @replaying
      puts "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    # persist the position only on every 10th sequence number
    save_cursor(msg.seq) if msg.seq % 10 == 0

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles a single post operation from a commit message:
  # - :delete destroys the stored post with the same repo and rkey, if any
  # - :create builds a Post, matches it against the feeds and saves / logs it
  #   according to save_posts / log_posts
  # Other actions are ignored. Errors raised while handling a created post are
  # printed and not re-raised.
  def process_post(msg, op)
    if op.action == :delete
      Post.find_by(repo: op.repo, rkey: op.rkey)&.destroy
    end

    return unless op.action == :create

    begin
      text = op.raw_record['text']

      # tip: if you don't need full record data for debugging, delete the data column in posts
      post = Post.new(
        repo: op.repo,
        time: msg.time,
        text: text,
        rkey: op.rkey,
        data: JSON.generate(op.raw_record),
        record: op.raw_record
      )

      matched = false

      @feeds.each do |feed|
        next unless feed.post_matches?(post)

        FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) if @save_posts
        matched = true
      end

      if @log_posts == :all || (@log_posts && matched)
        puts
        puts text
      end

      post.save! if @save_posts == :all
    rescue StandardError => e
      puts "Error: #{e}"
      # dump the message for debugging, except in production and for the
      # known "nesting too deep" error
      p msg unless @env == :production || e.message == "nesting of 100 is too deep"
    end

    print '.' if @show_progress && @log_posts != :all
  end
end