# Template of a custom feed generator service for the Bluesky network, written in Ruby
require 'json'
require 'time'

require 'sinatra/activerecord'
require 'skyfall'

require_relative 'config'
require_relative 'models/feed_post'
require_relative 'models/post'
require_relative 'models/subscription'

# Consumes the relay firehose through Skyfall, stores the last processed
# sequence number per relay in Subscription, and records posts that match any
# of the updating feeds returned by BlueFactory.all_feeds.
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_RELAY = 'bsky.network'

  # Posts whose createdAt is more than this many seconds before the event time
  # are skipped (e.g. old posts imported from Twitter etc. with some tool).
  MAX_POST_AGE = 86_400

  # @param service [String, nil] relay host to subscribe to; DEFAULT_RELAY when nil
  #
  # Defaults depend on APP_ENV / RACK_ENV (falling back to :development):
  # development shows progress dots, logs matching posts, saves all posts and
  # connects without a cursor; other environments save only matching posts and
  # resume from the stored cursor.
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_RELAY

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development
    @replaying = false

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Opens the firehose subscription; does nothing if a connection object
  # already exists. When replay_events is set, passes the stored cursor.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    # the local `sky` is captured by on_disconnect so the cursor can still be
    # read after #stop has cleared @sky
    @sky = sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)

    # set your user agent here to identify yourself on the relay
    # @sky.user_agent = "My Feed Server (@my.handle) #{@sky.version_string}"

    @sky.check_heartbeat = true

    @sky.on_message { |m| process_message(m) }

    # These two handlers do bookkeeping (replay flag, cursor persistence), so
    # they are registered even when status logging is turned off.
    @sky.on_connect do
      @replaying = !cursor.nil?
      log "Connected ✓" if @log_status
    end

    @sky.on_disconnect do
      log "Disconnected." if @log_status
      save_cursor(sky.cursor)
    end

    if @log_status
      @sky.on_connecting { |u| log "Connecting to #{u}..." }
      @sky.on_timeout { log "Trying to reconnect..." }
      @sky.on_reconnect { log "Connection lost, reconnecting..." }
      @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects (the on_disconnect handler saves the cursor) and forgets the
  # connection so #start can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the stored cursor for this relay, or nil after creating a new
  # Subscription row (cursor 0) when none exists yet.
  def load_or_init_cursor
    sub = Subscription.find_by(service: @service)
    return sub.cursor if sub

    Subscription.create!(service: @service, cursor: 0)
    nil
  end

  # Persists the given sequence number for this relay.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches one firehose message by type; commits go to #process_commit.
  def process_message(msg)
    case msg.type
    when :info
      # AtProto error, the only one right now is "OutdatedCursor"
      log "InfoMessage: #{msg}"
    when :identity
      # use these events if you want to track handle changes:
      # log "Handle change: #{msg.repo} => #{msg.handle}"
    when :account
      # tracking account status changes, e.g. suspensions, deactivations and deletes
      process_account_message(msg)
    when :commit
      process_commit(msg)
    else
      log "Unknown message type: #{msg.type} (#{msg.seq})" if msg.is_a?(Skyfall::Firehose::UnknownMessage)
    end
  end

  # Removes stored posts (and their feed links) of an account whose status is :deleted.
  def process_account_message(msg)
    return unless msg.status == :deleted

    # delete all data we have stored about this account
    FeedPost.joins(:post).where(post: { repo: msg.did }).delete_all
    Post.where(repo: msg.did).delete_all
  end

  # Handles one post operation: on :delete removes the stored post; on :create
  # validates the record, skips stale posts, links the post to every matching
  # feed and optionally logs/saves it. Errors are logged rather than raised.
  def process_post(msg, op)
    if op.action == :delete
      Post.find_by_repo_rkey(op.repo, op.rkey)&.destroy
      return
    end

    return unless op.action == :create

    begin
      if op.raw_record.nil?
        log "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
        return
      end
    rescue CBOR::UnpackError => e
      log "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(op.raw_record['createdAt'])
      return if post_time < msg.time - MAX_POST_AGE
    rescue StandardError
      log "Skipping post with invalid timestamp: #{op.raw_record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = op.raw_record['text']

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_record_json(op.raw_record),
      record: op.raw_record
    )

    matched = add_to_matching_feeds(post, msg.time)

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    post.save! if @save_posts == :all

    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    log "Error in #process_post: #{e}"

    # skip the detailed dump for this particular JSON nesting error
    unless e.message == "nesting of 100 is too deep"
      log msg.inspect
      log Array(e.backtrace).reject { |x| x.include?('/ruby/') }.join("\n")
    end
  end

  # Prints a timestamped line, first breaking the line of progress dots if enabled.
  def log(text)
    puts if @show_progress
    puts "[#{Time.now}] #{text}"
  end

  # Compact inspect that leaves out the (potentially large) feed list.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end

  private

  # Handles one commit: announces the first replayed event, saves the cursor
  # whenever seq is divisible by 10, and routes each operation by type.
  def process_commit(msg)
    if @replaying
      log "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    save_cursor(msg.seq) if (msg.seq % 10).zero?

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)
      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action
      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action
      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # to save space, delete redundant post text and type from the saved data JSON
  def trimmed_record_json(record)
    trimmed = record.dup
    trimmed.delete('$type')
    trimmed.delete('text')
    JSON.generate(trimmed)
  end

  # Creates a FeedPost (only when save_posts is truthy) for every feed whose
  # post_matches? accepts the post. Returns true if at least one feed matched.
  def add_to_matching_feeds(post, time)
    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      FeedPost.create!(feed_id: feed.feed_id, post: post, time: time) if @save_posts
      matched = true
    end

    matched
  end
end