# A tool for adding all Bluesky users with a matching handle to a user list.
require 'didkit'
require 'fileutils'
require 'json'
require 'minisky'
require 'set'
require 'skyfall/jetstream'
require 'skyfall/version'
require 'time'
require 'yaml'

# Watches a Jetstream stream for identity (handle) events and adds every
# account whose handle matches one of the configured patterns to a Bluesky
# user list owned by the authenticated account.
#
# The Jetstream cursor and the known list members are persisted to a JSON
# data file, so a restart can resume without re-fetching the whole list.
class Sync
  # Raised when the config or auth file is missing or malformed.
  class ConfigError < StandardError
  end

  # Loads configuration, saved state and credentials, and determines the
  # current list membership (from the data file, or from the PDS if the data
  # file has no member list yet).
  #
  # All paths are resolved relative to the parent directory of this file.
  #
  # @param config_file [String] YAML file with 'jetstream_host',
  #   'handle_patterns' and 'list_key'
  # @param data_file [String] JSON file holding the saved cursor and members
  # @param auth_file [String] YAML auth file (also handed to Minisky)
  # @raise [ConfigError] if the config or auth file is missing or invalid
  def initialize(config_file: 'config/config.yml', data_file: 'data/data.json', auth_file: 'config/auth.yml')
    log "Initializing..."

    @config = load_config(config_file)
    @data = load_data(data_file)
    @data_file = data_file
    @sky = init_minisky(auth_file)

    @handle_regexps = @config['handle_patterns'].map { |x| regexp_from_pattern(x) }
    @list_uri = "at://#{@sky.user.did}/app.bsky.graph.list/#{@config['list_key']}"

    # Trust the cached member list when present; otherwise ask the PDS.
    @members = @data['list_members'] ? Set.new(@data['list_members']) : fetch_list_members
  end

  # Connects to Jetstream, resuming from the saved cursor if there is one,
  # and handles identity events that carry a handle.
  #
  # Only identity events matter here, so the subscription asks for the
  # collection 'app.bsky.none' (presumably one that never has records, to
  # suppress commit events - TODO confirm against Jetstream docs).
  def start
    @jetstream = Skyfall::Jetstream.new(@config['jetstream_host'], {
      wanted_collections: 'app.bsky.none',
      cursor: @data['cursor']
    })

    @jetstream.check_heartbeat = true
    @jetstream.heartbeat_timeout = 120
    @jetstream.heartbeat_interval = 20

    @jetstream.on_connecting { |u| log "Connecting to #{u}..." }
    @jetstream.on_connect { log "Connected ✓" }
    @jetstream.on_disconnect { log "Disconnected." }
    @jetstream.on_reconnect { log "Reconnecting..." }
    # `#{e}` alone just repeats e.message, so log the exception class too.
    @jetstream.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @jetstream.on_message do |msg|
      process_identity(msg) if msg.type == :identity && msg.handle
    end

    @jetstream.connect
  end

  # Persists the current state and disconnects from Jetstream.
  # Safe to call even if #start has not been called.
  def stop
    save_data
    @jetstream&.disconnect
  end

  # Prints a timestamped line to stdout and flushes it immediately, so the
  # output shows up promptly when stdout is piped or redirected.
  def log(s)
    puts "#{Time.now}: #{s}"
    $stdout.flush
  end


  private

  # Adds the account from an identity event to the list if its handle
  # matches a configured pattern and it is not already a member.
  #
  # The handle is resolved independently and must point back to the event's
  # DID; otherwise the event is logged and ignored.
  def process_identity(msg)
    return unless @handle_regexps.any? { |r| msg.handle =~ r }
    return if @members.include?(msg.did)

    did = DID.resolve_handle(msg.handle)

    if did.nil? || did.to_s != msg.did
      log "Error: @#{msg.handle} does not resolve to #{msg.did}"
      return
    end

    add_did_to_list(msg.did)
    log "Added account to list: @#{msg.handle} (#{msg.did})"
  end

  # Creates an app.bsky.graph.listitem record linking +did+ to the list,
  # then records the member locally and saves state.
  def add_did_to_list(did)
    @sky.post_request('com.atproto.repo.createRecord', {
      repo: @sky.user.did,
      collection: 'app.bsky.graph.listitem',
      record: {
        subject: did,
        list: @list_uri,
        # Time#iso8601 comes from the 'time' stdlib on Ruby < 3.4.
        createdAt: Time.now.iso8601
      }
    })

    @members << did
    save_data
  end

  # Fetches all listitem records from the user's repo and returns the set of
  # subject DIDs that belong to the configured list.
  #
  # @return [Set<String>]
  def fetch_list_members
    @sky.check_access

    print "#{Time.now}: Syncing current list items: "
    records = @sky.fetch_all('com.atproto.repo.listRecords',
      { repo: @sky.user.did, collection: 'app.bsky.graph.listitem' },
      field: 'records', progress: '.')

    # The repo may contain items for other lists too; keep only ours.
    members = records.map { |x| x['value'] }.select { |x| x['list'] == @list_uri }.map { |x| x['subject'] }.uniq
    puts " #{members.length} ✓"

    Set.new(members)
  end

  # Reads and validates the YAML config file.
  #
  # @param config_file [String] path relative to the parent of this file
  # @return [Hash] the parsed config
  # @raise [ConfigError] if the file is missing, is not a YAML mapping, or
  #   has a missing/invalid required field
  def load_config(config_file)
    config_path = File.join(__dir__, '..', config_file)

    if !File.exist?(config_path)
      raise ConfigError, "Missing config file at #{config_file}"
    end

    config = YAML.load(File.read(config_path))

    # An empty file parses to nil/false and a list to an Array; report those
    # as config errors instead of crashing on the field lookups below.
    unless config.is_a?(Hash)
      raise ConfigError, "Invalid config file at #{config_file} (should be a YAML mapping)"
    end

    jetstream = config['jetstream_host']

    if jetstream.nil?
      raise ConfigError, "Missing 'jetstream_host' field in the config file"
    end

    if !jetstream.is_a?(String) || jetstream.strip.empty?
      raise ConfigError, "Invalid 'jetstream_host' field in the config file (should be a string): #{jetstream.inspect}"
    end

    patterns = config['handle_patterns']

    if patterns.nil?
      raise ConfigError, "Missing 'handle_patterns' field in the config file"
    end

    if !patterns.is_a?(Array) || patterns.empty? || !patterns.all? { |p| p.is_a?(String) }
      raise ConfigError, "Invalid 'handle_patterns' field in the config file (should be an array of strings)"
    end

    rkey = config['list_key']

    if rkey.nil?
      raise ConfigError, "Missing 'list_key' field in the config file"
    end

    if !rkey.is_a?(String) || rkey.length != 13
      raise ConfigError, "Invalid 'list_key' field in the config file (should be a 13-character string)"
    end

    config
  end

  # Reads the JSON state file, or returns an empty hash if it doesn't exist.
  def load_data(data_file)
    data_path = File.join(__dir__, '..', data_file)

    File.exist?(data_path) ? JSON.parse(File.read(data_path)) : {}
  end

  # Writes the cursor and member list to the JSON state file.
  #
  # The file is written to a temporary sibling first and then renamed over
  # the real one, so a crash mid-write cannot leave a truncated JSON file
  # that would break the next startup.
  def save_data
    # Keep the previously saved cursor when the stream hasn't been started
    # or hasn't reported a cursor yet, instead of overwriting it with nil.
    cursor = @jetstream&.cursor
    @data['cursor'] = cursor if cursor
    @data['list_members'] = @members.to_a

    data_path = File.join(__dir__, '..', @data_file)
    FileUtils.mkdir_p(File.dirname(data_path))

    tmp_path = "#{data_path}.tmp"
    File.write(tmp_path, JSON.pretty_generate(@data))
    File.rename(tmp_path, data_path)
  end

  # Builds an authenticated Minisky client for the account in the auth file.
  #
  # The account is identified by 'did' if present, else by 'id', which may
  # be a DID or a handle to resolve. The client connects to the PDS listed
  # in that account's DID document.
  #
  # @raise [ConfigError] if the auth file is missing or invalid, the handle
  #   can't be resolved, or the DID document has no PDS endpoint
  def init_minisky(auth_file)
    auth_path = File.join(__dir__, '..', auth_file)

    if !File.exist?(auth_path)
      raise ConfigError, "Missing auth file at #{auth_file}"
    end

    data = YAML.load(File.read(auth_path))

    unless data.is_a?(Hash)
      raise ConfigError, "Invalid auth file at #{auth_file} (should be a YAML mapping)"
    end

    if data['id'].nil?
      raise ConfigError, "Missing 'id' field in the auth file"
    end

    did = if data['did']
      DID.new(data['did'])
    elsif data['id'].to_s.start_with?('did:')
      DID.new(data['id'])
    else
      DID.resolve_handle(data['id'])
    end

    if did.nil?
      raise ConfigError, "Couldn't resolve handle: @#{data['id']}"
    end

    endpoint = did.get_document.pds_endpoint

    if endpoint.nil?
      raise ConfigError, "Couldn't find a PDS endpoint for #{did}"
    end

    # Minisky takes a bare host name; strip only a leading scheme.
    pds = endpoint.sub(%r{\Ahttps://}, '')

    sky = Minisky.new(pds, auth_path)
    sky.check_access
    sky
  end

  # Converts a handle pattern into an anchored, case-insensitive Regexp.
  #
  # '*' matches one or more characters; every other character is literal.
  # Matching ignores case because atproto handles are case-insensitive.
  #
  # @example
  #   regexp_from_pattern('*.example.com') =~ 'alice.example.com' # => 0
  def regexp_from_pattern(s)
    body = Regexp.escape(s).gsub('\*', '.+')
    Regexp.new("\\A#{body}\\z", Regexp::IGNORECASE)
  end
end