Don't forget to lycansubscribe


+10
.env.example
+ # hostname on which you're running the Lycan server
+ SERVER_HOSTNAME=lycan.feeds.blue
+
+ # customizing servers we connect to
+ RELAY_HOST=bsky.network
+ # or: JETSTREAM_HOST=jetstream2.us-east.bsky.network
+ APPVIEW_HOST=public.api.bsky.app
+
+ # put your handle here
+ # FIREHOSE_USER_AGENT="Lycan (@my.handle)"
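A quick way to use these settings locally (a sketch, not part of the commit): copy the template into a `.env` file, which the newly added dotenv gem loads and the updated `.gitignore` keeps out of the repo.

```
# copy the template and keep the real config out of git (.env is ignored)
cp .env.example .env

# then edit the values you need, e.g. uncomment JETSTREAM_HOST to use
# a Jetstream instance instead of the full relay
```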
+1
.gitignore
+ .env
  log
+4 -3
Gemfile
  gem 'activerecord', '~> 7.2'
  gem 'sinatra-activerecord', '~> 2.0'
+ gem 'sinatra'
  gem 'pg'
  gem 'rake'
  gem 'irb'
+
  gem 'rainbow'
+ gem 'dotenv'
-
- gem 'sinatra'

  gem 'minisky', '~> 0.5'
  gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
···
  gem 'jwt'

  group :development do
-   gem 'puma'
+   gem 'thin'
    gem 'rackup'
    gem 'capistrano', '~> 2.0'
+41 -34
Gemfile.lock
  GEM
    remote: https://rubygems.org/
    specs:
-     activemodel (7.2.2.2)
-       activesupport (= 7.2.2.2)
-     activerecord (7.2.2.2)
-       activemodel (= 7.2.2.2)
-       activesupport (= 7.2.2.2)
+     activemodel (7.2.3)
+       activesupport (= 7.2.3)
+     activerecord (7.2.3)
+       activemodel (= 7.2.3)
+       activesupport (= 7.2.3)
        timeout (>= 0.4.0)
-     activesupport (7.2.2.2)
+     activesupport (7.2.3)
        base64
        benchmark (>= 0.3)
        bigdecimal
···
      base58 (0.2.3)
      base64 (0.3.0)
      bcrypt_pbkdf (1.1.1)
-     benchmark (0.4.1)
-     bigdecimal (3.2.2)
+     benchmark (0.5.0)
+     bigdecimal (3.3.1)
      capistrano (2.15.11)
        highline
        net-scp (>= 1.0.0)
···
        net-ssh-gateway (>= 1.1.0)
      cbor (0.5.10.1)
      concurrent-ruby (1.3.5)
-     connection_pool (2.5.3)
-     date (3.4.1)
+     connection_pool (2.5.4)
+     daemons (1.4.1)
+     date (3.5.0)
+     dotenv (3.1.8)
      drb (2.2.3)
      ed25519 (1.4.0)
-     erb (5.0.2)
+     erb (6.0.0)
      eventmachine (1.2.7)
      faye-websocket (0.12.0)
        eventmachine (>= 0.12.0)
···
      i18n (1.14.7)
        concurrent-ruby (~> 1.0)
      io-console (0.8.1)
-     irb (1.15.2)
+     irb (1.15.3)
        pp (>= 0.6.0)
        rdoc (>= 4.0.0)
        reline (>= 0.4.2)
···
      logger (1.7.0)
      minisky (0.5.0)
        base64 (~> 0.1)
-     minitest (5.25.5)
+     minitest (5.26.1)
      mustermann (3.0.4)
        ruby2_keywords (~> 0.0.1)
      net-scp (4.1.0)
···
      net-ssh (7.3.0)
      net-ssh-gateway (2.0.0)
        net-ssh (>= 4.0.0)
-     nio4r (2.7.4)
-     pg (1.6.1)
-     pg (1.6.1-aarch64-linux)
-     pg (1.6.1-aarch64-linux-musl)
-     pg (1.6.1-arm64-darwin)
-     pg (1.6.1-x86_64-darwin)
-     pg (1.6.1-x86_64-linux)
-     pg (1.6.1-x86_64-linux-musl)
-     pp (0.6.2)
+     pg (1.6.2)
+     pg (1.6.2-aarch64-linux)
+     pg (1.6.2-aarch64-linux-musl)
+     pg (1.6.2-arm64-darwin)
+     pg (1.6.2-x86_64-darwin)
+     pg (1.6.2-x86_64-linux)
+     pg (1.6.2-x86_64-linux-musl)
+     pp (0.6.3)
        prettyprint
      prettyprint (0.2.0)
      psych (5.2.6)
        date
        stringio
-     puma (6.6.1)
-       nio4r (~> 2.0)
-     rack (3.2.0)
-     rack-protection (4.1.1)
+     rack (3.2.4)
+     rack-protection (4.2.1)
        base64 (>= 0.1.0)
        logger (>= 1.6.0)
        rack (>= 3.0.0, < 4)
···
      rackup (2.2.1)
        rack (>= 3)
      rainbow (3.1.1)
-     rake (13.3.0)
+     rake (13.3.1)
-     rdoc (6.14.2)
+     rdoc (6.15.1)
        erb
        psych (>= 4.0.0)
+       tsort
-     reline (0.6.2)
+     reline (0.6.3)
        io-console (~> 0.5)
      ruby2_keywords (0.0.5)
      securerandom (0.4.1)
-     sinatra (4.1.1)
+     sinatra (4.2.1)
        logger (>= 1.6.0)
        mustermann (~> 3.0)
        rack (>= 3.0.0, < 4)
-       rack-protection (= 4.1.1)
+       rack-protection (= 4.2.1)
        rack-session (>= 2.0.0, < 3)
        tilt (~> 2.0)
      sinatra-activerecord (2.0.28)
···
        cbor (~> 0.5, >= 0.5.9.6)
        eventmachine (~> 1.2, >= 1.2.7)
        faye-websocket (~> 0.12)
-     stringio (3.1.7)
+     stringio (3.1.8)
+     thin (2.0.1)
+       daemons (~> 1.0, >= 1.0.9)
+       eventmachine (~> 1.0, >= 1.0.4)
+       logger
+       rack (>= 1, < 4)
      tilt (2.6.1)
-     timeout (0.4.3)
+     timeout (0.4.4)
+     tsort (0.2.0)
      tzinfo (2.0.6)
        concurrent-ruby (~> 1.0)
      websocket-driver (0.8.0)
···
    bcrypt_pbkdf (>= 1.0, < 2.0)
    capistrano (~> 2.0)
    didkit (~> 0.2)!
+   dotenv
    ed25519 (>= 1.2, < 2.0)
    irb
    jwt
    minisky (~> 0.5)
    pg
-   puma
    rackup
    rainbow
    rake
    sinatra
    sinatra-activerecord (~> 2.0)
    skyfall (~> 0.6)
+   thin

  BUNDLED WITH
     2.7.0
+3
Procfile
+ server: bin/server
+ firehose: bin/firehose
+ worker: bin/worker
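The Procfile lists the three long-running processes. A convenient way to start them all together locally (not part of this change, just one option) is a Procfile runner such as foreman:

```
gem install foreman
foreman start          # runs server, firehose and worker from the Procfile
foreman start worker   # or start just a single process
```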
+110
README.md
+ # Lycan 🐺
+
+ A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and allows you to search in that archive.
+
+
+ ## How it works
+
+ Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network AppView, it only indexes posts and likes on demand from people who request to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anything between a few minutes and an hour, depending on how much data there is to download. After that, new likes are indexed live from the firehose.
+
+ At the moment, Lycan indexes four types of content:
+
+ - posts you've liked
+ - posts you've reposted
+ - posts you've quoted
+ - your old-style bookmarks (using the 📌 emoji method)
+
+ New bookmarks are private data, so at the moment they can't be imported until support for OAuth is added.
+
+ Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API – the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)).
+
+ The service consists of three separate components:
+
+ - a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is/has been imported
+ - a **background worker**, which runs the import process
+ - an **HTTP server**, which serves the XRPC endpoints (currently there are 3: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying
+
+
+ ## Setting up on localhost
+
+ This app should run on any somewhat recent version of Ruby, though of course it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install it with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need to have some familiarity with the Ruby ecosystem in order to set it up and run it.
+
+ A Postgres database is also required (again, any non-ancient version should work).
+
+ Download or clone the repository, then install the dependencies:
+
+ ```
+ bundle install
+ ```
+
+ Next, create the database – the configuration is defined in [`config/database.yml`](config/database.yml), for development it's `lycan_development`. Create it either manually, or with a rake task:
+
+ ```
+ bundle exec rake db:create
+ ```
+
+ Then, run the migrations:
+
+ ```
+ bundle exec rake db:migrate
+ ```
+
+ To run an import, you will need to run three separate processes, probably in separate terminal tabs:
+
+ 1) the firehose client, [`bin/firehose`](bin/firehose)
+ 2) the background worker, [`bin/worker`](bin/worker)
+ 3) the Sinatra HTTP server, [`bin/server`](bin/server)
+
+ The UI can be accessed through Skythread, either on the official site at [skythread.mackuba.eu](https://skythread.mackuba.eu), or a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu – but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL.
+
+ You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.)
+
+
+ ## Configuration
+
+ There are a few things you can configure through ENV variables:
+
+ - `RELAY_HOST` – hostname of the relay to use for the firehose (default: `bsky.network`)
+ - `JETSTREAM_HOST` – alternatively, instead of `RELAY_HOST`, set this to a hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance
+ - `FIREHOSE_USER_AGENT` – when running in production, it's recommended that you set this to some name that identifies who is running the service
+ - `APPVIEW_HOST` – hostname of the AppView used to download posts (default: `public.api.bsky.app`)
+ - `SERVER_HOSTNAME` – hostname of the server on which you're running the service in production
+
+
+ ## Rake tasks
+
+ Some Rake tasks that might be useful:
+
+ ```
+ bundle exec rake enqueue_user DID=did:plc:qweqwe
+ ```
+
+ - request an import of the given account (to be handled by firehose + worker)
+
+ ```
+ bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all
+ ```
+
+ - run a complete import synchronously
+
+ ```
+ bundle exec rake process_posts
+ ```
+
+ - process all previously queued and unfinished or failed items
+
+
+ ## Running in production
+
+ This will probably heavily depend on where and how you prefer to run it; I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config for your specific setup.
+
+ On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option to do this (which I'm using) is writing a `systemd` service config file and adding it to `/etc/systemd/system`. To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server – my recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose).
+
+
+ ## Credits
+
+ Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
+
+ The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
+
+ Bug reports and pull requests are welcome 😎
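To make the README's production section a bit more concrete: one possible shape for a systemd unit for the worker process, written from the command line. This is only a sketch under assumptions – the unit name, user, and the `/var/www/lycan/current` path are placeholders, not something defined in this repo.

```
# hypothetical example unit; adjust User, paths and unit name to your setup
sudo tee /etc/systemd/system/lycan-worker.service > /dev/null <<'EOF'
[Unit]
Description=Lycan background worker
After=network.target postgresql.service

[Service]
User=deploy
WorkingDirectory=/var/www/lycan/current
ExecStart=/usr/bin/env bundle exec bin/worker
Restart=always

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now lycan-worker
```

A matching unit for `bin/firehose` would look the same apart from the name and ExecStart line.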
+1
Rakefile
  require 'bundler/setup'
  require 'sinatra/activerecord'
  require 'sinatra/activerecord/rake'
+ require 'dotenv/load'

  Rake.add_rakelib File.join(__dir__, 'lib', 'tasks')
+2
app/errors.rb
+ class InvalidRecordError < StandardError
+ end
+35 -4
app/firehose_client.rb
  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
-   @service = DEFAULT_RELAY
+
+   if ENV['RELAY_HOST']
+     @service = ENV['RELAY_HOST']
+   elsif ENV['JETSTREAM_HOST']
+     @service = ENV['JETSTREAM_HOST']
+     @jetstream = true
+   else
+     @service = DEFAULT_RELAY
+   end
  end

  def start
···
    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

-   @sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
-   @sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
+   @sky = if @jetstream
+     Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
+   else
+     Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
+   end
+
+   @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
    @sky.check_heartbeat = true

    @sky.on_message do |m|
···
        end
      end
    end
+ rescue CBOR::UnpackError
+   # ignore invalid records
  end

  def process_account_event(msg)
    if msg.status == :deleted
      if user = User.find_by(did: msg.repo)
        user.destroy
-       @active_users.delete_if { |u| u.id == user.id }
+       @active_users.delete_if { |k, u| u.id == user.id }
      end
    end
  end
···
    return unless @current_user

    if op.action == :create
+     return if op.raw_record.nil?
      @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.likes.where(rkey: op.rkey).delete_all
    end
+ rescue StandardError => e
+   log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}"
+   log e.backtrace.reject { |x| x.include?('/ruby/') }
+   sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  def process_repost(msg, op)
    return unless @current_user

    if op.action == :create
+     return if op.raw_record.nil?
      @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.reposts.where(rkey: op.rkey).delete_all
    end
+ rescue StandardError => e
+   log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}"
+   log e.backtrace.reject { |x| x.include?('/ruby/') }
+   sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  def process_post(msg, op)
    if op.action == :create
+     return if op.raw_record.nil?
+
      if @current_user
        @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
        @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
···
        post.destroy
      end
    end
+ rescue StandardError => e
+   log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}"
+   log e.backtrace.reject { |x| x.include?('/ruby/') }
+   sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  def log(text)
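With the env-based configuration added above, switching the firehose client between the relay and Jetstream is just a matter of environment variables when starting the process; both values shown here come from the `.env.example` template earlier in this diff.

```
# stream from the default relay (bsky.network)
bin/firehose

# or stream from a Jetstream instance, identifying the operator in the user agent
JETSTREAM_HOST=jetstream2.us-east.bsky.network \
  FIREHOSE_USER_AGENT="Lycan (@my.handle)" \
  bin/firehose
```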
+19 -3
app/import_manager.rb
  require_relative 'post_downloader'

  class ImportManager
-   attr_accessor :report, :time_limit
+   attr_accessor :report, :time_limit, :logger, :log_status_updates

    def initialize(user)
      @user = user
···
    downloader = PostDownloader.new
    downloader.report = @report
+   downloader.logger = @logger

-   download_thread = Thread.new { downloader.import_from_queue(queue) }
+   download_thread = Thread.new do
+     @logger&.info "Starting downloader thread for #{@user}" if @log_status_updates
+
+     downloader.import_from_queue(queue)
+
+     @logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
+   end

    import_threads = importers.map do |import|
      import.item_queue = queue
      import.report = @report
+     import.logger = @logger

-     Thread.new { import.run_import(@time_limit) }
+     Thread.new do
+       @logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates
+
+       import.run_import(@time_limit)
+
+       @logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
+     end
    end

    import_threads.each { |i| i.join }
+
+   @logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates

    downloader.stop_when_empty = true
    download_thread.join
+22 -6
app/import_worker.rb
  require_relative 'reports/basic_report'

  class ImportWorker
-   attr_accessor :verbose
+   attr_accessor :verbose, :logger

    class UserThread < Thread
-     def initialize(user, collections, verbose = false)
+     def initialize(user, collections, logger, verbose = false)
        @user = user
        @verbose = verbose
+       @logger = logger

        super { run(collections) }
      end
···
      end

      def run(collections)
+       @logger&.info "Starting import thread for #{@user}"
+
        if @user.registered_at.nil?
          registration_time = get_registration_time(@user)
          @user.update!(registered_at: registration_time)
        end

        import = ImportManager.new(@user)
-       import.report = BasicReport.new if @verbose
+
+       if @logger
+         import.report = BasicReport.new(@logger) if @verbose
+         import.logger = @logger
+         import.log_status_updates = true
+       end
+
        import.start(collections)
+
+       @logger&.info "Ended import thread for #{@user}"
      end

      def get_registration_time(user)
-       profile = Minisky.new('public.api.bsky.app', nil).get_request('app.bsky.actor.getProfile', { actor: user.did })
+       sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
+       profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })
+
        Time.parse(profile['createdAt'])
      end
    end
···

    @firehose_thread = Thread.new { process_firehose_items }
    @downloader = PostDownloader.new
+   @downloader.logger = @logger

    loop do
      @user_threads.delete_if { |t| !t.alive? }

-     if user = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).first
+     users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a
+
+     users.each do |user|
        collections = user.imports.unfinished.map(&:collection)
-       thread = UserThread.new(user, collections, @verbose)
+       thread = UserThread.new(user, collections, @logger, @verbose)
        @user_threads << thread
      end
+11 -3
app/importers/base_importer.rb
  require 'didkit'
  require 'minisky'
+ require 'time'

  require_relative '../at_uri'
+ require_relative '../errors'
  require_relative '../models/post'
  require_relative '../models/user'

  class BaseImporter
-   attr_accessor :item_queue, :report
+   attr_accessor :item_queue, :report, :logger

    def initialize(user)
      @did = DID.new(user.did)
···
    end

    @time_limit = requested_time_limit || @import.last_completed
-   puts "Fetching until: #{@time_limit}" if @time_limit
+   @logger&.info "Fetching until: #{@time_limit}" if @time_limit

    import_items

    @import.update!(last_completed: @import.started_from) unless requested_time_limit
-   @import.update!(cursor: nil, started_from: nil)
+   @import.update!(cursor: nil, started_from: nil, fetched_until: nil)
    @report&.update(importers: { importer_name => { :finished => true }})
  end

  def import_items
    raise NotImplementedError
+ end
+
+ def created_at(record)
+   Time.parse(record['createdAt'])
+ rescue StandardError
+   raise InvalidRecordError
  end
end
+12 -9
app/importers/likes_importer.rb
- require 'time'
  require_relative 'base_importer'

  class LikesImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
          like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)

+         record_date = like&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min
+
          if like && like.pending? && @item_queue
            @item_queue.push(like)
            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in LikesImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end
end
+12 -9
app/importers/posts_importer.rb
- require 'time'
  require_relative 'base_importer'

  class PostsImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
          quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
          pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)

+         record_date = quote&.time || pin&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min
+
          if @item_queue
            if quote && quote.pending?
              @item_queue.push(quote)
···
            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in PostsImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end
end
+12 -9
app/importers/reposts_importer.rb
- require 'time'
  require_relative 'base_importer'

  class RepostsImporter < BaseImporter
···
      records = response['records']
      cursor = response['cursor']
-
-     @imported_count += records.length
-     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
-     @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
+     oldest_date = nil

      records.each do |record|
        begin
          repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)

+         record_date = repost&.time || created_at(record['value'])
+         oldest_date = [oldest_date, record_date].compact.min
+
          if repost && repost.pending? && @item_queue
            @item_queue.push(repost)
            @report&.update(queue: { length: @item_queue.length })
          end
-       rescue StandardError => e
-         puts "Error in RepostsImporter: #{record['uri']}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
        end
      end

+     @imported_count += records.length
+     @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
+     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
+
      params[:cursor] = cursor
-     @import.update!(cursor: cursor)
+     @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
-     break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
+     break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end
end
+1
app/init.rb
  require 'sinatra/activerecord'
+ require 'dotenv/load'

  ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.datetime_type = :timestamptz
  RubyVM::YJIT.enable
+26
app/models/import.rb
  validates_uniqueness_of :collection, scope: :user_id

  scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }
+
+ IMPORT_END = Time.at(0)
+
+ def imported_until
+   return nil if cursor.nil? && last_completed.nil?
+
+   groups = case collection
+   when 'likes'
+     [:likes]
+   when 'reposts'
+     [:reposts]
+   when 'posts'
+     [:pins, :quotes]
+   end
+
+   newest_queued_items = groups.map { |g| user.send(g).where(queue: :import).order(:time).last }
+   newest_queued = newest_queued_items.compact.sort_by(&:time).last
+
+   if newest_queued
+     newest_queued.time
+   elsif fetched_until
+     fetched_until
+   else
+     IMPORT_END
+   end
+ end
end
+7
app/models/post.rb
  validates_length_of :text, maximum: 1000
  validates_length_of :data, maximum: 10000

+ validate :check_null_bytes
+
  belongs_to :user

  has_many :likes, dependent: :delete_all
···
  def at_uri
    "at://#{user.did}/app.bsky.feed.post/#{rkey}"
+ end
+
+ def check_null_bytes
+   # Postgres doesn't allow null bytes in strings
+   errors.add(:text, 'must not contain a null byte') if text.include?("\u0000")
  end
end
+19 -51
app/models/user.rb
  require_relative 'pin'
  require_relative 'post'
  require_relative 'repost'
+ require_relative 'user_importable'

  class User < ActiveRecord::Base
    validates_presence_of :did
···
    before_destroy :delete_posts_cascading

-   has_many :likes, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(like_uri, record, **args)
-       like = self.new_from_record(like_uri, record)
-       return nil if like.nil? || self.where(rkey: like.rkey).exists?
-
-       like.import_item!(args)
-     end
-   end
-
-   has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(repost_uri, record, **args)
-       repost = self.new_from_record(repost_uri, record)
-       return nil if repost.nil? || self.where(rkey: repost.rkey).exists?
-
-       repost.import_item!(args)
-     end
-   end
-
-   has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(post_uri, record, **args)
-       quote = self.new_from_record(post_uri, record)
-       return nil if quote.nil? || self.where(rkey: quote.rkey).exists?
-
-       quote.import_item!(args)
-     end
-   end
-
-   has_many :pins, foreign_key: 'actor_id', dependent: :delete_all do
-     def import_from_record(post_uri, record, **args)
-       pin = self.new_from_record(post_uri, record)
-       return nil if pin.nil? || self.where(rkey: pin.rkey).exists?
-
-       pin.import_item!(args)
-     end
-   end
+   has_many :likes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
+   has_many :pins, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable

    def self.active
      self.joins(:imports).distinct
···
    end

    def imported_until
-     return nil unless self.imports.exists?
+     import_positions = self.imports.map(&:imported_until)

-     oldest_imported_items = []
-     started = false
-
-     [:likes, :reposts, :pins, :quotes].each do |group|
-       if self.send(group).where(queue: :import).exists?
-         oldest_imported_items << self.send(group).where(queue: nil).order(:time).first
-       end
+     if import_positions.empty? || import_positions.any? { |x| x.nil? }
+       nil
+     else
+       import_positions.sort.last
      end
+   end

-     earliest_oldest = oldest_imported_items.compact.sort_by(&:time).last
-
-     if earliest_oldest
-       earliest_oldest.time
-     elsif self.imports.merge(Import.unfinished).exists?
-       nil
-     else
-       :end
-     end
+   def erase_imports!
+     [:likes, :reposts, :quotes, :pins].map { |x| self.send(x).delete_all }
+
+     self.import_job&.destroy
+     self.imports.delete_all
    end

    def delete_posts_cascading
···
      Quote.where(post_id: posts_subquery).delete_all

      Post.where(user: self).delete_all
+   end
+
+   def to_s
+     %(<User id: #{id}, did: "#{did}">)
    end
  end
+20
app/models/user_importable.rb
+ require_relative '../errors'
+
+ module UserImportable
+   def import_from_record(item_uri, record, **args)
+     item = try_build_from_record(item_uri, record)
+     return nil if item.nil? || already_imported?(item)
+
+     item.import_item!(args)
+   end
+
+   def try_build_from_record(item_uri, record)
+     self.new_from_record(item_uri, record)
+   rescue StandardError
+     raise InvalidRecordError
+   end
+
+   def already_imported?(item)
+     self.where(rkey: item.rkey).exists?
+   end
+ end
+35 -20
app/post_downloader.rb
  require_relative 'models/user'

  class PostDownloader
-   attr_accessor :report, :stop_when_empty
+   attr_accessor :report, :logger, :stop_when_empty

    def initialize
-     @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)
+     @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)

      @total_count = 0
      @oldest_imported = Time.now
···
      response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq })

      response['posts'].each do |data|
-       begin
-         item = items.detect { |x| x.post_uri == data['uri'] }
-         items.delete(item)
+       current_items = items.select { |x| x.post_uri == data['uri'] }
+       items -= current_items

+       begin
          post = save_post(data['uri'], data['record'])

          if post.valid?
-           update_item(item, post)
+           current_items.each { |i| update_item(i, post) }
          else
-           puts "Invalid post #{item.post_uri}: #{post.errors.full_messages.join("; ")}"
-           invalidate_item(item)
+           @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
+           current_items.each { |i| invalidate_item(i) }
          end
-       rescue StandardError => e
-         puts "Error in PostDownloader: #{item.post_uri}: #{e.class}: #{e}"
+       rescue InvalidRecordError => e
+         @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
+         current_items.each { |i| i.update!(queue: nil) }
        end
      end

      check_missing_items(items)
    rescue StandardError => e
-     puts "Error in PostDownloader: #{e.class}: #{e}"
+     @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
    end
  end

  def save_post(post_uri, record)
    did, _, rkey = AT_URI(post_uri)

-   text = record.delete('text')
-   created = record.delete('createdAt')
-
-   author = User.find_or_create_by!(did: did)
+   begin
+     author = User.find_or_create_by!(did: did)
+   rescue ActiveRecord::RecordInvalid => e
+     raise InvalidRecordError
+   end

    if post = Post.find_by(user: author, rkey: rkey)
      return post
+   else
+     post = build_post(author, rkey, record)
+     post.save
+     post
    end
+ end

+ def build_post(author, rkey, record)
+   text = record.delete('text')
+   created = record.delete('createdAt')
+
+   record.delete('$type')
+
-   Post.create(
+   Post.new(
      user: author,
      rkey: rkey,
      time: Time.parse(created),
      text: text,
      data: JSON.generate(record)
    )
+ rescue StandardError
+   raise InvalidRecordError
  end

  def update_item(item, post)
···
    case status
    when :active
      # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
-     puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
+     # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
      item.destroy
    when nil
      # account was deleted, so all posts were deleted too
-     puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
+     # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
      item.destroy
    else
      # account is inactive/suspended, but could come back, so leave it for now
-     puts "#{item.post_uri}: account #{did} is inactive: #{status}"
+     # puts "#{item.post_uri}: account #{did} is inactive: #{status}"
    end
  rescue StandardError => e
    hostname = did_obj.document.pds_host rescue "???"
-   puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
+   @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

    # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
    item.destroy if hostname == 'bsky.social'
+6 -2
app/reports/basic_report.rb
  class BasicReport
+   def initialize(logger)
+     @logger = logger
+   end
+
    def update(data)
      data.each do |k, v|
        if k == :downloader
-         p ({ k => v }) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
+         @logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
        elsif k == :queue
          next
        else
-         p ({ k => v})
+         @logger.info({ k => v}.inspect)
        end
      end
    end
+9
app/reports/simple_logger.rb
+ require 'logger'
+
+ class SimpleLogger < Logger
+   def initialize
+     super(STDOUT)
+
+     self.formatter = proc { |level, time, prog, msg| "[#{time}] #{msg}\n" }
+   end
+ end
+32 -11
app/server.rb
  class Server < Sinatra::Application
    register Sinatra::ActiveRecordExtension
-   set :port, 3000
+   set :port, ENV['PORT'] || 3000

    PAGE_LIMIT = 25
-   HOSTNAME = 'lycan.feeds.blue'
+   HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue'

    helpers do
      def json_response(data)
···

      def get_user_did
        if settings.development?
-         if did = params[:user]
+         did = if request.post?
+           json = JSON.parse(request.body.read)
+           request.body.rewind
+           json['user']
+         else
+           params[:user]
+         end
+
+         if did
            return did
          else
            halt json_error('AuthMissing', 'Missing "user" parameter', status: 401)
···

    before do
      @authenticator = Authenticator.new(hostname: HOSTNAME)
+
+     if settings.development?
+       headers['Access-Control-Allow-Origin'] = '*'
+     end
+   end
+
+   get '/' do
+     "Awoo 🐺"
    end

    get '/xrpc/blue.feeds.lycan.searchPosts' do
-     headers['access-control-allow-origin'] = '*'
-
      @user = load_user

      if params[:query].to_s.strip.empty?
···
      json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor)
    end

-   post '/xrpc/blue.feeds.lycan.startImport' do
-     headers['access-control-allow-origin'] = '*'
+   if settings.development?
+     options '/xrpc/blue.feeds.lycan.startImport' do
+       headers['Access-Control-Allow-Origin'] = '*'
+       headers['Access-Control-Allow-Headers'] = 'content-type'
+     end
+   end

+   post '/xrpc/blue.feeds.lycan.startImport' do
      did = get_user_did
      user = User.find_or_create_by(did: did)
···
    end

    get '/xrpc/blue.feeds.lycan.getImportStatus' do
-     headers['access-control-allow-origin'] = '*'
-
      did = get_user_did
      user = User.find_by(did: did)
      until_date = user&.imported_until

      case until_date
      when nil
-       json_response(status: 'not_started')
-     when :end
+       if user.import_job || user.imports.exists?
+         json_response(status: 'scheduled')
+       else
+         json_response(status: 'not_started')
+       end
+     when Import::IMPORT_END
        json_response(status: 'finished')
      else
        progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at)
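Since the development branch of `get_user_did` reads the DID from a `user` parameter (or from the JSON body for POST requests), the endpoints can be poked at directly on localhost without PDS-proxied auth. A rough sketch, with a placeholder DID and assuming the default port 3000; the exact JSON body expected by `startImport` beyond the `user` key isn't shown in this diff:

```
# search the archive of a locally imported user
curl "http://localhost:3000/xrpc/blue.feeds.lycan.searchPosts?user=did:plc:qweqwe&query=wolves"

# kick off an import, then poll its status
curl -X POST -H "Content-Type: application/json" \
  -d '{"user": "did:plc:qweqwe"}' \
  http://localhost:3000/xrpc/blue.feeds.lycan.startImport

curl "http://localhost:3000/xrpc/blue.feeds.lycan.getImportStatus?user=did:plc:qweqwe"
```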
+1
bin/server
  require 'bundler/setup'
  require 'app/server'
+ require 'thin'

  Server.run!
+7 -2
bin/worker
  require 'bundler/setup'
  require_relative '../app/import_worker'
+ require_relative '../app/reports/simple_logger'

  $stdout.sync = true

···
    puts " -v = verbose"
  end

+ logger = SimpleLogger.new
  worker = ImportWorker.new
+ worker.logger = logger

  args = ARGV.dup

···
  end

  trap("SIGINT") {
-   puts "Stopping..."
+   puts
+   puts "[#{Time.now}] Stopping..."
    exit
  }

  trap("SIGTERM") {
-   puts "Shutting down the service..."
+   puts "[#{Time.now}] Shutting down the service..."
    exit
  }

+ puts "[#{Time.now}] Starting background worker..."
  worker.run
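A typical local invocation, using the `-v` flag from the usage text above to get the verbose per-import report:

```
# run the background worker with verbose reporting
bin/worker -v
```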
+5
config/deploy.rb
  after 'deploy', 'deploy:cleanup'
  after 'deploy:migrations', 'deploy:cleanup'
+ after 'deploy:update_code', 'deploy:link_shared'

  namespace :deploy do
    task :restart, :roles => :web do
···
      run "cd #{release_path} && bundle config set --local deployment 'true'"
      run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
      run "cd #{release_path} && bundle config set --local without 'development test'"
+   end
+
+   task :link_shared do
+     run "ln -s #{shared_path}/env #{release_path}/.env"
    end
  end
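The new `deploy:link_shared` task assumes an `env` file already exists in Capistrano's shared directory on the server, so a one-time setup step along these lines is needed first (the `/var/www/lycan` path is an assumption; use whatever `:deploy_to` is set to in this config):

```
# on the server, before the first deploy with this task
mkdir -p /var/www/lycan/shared
$EDITOR /var/www/lycan/shared/env   # same keys as .env.example
```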
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+ class AddFetchedUntilToImports < ActiveRecord::Migration[7.2]
+   def change
+     add_column :imports, :fetched_until, :datetime
+   end
+ end
+2 -1
db/schema.rb
  #
  # It's strongly recommended that you check this file into your version control system.

- ActiveRecord::Schema[7.2].define(version: 2025_09_23_180153) do
+ ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do
    # These are extensions that must be enabled in order to support this database
    enable_extension "plpgsql"
···
      t.datetime "started_from"
      t.datetime "last_completed"
      t.string "collection", limit: 20, null: false
+     t.datetime "fetched_until"
      t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
    end
+2
lib/tasks/import.rake
  require_relative '../../app/models/user'
  require_relative '../../app/post_downloader'
  require_relative '../../app/reports/console_report'
+ require_relative '../../app/reports/simple_logger'

  task :enqueue_user do
    unless ENV['DID']
···
    import = ImportManager.new(user)
    import.report = ConsoleReport.new
+   import.logger = SimpleLogger.new
    import.time_limit = ENV['UNTIL']

    trap("SIGINT") {