Template of a custom feed generator service for the Bluesky network in Ruby
1require 'json'
2require 'sinatra/activerecord'
3require 'skyfall'
4
5require_relative 'config'
6require_relative 'models/feed_post'
7require_relative 'models/post'
8require_relative 'models/subscription'
9
# Connects to a relay's firehose through Skyfall, runs every created post
# through the feeds returned by BlueFactory and stores posts / feed entries
# in the database. The last processed cursor is persisted per service in the
# subscriptions table so that the stream can be resumed later.
class FirehoseStream
  attr_accessor :show_progress, :log_status, :log_posts, :save_posts, :replay_events

  DEFAULT_RELAY = 'bsky.network'

  # posts whose createdAt is more than this many seconds before the commit time are skipped
  MAX_POST_AGE = 86_400

  # the cursor is written to the database whenever the sequence number is a multiple of this
  CURSOR_SAVE_INTERVAL = 10

  # service - hostname of the relay to connect to (DEFAULT_RELAY when nil)
  #
  # The defaults of the option accessors depend on APP_ENV / RACK_ENV:
  # in development everything is saved and progress dots are printed,
  # in other environments only matching posts are saved and missed events are replayed.
  def initialize(service = nil)
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = service || DEFAULT_RELAY

    development = (@env == :development)

    @show_progress = development
    @log_status = true
    @log_posts = development ? :matching : false
    @save_posts = development ? :all : :matching
    @replay_events = !development

    @feeds = BlueFactory.all_feeds.select(&:is_updating?)
  end

  # Creates the Skyfall stream, registers the callbacks and connects.
  # Does nothing if a stream has already been started.
  def start
    return if @sky

    last_cursor = load_or_init_cursor
    cursor = @replay_events ? last_cursor : nil

    # options are captured at start time, like the callbacks themselves
    logging = @log_status

    # keep a local reference for the disconnect callback: #stop sets @sky to nil
    # right after calling disconnect, so the callback can't rely on the ivar
    @sky = sky = Skyfall::Stream.new(@service, :subscribe_repos, cursor)

    @sky.on_message { |m| process_message(m) }

    # the cursor has to be persisted on disconnect regardless of whether status
    # logging is enabled; a nil cursor is skipped so that it doesn't wipe the
    # previously saved one
    @sky.on_disconnect do
      if logging
        puts
        puts "Disconnected #{Time.now}"
      end

      save_cursor(sky.cursor) if sky.cursor
    end

    if logging
      @sky.on_connecting { |u| puts "Connecting to #{u}..." }
      @sky.on_connect do
        @replaying = !!cursor
        puts "Connected #{Time.now} ✓"
      end
      @sky.on_reconnect { puts "Connection lost, reconnecting..." }
      @sky.on_error { |e| puts "ERROR: #{Time.now} #{e.class} #{e.message}" }
    end

    @sky.connect
  end

  # Disconnects the current stream (if any) and forgets it, so that #start can be called again.
  def stop
    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor saved for this service. If there is no subscription record
  # for the service yet, creates one with cursor 0 and returns nil.
  def load_or_init_cursor
    if (sub = Subscription.find_by(service: @service))
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Stores the given cursor in the subscription record(s) of this service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Handles a single firehose message: logs info/unknown messages, and for commit
  # messages periodically saves the cursor and dispatches each operation by its type.
  def process_message(msg)
    case msg
    when Skyfall::InfoMessage
      # AtProto error, the only one right now is "OutdatedCursor"
      puts "InfoMessage: #{msg}"
    when Skyfall::HandleMessage
      # use these events if you want to track handle changes:
      # puts "Handle change: #{msg.repo} => #{msg.handle}"
    when Skyfall::UnknownMessage
      puts "Unknown message type: #{msg.type} (#{msg.seq})"
    end

    return unless msg.type == :commit

    # printed once, on the first commit received after connecting with a cursor
    if @replaying
      puts "Replaying events since #{msg.time.getlocal} -->"
      @replaying = false
    end

    save_cursor(msg.seq) if (msg.seq % CURSOR_SAVE_INTERVAL).zero?

    msg.operations.each do |op|
      case op.type
      when :bsky_post
        process_post(msg, op)

      when :bsky_like, :bsky_repost
        # if you want to use the number of likes and/or reposts for filtering or sorting:
        # add a likes/reposts column to feeds, then do +1 / -1 here depending on op.action

      when :bsky_follow
        # if you want to make a personalized feed that needs info about given user's follows/followers:
        # add a followers table, then add/remove records here depending on op.action

      else
        # other types like :bsky_block, :bsky_profile (includes profile edits)
      end
    end
  end

  # Handles a post operation from a commit message.
  #
  # A :delete removes the matching saved post (if there is one). A :create builds
  # a Post, checks it against every feed and - depending on save_posts - saves the
  # post and/or its feed entries. Other actions are ignored. Any StandardError
  # raised along the way is logged and not propagated.
  def process_post(msg, op)
    if op.action == :delete
      Post.find_by(repo: op.repo, rkey: op.rkey)&.destroy
      return
    end

    return unless op.action == :create

    # decoding the record may fail on malformed data, in which case the post is skipped
    begin
      record = op.raw_record
    rescue CBOR::UnpackError => e
      puts "Error: couldn't decode record data for #{op.uri} (#{msg.seq}): #{e}"
      return
    end

    if record.nil?
      puts "Error: missing expected record data in operation: #{op.uri} (#{msg.seq})"
      return
    end

    # ignore posts with past date from Twitter etc. imported using some kind of tool
    begin
      post_time = Time.parse(record['createdAt'])
      return if post_time < msg.time - MAX_POST_AGE
    rescue StandardError
      puts "Skipping post with invalid timestamp: #{record['createdAt'].inspect} (#{op.repo}, #{msg.seq})"
      return
    end

    text = record['text']

    # to save space, delete redundant post text and type from the saved data JSON
    trimmed_record = record.dup
    trimmed_record.delete('$type')
    trimmed_record.delete('text')
    trimmed_json = JSON.generate(trimmed_record)

    # tip: if you don't need full record data for debugging, delete the data column in posts
    post = Post.new(
      repo: op.repo,
      time: msg.time,
      text: text,
      rkey: op.rkey,
      data: trimmed_json,
      record: record
    )

    matched = false

    @feeds.each do |feed|
      next unless feed.post_matches?(post)

      FeedPost.create!(feed_id: feed.feed_id, post: post, time: msg.time) if @save_posts
      matched = true
    end

    if @log_posts == :all || (@log_posts && matched)
      puts
      puts text
    end

    post.save! if @save_posts == :all

    # progress dots would only clutter the output when every post's text is printed
    print '.' if @show_progress && @log_posts != :all
  rescue StandardError => e
    puts "Error in #process_post: #{e}"

    # this particular error is only reported with the one-line message above
    unless e.message == "nesting of 100 is too deep"
      p msg
      puts e.backtrace.reject { |x| x.include?('/ruby/') }
    end
  end

  # Compact description of the object that leaves out the list of feeds.
  def inspect
    vars = instance_variables - [:@feeds, :@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end
end