Don't forget to lycansubscribe


+10
.env.example
+ # hostname on which you're running the Lycan server
+ SERVER_HOSTNAME=lycan.feeds.blue
+
+ # customizing servers we connect to
+ RELAY_HOST=bsky.network
+ # or: JETSTREAM_HOST=jetstream2.us-east.bsky.network
+ APPVIEW_HOST=public.api.bsky.app
+
+ # put your handle here
+ # FIREHOSE_USER_AGENT="Lycan (@my.handle)"

+1
.gitignore
+ .env
  log

+4 -3
Gemfile
···
  gem 'activerecord', '~> 7.2'
  gem 'sinatra-activerecord', '~> 2.0'
+ gem 'sinatra'
  gem 'pg'
  gem 'rake'
  gem 'irb'
+
  gem 'rainbow'
-
- gem 'sinatra'
+ gem 'dotenv'

  gem 'minisky', '~> 0.5'
  gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
···
  gem 'jwt'

  group :development do
-   gem 'puma'
+   gem 'thin'
    gem 'rackup'
    gem 'capistrano', '~> 2.0'

+41 -34
Gemfile.lock
···
  GEM
    remote: https://rubygems.org/
    specs:
-     activemodel (7.2.2.2)
-       activesupport (= 7.2.2.2)
-     activerecord (7.2.2.2)
-       activemodel (= 7.2.2.2)
-       activesupport (= 7.2.2.2)
+     activemodel (7.2.3)
+       activesupport (= 7.2.3)
+     activerecord (7.2.3)
+       activemodel (= 7.2.3)
+       activesupport (= 7.2.3)
        timeout (>= 0.4.0)
-     activesupport (7.2.2.2)
+     activesupport (7.2.3)
        base64
        benchmark (>= 0.3)
        bigdecimal
···
      base58 (0.2.3)
      base64 (0.3.0)
      bcrypt_pbkdf (1.1.1)
-     benchmark (0.4.1)
-     bigdecimal (3.2.2)
+     benchmark (0.5.0)
+     bigdecimal (3.3.1)
      capistrano (2.15.11)
        highline
        net-scp (>= 1.0.0)
···
        net-ssh-gateway (>= 1.1.0)
      cbor (0.5.10.1)
      concurrent-ruby (1.3.5)
-     connection_pool (2.5.3)
-     date (3.4.1)
+     connection_pool (2.5.4)
+     daemons (1.4.1)
+     date (3.5.0)
+     dotenv (3.1.8)
      drb (2.2.3)
      ed25519 (1.4.0)
-     erb (5.0.2)
+     erb (6.0.0)
      eventmachine (1.2.7)
      faye-websocket (0.12.0)
        eventmachine (>= 0.12.0)
···
      i18n (1.14.7)
        concurrent-ruby (~> 1.0)
      io-console (0.8.1)
-     irb (1.15.2)
+     irb (1.15.3)
        pp (>= 0.6.0)
        rdoc (>= 4.0.0)
        reline (>= 0.4.2)
···
      logger (1.7.0)
      minisky (0.5.0)
        base64 (~> 0.1)
-     minitest (5.25.5)
+     minitest (5.26.1)
      mustermann (3.0.4)
        ruby2_keywords (~> 0.0.1)
      net-scp (4.1.0)
···
      net-ssh (7.3.0)
      net-ssh-gateway (2.0.0)
        net-ssh (>= 4.0.0)
-     nio4r (2.7.4)
-     pg (1.6.1)
-     pg (1.6.1-aarch64-linux)
-     pg (1.6.1-aarch64-linux-musl)
-     pg (1.6.1-arm64-darwin)
-     pg (1.6.1-x86_64-darwin)
-     pg (1.6.1-x86_64-linux)
-     pg (1.6.1-x86_64-linux-musl)
-     pp (0.6.2)
+     pg (1.6.2)
+     pg (1.6.2-aarch64-linux)
+     pg (1.6.2-aarch64-linux-musl)
+     pg (1.6.2-arm64-darwin)
+     pg (1.6.2-x86_64-darwin)
+     pg (1.6.2-x86_64-linux)
+     pg (1.6.2-x86_64-linux-musl)
+     pp (0.6.3)
        prettyprint
      prettyprint (0.2.0)
      psych (5.2.6)
        date
        stringio
-     puma (6.6.1)
-       nio4r (~> 2.0)
-     rack (3.2.0)
-     rack-protection (4.1.1)
+     rack (3.2.4)
+     rack-protection (4.2.1)
        base64 (>= 0.1.0)
        logger (>= 1.6.0)
        rack (>= 3.0.0, < 4)
···
      rackup (2.2.1)
        rack (>= 3)
      rainbow (3.1.1)
-     rake (13.3.0)
-     rdoc (6.14.2)
+     rake (13.3.1)
+     rdoc (6.15.1)
        erb
        psych (>= 4.0.0)
-     reline (0.6.2)
+       tsort
+     reline (0.6.3)
        io-console (~> 0.5)
      ruby2_keywords (0.0.5)
      securerandom (0.4.1)
-     sinatra (4.1.1)
+     sinatra (4.2.1)
        logger (>= 1.6.0)
        mustermann (~> 3.0)
        rack (>= 3.0.0, < 4)
-       rack-protection (= 4.1.1)
+       rack-protection (= 4.2.1)
        rack-session (>= 2.0.0, < 3)
        tilt (~> 2.0)
      sinatra-activerecord (2.0.28)
···
        cbor (~> 0.5, >= 0.5.9.6)
        eventmachine (~> 1.2, >= 1.2.7)
        faye-websocket (~> 0.12)
-     stringio (3.1.7)
+     stringio (3.1.8)
+     thin (2.0.1)
+       daemons (~> 1.0, >= 1.0.9)
+       eventmachine (~> 1.0, >= 1.0.4)
+       logger
+       rack (>= 1, < 4)
      tilt (2.6.1)
-     timeout (0.4.3)
+     timeout (0.4.4)
+     tsort (0.2.0)
      tzinfo (2.0.6)
        concurrent-ruby (~> 1.0)
      websocket-driver (0.8.0)
···
    bcrypt_pbkdf (>= 1.0, < 2.0)
    capistrano (~> 2.0)
    didkit (~> 0.2)!
+   dotenv
    ed25519 (>= 1.2, < 2.0)
    irb
    jwt
    minisky (~> 0.5)
    pg
-   puma
    rackup
    rainbow
    rake
    sinatra
    sinatra-activerecord (~> 2.0)
    skyfall (~> 0.6)
+   thin

  BUNDLED WITH
     2.7.0

+3
Procfile
+ server: bin/server
+ firehose: bin/firehose
+ worker: bin/worker

+110
README.md
+ # Lycan 🐺
+
+ A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and allows you to search in that archive.
+
+
+ ## How it works
+
+ Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network index, it only indexes posts and likes on demand, for people who have asked to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anything from a few minutes to an hour, depending on how much data there is to download. After that, new likes are indexed live from the firehose.
+
+ At the moment, Lycan indexes four types of content:
+
+ - posts you've liked
+ - posts you've reposted
+ - posts you've quoted
+ - your old-style bookmarks (using the 📌 emoji method)
+
+ New bookmarks are private data, so they can't be imported until support for OAuth is added.
+
+ Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API – the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)).
+
+ The service consists of three separate components:
+
+ - a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is being or has been imported
+ - a **background worker**, which runs the import process
+ - an **HTTP server**, which serves the XRPC endpoints (currently there are 3: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying
+
+
+ ## Setting up on localhost
+
+ This app should run on any somewhat recent version of Ruby, though it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install Ruby with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need some familiarity with the Ruby ecosystem in order to set it up and run it.
+
+ A Postgres database is also required (again, any non-ancient version should work).
+
+ Download or clone the repository, then install the dependencies:
+
+ ```
+ bundle install
+ ```
+
+ Next, create the database – the configuration is defined in [`config/database.yml`](config/database.yml); for development, the database name is `lycan_development`. Create it either manually or with a rake task:
+
+ ```
+ bundle exec rake db:create
+ ```
+
+ Then, run the migrations:
+
+ ```
+ bundle exec rake db:migrate
+ ```
+
+ To run an import, you will need to run three separate processes, probably in separate terminal tabs:
+
+ 1) the firehose client, [`bin/firehose`](bin/firehose)
+ 2) the background worker, [`bin/worker`](bin/worker)
+ 3) the Sinatra HTTP server, [`bin/server`](bin/server)
+
+ The UI can be accessed through Skythread, either on the official site at [skythread.mackuba.eu](https://skythread.mackuba.eu), or via a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu – but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL.
+
+ You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.)
+
+
+ ## Configuration
+
+ There are a few things you can configure through ENV variables:
+
+ - `RELAY_HOST` – hostname of the relay to use for the firehose (default: `bsky.network`)
+ - `JETSTREAM_HOST` – alternatively, instead of `RELAY_HOST`, set this to the hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance
+ - `FIREHOSE_USER_AGENT` – when running in production, it's recommended that you set this to some name that identifies who is running the service
+ - `APPVIEW_HOST` – hostname of the AppView used to download posts (default: `public.api.bsky.app`)
+ - `SERVER_HOSTNAME` – hostname of the server on which you're running the service in production
+
+
+ ## Rake tasks
+
+ Some Rake tasks that might be useful:
+
+ ```
+ bundle exec rake enqueue_user DID=did:plc:qweqwe
+ ```
+
+ - request an import of the given account (to be handled by firehose + worker)
+
+ ```
+ bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all
+ ```
+
+ - run a complete import synchronously
+
+ ```
+ bundle exec rake process_posts
+ ```
+
+ - process all previously queued and unfinished or failed items
+
+
+ ## Running in production
+
+ This will probably heavily depend on where and how you prefer to run it. I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config to your specific setup.
+
+ On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option (which I'm using) is to write a `systemd` service config file and add it to `/etc/systemd/system`. To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server – my recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd, like the firehose).
+
+
+ ## Credits
+
+ Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
+
+ The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
+
+ Bug reports and pull requests are welcome 😎

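A note on the three processes mentioned in the README: since this change also adds a `Procfile` naming them, they can be started together on localhost with a process manager like foreman – a sketch, assuming you install the tool yourself, as it isn't declared in the Gemfile:

```
gem install foreman
foreman start   # runs server, firehose and worker as defined in the Procfile
```
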
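And for the systemd option mentioned in the "Running in production" section, a minimal unit sketch for the firehose process – the file name, paths and user below are assumptions to adapt to your own layout:

```
# /etc/systemd/system/lycan-firehose.service (sketch – adjust paths and user)
[Unit]
Description=Lycan firehose client
After=network.target postgresql.service

[Service]
User=deploy
WorkingDirectory=/var/www/lycan/current
ExecStart=/usr/bin/env bundle exec bin/firehose
Restart=always
Environment=RACK_ENV=production

[Install]
WantedBy=multi-user.target
```

The worker process would get an analogous unit with `ExecStart` pointing at `bin/worker`.
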
+1
Rakefile
  require 'bundler/setup'
  require 'sinatra/activerecord'
  require 'sinatra/activerecord/rake'
+ require 'dotenv/load'

  Rake.add_rakelib File.join(__dir__, 'lib', 'tasks')

+14 -2
app/authenticator.rb
···
  require 'openssl'

  class Authenticator
+   class InvalidTokenError < StandardError
+   end
+
    def initialize(hostname:)
      @@pkey_cache ||= {}
      @hostname = hostname
···
      return nil unless auth_header.start_with?('Bearer ')

      token = auth_header.gsub(/\ABearer /, '')
-     data = JSON.parse(Base64.decode64(token.split('.')[1]))
+     parts = token.split('.')
+     raise InvalidTokenError, "Invalid JWT token" if parts.length != 3
+
+     begin
+       decoded_data = Base64.decode64(parts[1])
+       data = JSON.parse(decoded_data)
+     rescue StandardError => e
+       raise InvalidTokenError, "Invalid JWT token"
+     end
+
      did = data['iss']
-     return nil if data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint
+     return nil if did.nil? || data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint

      pkey = pkey_for_user(did)

-42
app/console_report.rb
- class ConsoleReport
-   def initialize
-     @data = {}
-     @start = Time.now
-   end
-
-   def update(data)
-     deep_merge(@data, data)
-     render
-   end
-
-   def deep_merge(target, updates)
-     updates.each do |k, v|
-       if v.is_a?(Hash)
-         target[k] ||= {}
-         deep_merge(target[k], v)
-       else
-         target[k] = v
-       end
-     end
-   end
-
-   def render
-     print " " * 80 + "\r"
-     puts "Elapsed time: #{(Time.now - @start).to_i} s"
-
-     importers = @data[:importers] || {}
-
-     importers.each do |name, data|
-       print " " * 80 + "\r"
-       puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
-     end
-
-     print " " * 80 + "\r"
-     puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
-
-     print " " * 80 + "\r"
-     puts "Queue size: #{@data.dig(:queue, :length) || 0}"
-
-     print "\e[#{3 + importers.length}A"
-   end
- end

+2
app/errors.rb
+ class InvalidRecordError < StandardError
+ end

+50 -9
app/firehose_client.rb
  require 'skyfall'

  require_relative 'init'
+ require_relative 'models/import_job'
  require_relative 'models/post'
  require_relative 'models/subscription'
  require_relative 'models/user'
···

    def initialize
      @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
-     @service = DEFAULT_RELAY
+
+     if ENV['RELAY_HOST']
+       @service = ENV['RELAY_HOST']
+     elsif ENV['JETSTREAM_HOST']
+       @service = ENV['JETSTREAM_HOST']
+       @jetstream = true
+     else
+       @service = DEFAULT_RELAY
+     end
    end

    def start
···
      last_cursor = load_or_init_cursor
      cursor = @start_cursor || last_cursor

-     @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
-     @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
+     @sky = if @jetstream
+       Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
+     else
+       Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
+     end
+
+     @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
      @sky.check_heartbeat = true

      @sky.on_message do |m|
···
      @replaying = true
      @last_update = Time.now

-     @timer ||= EM::PeriodicTimer.new(20) do
+     @live_check_timer ||= EM::PeriodicTimer.new(20) do
        now = Time.now
        diff = now - @last_update

        if diff > 30
          log "Timer: last update #{sprintf('%.1f', diff)}s ago"
        end
      end
+
+     @jobs_timer ||= EM::PeriodicTimer.new(3) do
+       ImportJob.all.each do |job|
+         @active_users[job.user.did] = job.user
+         job.create_imports
+         job.destroy
+       end
+     end
    }
···
          end
        end
      end
+   rescue CBOR::UnpackError
+     # ignore invalid records
    end

    def process_account_event(msg)
      if msg.status == :deleted
        if user = User.find_by(did: msg.repo)
          user.destroy
+         @active_users.delete_if { |k, u| u.id == user.id }
        end
      end
    end
···
      return unless @current_user

      if op.action == :create
-       @current_user.likes.import_from_record(op.uri, op.raw_record)
+       return if op.raw_record.nil?
+       @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
      elsif op.action == :delete
        @current_user.likes.where(rkey: op.rkey).delete_all
      end
+   rescue StandardError => e
+     log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}"
+     log e.backtrace.reject { |x| x.include?('/ruby/') }
+     sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
    end

    def process_repost(msg, op)
      return unless @current_user

      if op.action == :create
-       @current_user.reposts.import_from_record(op.uri, op.raw_record)
+       return if op.raw_record.nil?
+       @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
      elsif op.action == :delete
        @current_user.reposts.where(rkey: op.rkey).delete_all
      end
+   rescue StandardError => e
+     log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}"
+     log e.backtrace.reject { |x| x.include?('/ruby/') }
+     sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
    end

    def process_post(msg, op)
      if op.action == :create
+       return if op.raw_record.nil?
+
        if @current_user
-         @current_user.quotes.import_from_record(op.uri, op.raw_record)
-         @current_user.pins.import_from_record(op.uri, op.raw_record)
+         @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
+         @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
        end
      elsif op.action == :delete
        if @current_user
···
          post.destroy
        end
      end
+   rescue StandardError => e
+     log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}"
+     log e.backtrace.reject { |x| x.include?('/ruby/') }
+     sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
    end

    def log(text)
···
    end

    def inspect
-     vars = instance_variables - [:@timer]
+     vars = instance_variables - [:@jobs_timer, :@live_check_timer]
      values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
      "#<#{self.class}:0x#{object_id} #{values}>"
    end

+21 -9
app/import_manager.rb
···
  require_relative 'post_downloader'

  class ImportManager
-   attr_accessor :report, :time_limit
+   attr_accessor :report, :time_limit, :logger, :log_status_updates

    def initialize(user)
      @user = user
    end

-   def start(sets, include_pending)
-     queued_items = []
+   def start(sets)
      importers = []
      sets = [sets] unless sets.is_a?(Array)

      sets.each do |set|
        case set
        when 'likes'
-         queued_items += @user.likes.pending.to_a if include_pending
          importers << LikesImporter.new(@user)
        when 'reposts'
-         queued_items += @user.reposts.pending.to_a if include_pending
          importers << RepostsImporter.new(@user)
        when 'posts'
-         queued_items += @user.quotes.pending.to_a + @user.pins.pending.to_a if include_pending
          importers << PostsImporter.new(@user)
        when 'all'
-         queued_items += @user.all_pending_items if include_pending
          importers += [
            LikesImporter.new(@user),
            RepostsImporter.new(@user),
···
        end
      end

+     queued_items = @user.all_items_in_queue(:import).sort_by(&:time).reverse
      queue = ItemQueue.new(queued_items)

      downloader = PostDownloader.new
      downloader.report = @report
+     downloader.logger = @logger

-     download_thread = Thread.new { downloader.import_from_queue(queue) }
+     download_thread = Thread.new do
+       @logger&.info "Starting downloader thread for #{@user}" if @log_status_updates
+
+       downloader.import_from_queue(queue)
+
+       @logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
+     end

      import_threads = importers.map do |import|
        import.item_queue = queue
        import.report = @report
+       import.logger = @logger
+
+       Thread.new do
+         @logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates

-       Thread.new { import.run_import(@time_limit) }
+         import.run_import(@time_limit)
+
+         @logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
+       end
      end

      import_threads.each { |i| i.join }
+
+     @logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates

      downloader.stop_when_empty = true
      download_thread.join

+94
app/import_worker.rb
+ require 'active_record'
+ require 'minisky'
+ require 'time'
+
+ require_relative 'init'
+ require_relative 'import_manager'
+ require_relative 'models/import'
+ require_relative 'post_downloader'
+ require_relative 'reports/basic_report'
+
+ class ImportWorker
+   attr_accessor :verbose, :logger
+
+   class UserThread < Thread
+     def initialize(user, collections, logger, verbose = false)
+       @user = user
+       @verbose = verbose
+       @logger = logger
+
+       super { run(collections) }
+     end
+
+     def user_id
+       @user.id
+     end
+
+     def run(collections)
+       @logger&.info "Starting import thread for #{@user}"
+
+       if @user.registered_at.nil?
+         registration_time = get_registration_time(@user)
+         @user.update!(registered_at: registration_time)
+       end
+
+       import = ImportManager.new(@user)
+
+       if @logger
+         import.report = BasicReport.new(@logger) if @verbose
+         import.logger = @logger
+         import.log_status_updates = true
+       end
+
+       import.start(collections)
+
+       @logger&.info "Ended import thread for #{@user}"
+     end
+
+     def get_registration_time(user)
+       sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
+       profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })
+
+       Time.parse(profile['createdAt'])
+     end
+   end
+
+   def run
+     @user_threads = []
+
+     @firehose_thread = Thread.new { process_firehose_items }
+     @downloader = PostDownloader.new
+     @downloader.logger = @logger
+
+     loop do
+       @user_threads.delete_if { |t| !t.alive? }
+
+       users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a
+
+       users.each do |user|
+         collections = user.imports.unfinished.map(&:collection)
+         thread = UserThread.new(user, collections, @logger, @verbose)
+         @user_threads << thread
+       end
+
+       # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
+       sleep 5
+     end
+   end
+
+   def process_firehose_items
+     loop do
+       items = [Like, Repost, Pin, Quote]
+         .map { |type| type.in_queue(:firehose).order('time').limit(25) }
+         .flatten
+         .sort_by(&:time)
+         .first(25)
+
+       if items.length > 0
+         @downloader.process_items(items)
+       else
+         sleep 5
+       end
+     end
+   end
+ end

+11 -3
app/importers/base_importer.rb
  require 'didkit'
  require 'minisky'
+ require 'time'

  require_relative '../at_uri'
+ require_relative '../errors'
  require_relative '../models/post'
  require_relative '../models/user'

  class BaseImporter
-   attr_accessor :item_queue, :report
+   attr_accessor :item_queue, :report, :logger

    def initialize(user)
      @did = DID.new(user.did)
···
      end

      @time_limit = requested_time_limit || @import.last_completed
-     puts "Fetching until: #{@time_limit}" if @time_limit
+     @logger&.info "Fetching until: #{@time_limit}" if @time_limit

      import_items

      @import.update!(last_completed: @import.started_from) unless requested_time_limit
-     @import.update!(cursor: nil, started_from: nil)
+     @import.update!(cursor: nil, started_from: nil, fetched_until: nil)
      @report&.update(importers: { importer_name => { :finished => true }})
    end

    def import_items
      raise NotImplementedError
    end
+
+   def created_at(record)
+     Time.parse(record['createdAt'])
+   rescue StandardError
+     raise InvalidRecordError
+   end
  end

+13 -10
app/importers/likes_importer.rb
- require 'time'
  require_relative 'base_importer'

  class LikesImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
-         like = @user.likes.import_from_record(record['uri'], record['value'])
+         like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)
+
+         record_date = like&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min

          if like && like.pending? && @item_queue
            @item_queue.push(like)
            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in LikesImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end

+14 -11
app/importers/posts_importer.rb
- require 'time'
  require_relative 'base_importer'

  class PostsImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
-         quote = @user.quotes.import_from_record(record['uri'], record['value'])
-         pin = @user.pins.import_from_record(record['uri'], record['value'])
+         quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
+         pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
+
+         record_date = quote&.time || pin&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min

          if @item_queue
            if quote && quote.pending?
···

            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in LikesImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end

+13 -10
app/importers/reposts_importer.rb
- require 'time'
  require_relative 'base_importer'

  class RepostsImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
-         repost = @user.reposts.import_from_record(record['uri'], record['value'])
+         repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)
+
+         record_date = repost&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min

          if repost && repost.pending? && @item_queue
            @item_queue.push(repost)
            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in RepostsImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end

+1
app/init.rb
  require 'sinatra/activerecord'
+ require 'dotenv/load'

  ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.datetime_type = :timestamptz
  RubyVM::YJIT.enable

+28
app/models/import.rb
···
    validates_inclusion_of :collection, in: %w(likes reposts posts)
    validates_uniqueness_of :collection, scope: :user_id
+
+   scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }
+
+   IMPORT_END = Time.at(0)
+
+   def imported_until
+     return nil if cursor.nil? && last_completed.nil?
+
+     groups = case collection
+       when 'likes'
+         [:likes]
+       when 'reposts'
+         [:reposts]
+       when 'posts'
+         [:pins, :quotes]
+     end
+
+     newest_queued_items = groups.map { |g| user.send(g).where(queue: :import).order(:time).last }
+     newest_queued = newest_queued_items.compact.sort_by(&:time).last
+
+     if newest_queued
+       newest_queued.time
+     elsif fetched_until
+       fetched_until
+     else
+       IMPORT_END
+     end
+   end
  end

+13
app/models/import_job.rb
+ require 'active_record'
+
+ require_relative 'user'
+
+ class ImportJob < ActiveRecord::Base
+   belongs_to :user
+
+   def create_imports
+     %w(likes reposts posts).each do |group|
+       self.user.imports.find_or_create_by!(collection: group)
+     end
+   end
+ end

+12 -6
app/models/importable.rb
···

  included do
    scope :pending, -> { where(post: nil) }
+   scope :in_queue, ->(q) { where(queue: q) }
+
+   enum :queue, { firehose: 0, import: 1 }
+
+   validates_presence_of :post_uri, if: -> { post_id.nil? }
+   validate :check_queue

    def pending?
      post_uri != nil
    end

-   def import_item!
+   def check_queue
+     errors.add(:queue, 'must be nil if already processed') if queue && post
+   end
+
+   def import_item!(args = {})
      post_uri = AT_URI(self.post_uri)
      return nil if !post_uri.is_post?

-     if post = Post.find_by_at_uri(post_uri)
-       self.post = post
-       self.post_uri = nil
-     end
-
+     self.assign_attributes(args)
      self.save!
      self
    end

-2
app/models/like.rb
···
    validates_presence_of :time, :rkey
    validates_length_of :rkey, is: 13

-   validates_presence_of :post_uri, if: -> { post_id.nil? }
-
    belongs_to :user, foreign_key: 'actor_id'
    belongs_to :post, optional: true

-2
app/models/pin.rb
···
    validates_length_of :rkey, is: 13
    validates :pin_text, length: { minimum: 0, maximum: 1000, allow_nil: false }

-   validates_presence_of :post_uri, if: -> { post_id.nil? }
-
    belongs_to :user, foreign_key: 'actor_id'
    belongs_to :post, optional: true

+7
app/models/post.rb
···
    validates_length_of :text, maximum: 1000
    validates_length_of :data, maximum: 10000

+   validate :check_null_bytes
+
    belongs_to :user

    has_many :likes, dependent: :delete_all
···

    def at_uri
      "at://#{user.did}/app.bsky.feed.post/#{rkey}"
    end
+
+   def check_null_bytes
+     # Postgres doesn't allow null bytes in strings
+     errors.add(:text, 'must not contain a null byte') if text.include?("\u0000")
+   end
  end

-2
app/models/quote.rb
···
    validates_length_of :rkey, is: 13
    validates :quote_text, length: { minimum: 0, maximum: 1000, allow_nil: false }

-   validates_presence_of :post_uri, if: -> { post_id.nil? }
-
    belongs_to :user, foreign_key: 'actor_id'
    belongs_to :post, optional: true

-2
app/models/repost.rb
···
    validates_presence_of :time, :rkey
    validates_length_of :rkey, is: 13

-   validates_presence_of :post_uri, if: -> { post_id.nil? }
-
    belongs_to :user, foreign_key: 'actor_id'
    belongs_to :post, optional: true

+38 -28
app/models/user.rb
  require 'active_record'

  require_relative 'import'
+ require_relative 'import_job'
  require_relative 'like'
  require_relative 'quote'
  require_relative 'pin'
  require_relative 'post'
  require_relative 'repost'
+ require_relative 'user_importable'

  class User < ActiveRecord::Base
    validates_presence_of :did
    validates_length_of :did, maximum: 260
+   validates_format_of :did, with: /\Adid:(plc:[0-9a-z]{24}|web:[0-9a-z\-]+(\.[0-9a-z\-]+)+)\Z/

    has_many :posts
    has_many :imports, dependent: :delete_all
+   has_one :import_job, dependent: :delete

    before_destroy :delete_posts_cascading

-   has_many :likes, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(like_uri, record)
-       like = self.new_from_record(like_uri, record)
-       return nil if like.nil? || self.where(rkey: like.rkey).exists?
-
-       like.import_item!
-     end
-   end
-
-   has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(repost_uri, record)
-       repost = self.new_from_record(repost_uri, record)
-       return nil if repost.nil? || self.where(rkey: repost.rkey).exists?
-
-       repost.import_item!
-     end
-   end
-
-   has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(post_uri, record)
-       quote = self.new_from_record(post_uri, record)
-       return nil if quote.nil? || self.where(rkey: quote.rkey).exists?
-
-       quote.import_item!
-     end
-   end
-
-   has_many :pins, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(post_uri, record)
-       pin = self.new_from_record(post_uri, record)
-       return nil if pin.nil? || self.where(rkey: pin.rkey).exists?
-
-       pin.import_item!
-     end
-   end
-
-   def self.active
-     self.joins(:imports).distinct
-   end
-
-   def all_pending_items
-     [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
-   end
+   has_many :likes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :pins, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+
+   def self.active
+     self.joins(:imports).distinct
+   end
+
+   def self.with_unfinished_import
+     self.where(id: Import.unfinished.select('user_id').distinct)
+       .or(self.where(id: Like.in_queue(:import).select('actor_id').distinct))
+       .or(self.where(id: Repost.in_queue(:import).select('actor_id').distinct))
+       .or(self.where(id: Quote.in_queue(:import).select('actor_id').distinct))
+       .or(self.where(id: Pin.in_queue(:import).select('actor_id').distinct))
+   end
+
+   def active?
+     imports.exists?
+   end
+
+   def all_pending_items
+     [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
+   end
+
+   def all_items_in_queue(queue)
+     [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).in_queue(queue).to_a }.reduce(&:+)
+   end
+
+   def imported_until
+     import_positions = self.imports.map(&:imported_until)
+
+     if import_positions.empty? || import_positions.any? { |x| x.nil? }
+       nil
+     else
+       import_positions.sort.last
+     end
+   end
+
+   def erase_imports!
+     [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).delete_all }
+
+     self.import_job&.destroy
+     self.imports.delete_all
+   end

    def delete_posts_cascading
···
      Quote.where(post_id: posts_subquery).delete_all

      Post.where(user: self).delete_all
+   end
+
+   def to_s
+     %(<User id: #{id}, did: "#{did}">)
    end
  end

+20
app/models/user_importable.rb
+ require_relative '../errors'
+
+ module UserImportable
+   def import_from_record(item_uri, record, **args)
+     item = try_build_from_record(item_uri, record)
+     return nil if item.nil? || already_imported?(item)
+
+     item.import_item!(args)
+   end
+
+   def try_build_from_record(item_uri, record)
+     self.new_from_record(item_uri, record)
+   rescue StandardError
+     raise InvalidRecordError
+   end
+
+   def already_imported?(item)
+     self.where(rkey: item.rkey).exists?
+   end
+ end

+62 -35
app/post_downloader.rb
···
  require_relative 'models/user'

  class PostDownloader
-   attr_accessor :report, :stop_when_empty
+   attr_accessor :report, :logger, :stop_when_empty

    def initialize
-     @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)
+     @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)

      @total_count = 0
      @oldest_imported = Time.now
···

      @report&.update(queue: { length: queue.length })

-     existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a
+     process_items(items)
+   end
+ end
+
+ def process_items(items)
+   existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a

    items.dup.each do |item|
      if post = existing_posts.detect { |post| post.at_uri == item.post_uri }
        update_item(item, post)
        items.delete(item)
      end
    end

-   next if items.empty?
+   return if items.empty?

    begin
      response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq })

      response['posts'].each do |data|
-       begin
-         item = items.detect { |x| x.post_uri == data['uri'] }
-         items.delete(item)
+       current_items = items.select { |x| x.post_uri == data['uri'] }
+       items -= current_items

+       begin
          post = save_post(data['uri'], data['record'])

          if post.valid?
-           update_item(item, post)
+           current_items.each { |i| update_item(i, post) }
          else
-           puts "Invalid post #{item.post_uri}: #{post.errors.full_messages.join("; ")}"
-           invalidate_item(item)
+           @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
+           current_items.each { |i| invalidate_item(i) }
          end
-       rescue StandardError => e
-         puts "Error in PostDownloader: #{item.post_uri}: #{e.class}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
+         current_items.each { |i| i.update!(queue: nil) }
        end
      end

      check_missing_items(items)
    rescue StandardError => e
-     puts "Error in PostDownloader: #{e.class}: #{e}"
+     @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
    end
  end

  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

+   begin
+     author = User.find_or_create_by!(did: did)
+   rescue ActiveRecord::RecordInvalid => e
+     raise InvalidRecordError
+   end
+
+   if post = Post.find_by(user: author, rkey: rkey)
+     return post
+   else
+     post = build_post(author, rkey, record)
+     post.save
+     post
+   end
+ end
+
+ def build_post(author, rkey, record)
    text = record.delete('text')
    created = record.delete('createdAt')

-   author = User.find_or_create_by!(did: did)
+   record.delete('$type')

-   Post.create(
+   Post.new(
      user: author,
      rkey: rkey,
      time: Time.parse(created),
      text: text,
      data: JSON.generate(record)
    )
+ rescue StandardError
+   raise InvalidRecordError
  end

  def update_item(item, post)
-   item.update!(post: post, post_uri: nil)
+   item.update!(post: post, post_uri: nil, queue: nil)

    @total_count += 1
    @oldest_imported = [@oldest_imported, item.time].min
···
      case status
      when :active
        # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
-       puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
+       # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
        item.destroy
      when nil
        # account was deleted, so all posts were deleted too
-       puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
+       # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
        item.destroy
      else
        # account is inactive/suspended, but could come back, so leave it for now
-       puts "#{item.post_uri}: account #{did} is inactive: #{status}"
+       # puts "#{item.post_uri}: account #{did} is inactive: #{status}"
      end
    rescue StandardError => e
      hostname = did_obj.document.pds_host rescue "???"
-     puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
+     @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

      # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
      item.destroy if hostname == 'bsky.social'
    end
+
+   if !item.destroyed?
+     item.update!(queue: nil)
+   end
  end

+17
app/reports/basic_report.rb
+ class BasicReport
+   def initialize(logger)
+     @logger = logger
+   end
+
+   def update(data)
+     data.each do |k, v|
+       if k == :downloader
+         @logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
+       elsif k == :queue
+         next
+       else
+         @logger.info({ k => v }.inspect)
+       end
+     end
+   end
+ end

+43
app/reports/console_report.rb
+ class ConsoleReport
+   def initialize
+     @data = {}
+     @start = Time.now
+     @mutex = Mutex.new
+   end
+
+   def update(data)
+     @mutex.synchronize { deep_merge(@data, data) }
+     render
+   end
+
+   def deep_merge(target, updates)
+     updates.each do |k, v|
+       if v.is_a?(Hash)
+         target[k] ||= {}
+         deep_merge(target[k], v)
+       else
+         target[k] = v
+       end
+     end
+   end
+
+   def render
+     print " " * 80 + "\r"
+     puts "Elapsed time: #{(Time.now - @start).to_i} s"
+
+     importers = @data[:importers] || {}
+
+     importers.each do |name, data|
+       print " " * 80 + "\r"
+       puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
+     end
+
+     print " " * 80 + "\r"
+     puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
+
+     print " " * 80 + "\r"
+     puts "Queue size: #{@data.dig(:queue, :length) || 0}"
+
+     print "\e[#{3 + importers.length}A"
+   end
+ end

+9
app/reports/simple_logger.rb
+ require 'logger'
+
+ class SimpleLogger < Logger
+   def initialize
+     super(STDOUT)
+
+     self.formatter = proc { |level, time, prog, msg| "[#{time}] #{msg}\n" }
+   end
+ end

+105 -23
app/server.rb
···
  class Server < Sinatra::Application
    register Sinatra::ActiveRecordExtension
-   set :port, 3000
+   set :port, ENV['PORT'] || 3000

    PAGE_LIMIT = 25
-   HOSTNAME = 'lycan.feeds.blue'
+   HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue'

    helpers do
      def json_response(data)
···
        content_type :json
        [status, JSON.generate({ error: name, message: message })]
      end
+
+     def get_user_did
+       if settings.development?
+         did = if request.post?
+           json = JSON.parse(request.body.read)
+           request.body.rewind
+           json['user']
+         else
+           params[:user]
+         end
+
+         if did
+           return did
+         else
+           halt json_error('AuthMissing', 'Missing "user" parameter', status: 401)
+         end
+       else
+         auth_header = env['HTTP_AUTHORIZATION']
+         if auth_header.to_s.strip.empty?
+           halt json_error('AuthMissing', 'Missing authentication header', status: 401)
+         end
+
+         begin
+           method = request.path.split('/')[2]
+           did = @authenticator.decode_user_from_jwt(auth_header, method)
+         rescue StandardError => e
+           halt json_error('InvalidToken', e.message, status: 401)
+         end
+
+         if did
+           return did
+         else
+           halt json_error('InvalidToken', "Invalid JWT token", status: 401)
+         end
+       end
+     end
+
+     def load_user
+       did = get_user_did
+
+       if user = User.find_by(did: did)
+         return user
+       else
+         halt json_error('AccountNotFound', 'Account not found', status: 401)
+       end
+     end
    end

    before do
      @authenticator = Authenticator.new(hostname: HOSTNAME)
-   end
-
-   get '/xrpc/blue.feeds.lycan.searchPosts' do
-     headers['access-control-allow-origin'] = '*'

      if settings.development?
-       user = User.find_by(did: params[:user])
-       return json_error('UserNotFound', 'Missing "user" parameter') if user.nil?
-     else
-       begin
-         auth_header = env['HTTP_AUTHORIZATION']
-         did = @authenticator.decode_user_from_jwt(auth_header, 'blue.feeds.lycan.searchPosts')
-       rescue StandardError => e
-         p e
-       end
-
-       user = did && User.find_by(did: did)
-       return json_error('UserNotFound', 'Missing authentication header') if user.nil?
+       headers['Access-Control-Allow-Origin'] = '*'
      end
+   end
+
+   get '/' do
+     "Awoo 🐺"
+   end
+
+   get '/xrpc/blue.feeds.lycan.searchPosts' do
+     @user = load_user

      if params[:query].to_s.strip.empty?
        return json_error('MissingParameter', 'Missing "query" parameter')
···
      query = QueryParser.new(params[:query])

      collection = case params[:collection]
-       when 'likes' then user.likes
-       when 'pins' then user.pins
-       when 'quotes' then user.quotes
-       when 'reposts' then user.reposts
+       when 'likes' then @user.likes
+       when 'pins' then @user.pins
+       when 'quotes' then @user.quotes
+       when 'reposts' then @user.reposts
        else return json_error('InvalidParameter', 'Invalid search collection')
      end
···

      post_uris = case params[:collection]
        when 'quotes'
-         items.map { |x| "at://#{user.did}/app.bsky.feed.post/#{x.rkey}" }
+         items.map { |x| "at://#{@user.did}/app.bsky.feed.post/#{x.rkey}" }
        else
          items.map(&:post).map(&:at_uri)
      end

      json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor)
    end

+   if settings.development?
+     options '/xrpc/blue.feeds.lycan.startImport' do
+       headers['Access-Control-Allow-Origin'] = '*'
+       headers['Access-Control-Allow-Headers'] = 'content-type'
+     end
+   end
+
+   post '/xrpc/blue.feeds.lycan.startImport' do
+     did = get_user_did
+     user = User.find_or_create_by(did: did)
+
+     if !user.valid?
+       json_error('InvalidRequest', 'Invalid DID')
+     elsif user.import_job || user.active?
+       json_response(message: "Import has already started")
+     else
+       user.create_import_job!
+
+       status 202
+       json_response(message: "Import has been scheduled")
+     end
+   end
+
+   get '/xrpc/blue.feeds.lycan.getImportStatus' do
+     did = get_user_did
+     user = User.find_by(did: did)
+     until_date = user&.imported_until
+
+     case until_date
+     when nil
+       if user && (user.import_job || user.imports.exists?)
+         json_response(status: 'scheduled')
+       else
+         json_response(status: 'not_started')
+       end
+     when Import::IMPORT_END
+       json_response(status: 'finished')
+     else
+       progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at)
+       json_response(status: 'in_progress', position: until_date.iso8601, progress: progress)
+     end
+   end

    get '/.well-known/did.json' do

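For reference, in development mode these routes skip the JWT check and read the DID from a plain `user` parameter (or a `user` key in the JSON body for POSTs), so they can be exercised locally with curl – a sketch, using `did:plc:qweqwe` as a placeholder DID and an arbitrary query:

```
# schedule an import
curl -X POST http://localhost:3000/xrpc/blue.feeds.lycan.startImport \
  -H 'Content-Type: application/json' -d '{"user": "did:plc:qweqwe"}'

# poll the import progress
curl 'http://localhost:3000/xrpc/blue.feeds.lycan.getImportStatus?user=did:plc:qweqwe'

# search the imported archive
curl 'http://localhost:3000/xrpc/blue.feeds.lycan.searchPosts?user=did:plc:qweqwe&collection=likes&query=wolves'
```

The `progress` value returned by `getImportStatus` is the fraction of the account's history covered so far, counting backwards from now: for an account registered 100 days ago whose import has fetched back to 25 days ago, `progress` = 1 − 75/100 = 0.25.
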
+1
bin/server
···

  require 'bundler/setup'
  require 'app/server'
+ require 'thin'

  Server.run!

+53
bin/worker
+ #!/usr/bin/env ruby
+
+ require 'bundler/setup'
+ require_relative '../app/import_worker'
+ require_relative '../app/reports/simple_logger'
+
+ $stdout.sync = true
+
+ if ENV['ARLOG'] == '1'
+   ActiveRecord::Base.logger = Logger.new(STDOUT)
+ else
+   ActiveRecord::Base.logger = nil
+ end
+
+ def print_help
+   puts "Usage: #{$0} [options...]"
+   puts "Options:"
+   puts "  -v = verbose"
+ end
+
+ logger = SimpleLogger.new
+ worker = ImportWorker.new
+ worker.logger = logger
+
+ args = ARGV.dup
+
+ while arg = args.shift
+   case arg
+   when '-v', '--verbose'
+     worker.verbose = true
+   when '-h', '--help'
+     print_help
+     exit 0
+   else
+     puts "Unrecognized option: #{arg}"
+     print_help
+     exit 1
+   end
+ end
+
+ trap("SIGINT") {
+   puts
+   puts "[#{Time.now}] Stopping..."
+   exit
+ }
+
+ trap("SIGTERM") {
+   puts "[#{Time.now}] Shutting down the service..."
+   exit
+ }
+
+ puts "[#{Time.now}] Starting background worker..."
+ worker.run

+5
config/deploy.rb
···
  after 'deploy', 'deploy:cleanup'
  after 'deploy:migrations', 'deploy:cleanup'
+ after 'deploy:update_code', 'deploy:link_shared'

  namespace :deploy do
    task :restart, :roles => :web do
···
      run "cd #{release_path} && bundle config set --local deployment 'true'"
      run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
      run "cd #{release_path} && bundle config set --local without 'development test'"
    end
+
+   task :link_shared do
+     run "ln -s #{shared_path}/env #{release_path}/.env"
+   end
  end

+9
db/migrate/20250920182018_add_import_jobs.rb
+ class AddImportJobs < ActiveRecord::Migration[7.2]
+   def change
+     create_table :import_jobs do |t|
+       t.integer "user_id", null: false
+     end
+
+     add_index :import_jobs, :user_id, unique: true
+   end
+ end

+7
db/migrate/20250923014702_add_queued_field.rb
+ class AddQueuedField < ActiveRecord::Migration[7.2]
+   def change
+     [:likes, :reposts, :quotes, :pins].each do |table|
+       add_column(table, :queue, :smallint, null: true)
+     end
+   end
+ end

+5
db/migrate/20250923180153_add_user_created_at.rb
+ class AddUserCreatedAt < ActiveRecord::Migration[7.2]
+   def change
+     add_column :users, :registered_at, :datetime, null: true
+   end
+ end

+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+ class AddFetchedUntilToImports < ActiveRecord::Migration[7.2]
+   def change
+     add_column :imports, :fetched_until, :datetime
+   end
+ end

+12 -1
db/schema.rb
···
  #
  # It's strongly recommended that you check this file into your version control system.

- ActiveRecord::Schema[7.2].define(version: 2025_09_18_024627) do
+ ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do
    # These are extensions that must be enabled in order to support this database
    enable_extension "plpgsql"
+
+   create_table "import_jobs", force: :cascade do |t|
+     t.integer "user_id", null: false
+     t.index ["user_id"], name: "index_import_jobs_on_user_id", unique: true
+   end

    create_table "imports", force: :cascade do |t|
      t.integer "user_id", null: false
···
      t.datetime "started_from"
      t.datetime "last_completed"
      t.string "collection", limit: 20, null: false
+     t.datetime "fetched_until"
      t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
    end
···
      t.datetime "time", null: false
      t.bigint "post_id"
      t.string "post_uri"
+     t.integer "queue", limit: 2
      t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
      t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
    end
···
      t.datetime "time", null: false
      t.text "pin_text", null: false
      t.bigint "post_id"
      t.string "post_uri"
+     t.integer "queue", limit: 2
      t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
      t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
    end
···
      t.text "quote_text", null: false
      t.bigint "post_id"
      t.string "post_uri"
+     t.integer "queue", limit: 2
      t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
      t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
    end
···
      t.datetime "time", null: false
      t.bigint "post_id"
      t.string "post_uri"
+     t.integer "queue", limit: 2
      t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
      t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
    end
···

    create_table "users", id: :serial, force: :cascade do |t|
      t.string "did", limit: 260, null: false
+     t.datetime "registered_at"
      t.index ["did"], name: "index_users_on_did", unique: true
    end
  end

+24 -6
lib/tasks/import.rake
- require_relative '../../app/console_report'
  require_relative '../../app/import_manager'
  require_relative '../../app/item_queue'
  require_relative '../../app/models/user'
  require_relative '../../app/post_downloader'
+ require_relative '../../app/reports/console_report'
+ require_relative '../../app/reports/simple_logger'
+
+ task :enqueue_user do
+   unless ENV['DID']
+     raise "Required DID parameter missing"
+   end
+
+   user = User.find_or_create_by!(did: ENV['DID'])
+
+   if user.import_job
+     puts "Import for #{user.did} is already scheduled."
+   elsif user.active?
+     puts "Import for #{user.did} has already started."
+   else
+     user.create_import_job!
+     puts "Import for #{user.did} scheduled ✓"
+   end
+ end

  task :import_user do
-   unless ENV['USER']
-     raise "Required USER parameter missing"
+   unless ENV['DID']
+     raise "Required DID parameter missing"
    end

-   user = User.find_or_create_by!(did: ENV['USER'])
-   pending = !ENV['SKIP_PENDING']
+   user = User.find_or_create_by!(did: ENV['DID'])

    unless ENV['COLLECTION']
      raise "Required COLLECTION parameter missing"
···
    end

    import = ImportManager.new(user)
    import.report = ConsoleReport.new
+   import.logger = SimpleLogger.new
    import.time_limit = ENV['UNTIL']

    trap("SIGINT") {
···
      exit
    }

-   import.start(ENV['COLLECTION'], pending)
+   import.start(ENV['COLLECTION'])

    puts "\n\n\n\n\n"
  end