Don't forget to lycansubscribe
1require 'skyfall'
2
3require_relative 'init'
4require_relative 'models/subscription'
5
# Streams the `:subscribe_repos` firehose of a relay through Skyfall and
# stores the stream position in the Subscription model, so that a later run
# can start from the stored cursor.
class FirehoseClient
  # start_cursor - optional cursor which takes precedence over the stored one
  #                when #start is called.
  # service      - relay hostname handed to Skyfall (DEFAULT_RELAY by default).
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  # A gap between updates longer than this many seconds is reported in the log.
  STALL_THRESHOLD = 30

  # Interval (in seconds) passed to the periodic timer that checks for stalls.
  STALL_CHECK_INTERVAL = 20

  # The cursor is written to the database when seq is a multiple of this value.
  CURSOR_SAVE_INTERVAL = 1000

  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
    @service = DEFAULT_RELAY
  end

  # Creates the Skyfall connection, registers all event handlers and connects.
  # Does nothing when a connection object already exists.
  def start
    return if @sky

    log "Starting firehose process (YJIT = #{yjit_status})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
    @sky.check_heartbeat = true

    @sky.on_message do |m|
      start_time = Time.now
      diff = start_time - @last_update

      if diff > STALL_THRESHOLD
        log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
      end

      @last_update = start_time
      process_message(m)
    end

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect do
      log "Connected ✓"

      # The first commit seen after (re)connecting logs a "Replaying" line.
      @replaying = true
      @last_update = Time.now

      # Only one stall-check timer is kept alive at a time.
      @timer ||= EM::PeriodicTimer.new(STALL_CHECK_INTERVAL) do
        diff = Time.now - @last_update
        log "Timer: last update #{sprintf('%.1f', diff)}s ago" if diff > STALL_THRESHOLD
      end
    end

    @sky.on_disconnect { log "Disconnected." }

    @sky.on_reconnect do
      log "Connection lost, reconnecting..."
      stop_timer
    end

    @sky.on_timeout { log "Trying to reconnect..." }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @sky.connect
  end

  # Saves the current cursor, disconnects and drops the connection object.
  # The stall-check timer is cancelled as well, so that it neither keeps
  # logging after the client has stopped nor gets reused by a later #start.
  def stop
    save_cursor(@sky.cursor) unless @sky.nil?

    stop_timer

    @sky&.disconnect
    @sky = nil
  end

  # Returns the cursor stored for the current service. When no Subscription
  # row exists yet, one is created with cursor 0 and nil is returned.
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  # Writes the given cursor to every Subscription row of the current service.
  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Dispatches a single firehose message by its type and periodically
  # persists the stream position.
  def process_message(msg)
    # A message may arrive without a sequence number; such a message is still
    # dispatched below, it just never triggers a cursor save.
    # NOTE(review): presumably this is the case for :info messages - confirm
    # against Skyfall.
    seq = msg.seq
    save_cursor(seq) if seq && (seq % CURSOR_SAVE_INTERVAL).zero?

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      msg.operations.each do |op|
        case op.type
        when :bsky_post
          # ...
        end
      end
    end
  end

  # Handles an :account message; only the :deleted status is singled out.
  def process_account_event(msg)
    if msg.status == :deleted
      # ...
    end
  end

  # Prints a timestamped line to standard output.
  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Debug representation listing all instance variables except the timer.
  # The object id is printed in hexadecimal to match the "0x" prefix.
  def inspect
    vars = instance_variables - [:@timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id.to_s(16)} #{values}>"
  end

  private

  # Cancels the stall-check timer, if there is one, and forgets it.
  def stop_timer
    @timer&.cancel
    @timer = nil
  end

  # Returns 'on' or 'off'. RubyVM::YJIT is not defined on every Ruby build,
  # so its presence is checked before asking whether it is enabled.
  def yjit_status
    enabled = defined?(RubyVM::YJIT) && RubyVM::YJIT.enabled?
    enabled ? 'on' : 'off'
  end
end