Don't forget to lycansubscribe

Compare changes

Choose any two refs to compare.

+10
.env.example
··· 1 + # hostname on which you're running the Lycan server 2 + SERVER_HOSTNAME=lycan.feeds.blue 3 + 4 + # customizing servers we connect to 5 + RELAY_HOST=bsky.network 6 + # or: JETSTREAM_HOST=jetstream2.us-east.bsky.network 7 + APPVIEW_HOST=public.api.bsky.app 8 + 9 + # put your handle here 10 + # FIREHOSE_USER_AGENT="Lycan (@my.handle)"
+1
.gitignore
··· 1 + .env 1 2 log
+2
Capfile
··· 1 + load 'deploy' 2 + load 'config/deploy'
+10 -3
Gemfile
··· 2 2 3 3 gem 'activerecord', '~> 7.2' 4 4 gem 'sinatra-activerecord', '~> 2.0' 5 + gem 'sinatra' 5 6 gem 'pg' 6 7 gem 'rake' 7 8 gem 'irb' 9 + 8 10 gem 'rainbow' 9 - 10 - gem 'sinatra' 11 + gem 'dotenv' 11 12 12 13 gem 'minisky', '~> 0.5' 13 14 gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit' 15 + gem 'skyfall', '~> 0.6' 14 16 15 17 gem 'base58' 16 18 gem 'jwt' 17 19 18 20 group :development do 19 - gem 'puma' 21 + gem 'thin' 20 22 gem 'rackup' 23 + gem 'capistrano', '~> 2.0' 24 + 25 + # for net-ssh for capistrano 26 + gem 'ed25519', '>= 1.2', '< 2.0' 27 + gem 'bcrypt_pbkdf', '>= 1.0', '< 2.0' 21 28 end
+78 -34
Gemfile.lock
··· 7 7 GEM 8 8 remote: https://rubygems.org/ 9 9 specs: 10 - activemodel (7.2.2.2) 11 - activesupport (= 7.2.2.2) 12 - activerecord (7.2.2.2) 13 - activemodel (= 7.2.2.2) 14 - activesupport (= 7.2.2.2) 10 + activemodel (7.2.3) 11 + activesupport (= 7.2.3) 12 + activerecord (7.2.3) 13 + activemodel (= 7.2.3) 14 + activesupport (= 7.2.3) 15 15 timeout (>= 0.4.0) 16 - activesupport (7.2.2.2) 16 + activesupport (7.2.3) 17 17 base64 18 18 benchmark (>= 0.3) 19 19 bigdecimal ··· 25 25 minitest (>= 5.1) 26 26 securerandom (>= 0.3) 27 27 tzinfo (~> 2.0, >= 2.0.5) 28 + base32 (0.3.4) 28 29 base58 (0.2.3) 29 30 base64 (0.3.0) 30 - benchmark (0.4.1) 31 - bigdecimal (3.2.2) 31 + bcrypt_pbkdf (1.1.1) 32 + benchmark (0.5.0) 33 + bigdecimal (3.3.1) 34 + capistrano (2.15.11) 35 + highline 36 + net-scp (>= 1.0.0) 37 + net-sftp (>= 2.0.0) 38 + net-ssh (>= 2.0.14) 39 + net-ssh-gateway (>= 1.1.0) 40 + cbor (0.5.10.1) 32 41 concurrent-ruby (1.3.5) 33 - connection_pool (2.5.3) 34 - date (3.4.1) 42 + connection_pool (2.5.4) 43 + daemons (1.4.1) 44 + date (3.5.0) 45 + dotenv (3.1.8) 35 46 drb (2.2.3) 36 - erb (5.0.2) 47 + ed25519 (1.4.0) 48 + erb (6.0.0) 49 + eventmachine (1.2.7) 50 + faye-websocket (0.12.0) 51 + eventmachine (>= 0.12.0) 52 + websocket-driver (>= 0.8.0) 53 + highline (3.1.2) 54 + reline 37 55 i18n (1.14.7) 38 56 concurrent-ruby (~> 1.0) 39 57 io-console (0.8.1) 40 - irb (1.15.2) 58 + irb (1.15.3) 41 59 pp (>= 0.6.0) 42 60 rdoc (>= 4.0.0) 43 61 reline (>= 0.4.2) ··· 46 64 logger (1.7.0) 47 65 minisky (0.5.0) 48 66 base64 (~> 0.1) 49 - minitest (5.25.5) 67 + minitest (5.26.1) 50 68 mustermann (3.0.4) 51 69 ruby2_keywords (~> 0.0.1) 52 - nio4r (2.7.4) 53 - pg (1.6.1) 54 - pg (1.6.1-aarch64-linux) 55 - pg (1.6.1-aarch64-linux-musl) 56 - pg (1.6.1-arm64-darwin) 57 - pg (1.6.1-x86_64-darwin) 58 - pg (1.6.1-x86_64-linux) 59 - pg (1.6.1-x86_64-linux-musl) 60 - pp (0.6.2) 70 + net-scp (4.1.0) 71 + net-ssh (>= 2.6.5, < 8.0.0) 72 + net-sftp (4.0.0) 73 + net-ssh (>= 5.0.0, < 8.0.0) 
74 + net-ssh (7.3.0) 75 + net-ssh-gateway (2.0.0) 76 + net-ssh (>= 4.0.0) 77 + pg (1.6.2) 78 + pg (1.6.2-aarch64-linux) 79 + pg (1.6.2-aarch64-linux-musl) 80 + pg (1.6.2-arm64-darwin) 81 + pg (1.6.2-x86_64-darwin) 82 + pg (1.6.2-x86_64-linux) 83 + pg (1.6.2-x86_64-linux-musl) 84 + pp (0.6.3) 61 85 prettyprint 62 86 prettyprint (0.2.0) 63 87 psych (5.2.6) 64 88 date 65 89 stringio 66 - puma (6.6.1) 67 - nio4r (~> 2.0) 68 - rack (3.2.0) 69 - rack-protection (4.1.1) 90 + rack (3.2.4) 91 + rack-protection (4.2.1) 70 92 base64 (>= 0.1.0) 71 93 logger (>= 1.6.0) 72 94 rack (>= 3.0.0, < 4) ··· 76 98 rackup (2.2.1) 77 99 rack (>= 3) 78 100 rainbow (3.1.1) 79 - rake (13.3.0) 80 - rdoc (6.14.2) 101 + rake (13.3.1) 102 + rdoc (6.15.1) 81 103 erb 82 104 psych (>= 4.0.0) 83 - reline (0.6.2) 105 + tsort 106 + reline (0.6.3) 84 107 io-console (~> 0.5) 85 108 ruby2_keywords (0.0.5) 86 109 securerandom (0.4.1) 87 - sinatra (4.1.1) 110 + sinatra (4.2.1) 88 111 logger (>= 1.6.0) 89 112 mustermann (~> 3.0) 90 113 rack (>= 3.0.0, < 4) 91 - rack-protection (= 4.1.1) 114 + rack-protection (= 4.2.1) 92 115 rack-session (>= 2.0.0, < 3) 93 116 tilt (~> 2.0) 94 117 sinatra-activerecord (2.0.28) 95 118 activerecord (>= 4.1) 96 119 sinatra (>= 1.0) 97 - stringio (3.1.7) 120 + skyfall (0.6.0) 121 + base32 (~> 0.3, >= 0.3.4) 122 + base64 (~> 0.1) 123 + cbor (~> 0.5, >= 0.5.9.6) 124 + eventmachine (~> 1.2, >= 1.2.7) 125 + faye-websocket (~> 0.12) 126 + stringio (3.1.8) 127 + thin (2.0.1) 128 + daemons (~> 1.0, >= 1.0.9) 129 + eventmachine (~> 1.0, >= 1.0.4) 130 + logger 131 + rack (>= 1, < 4) 98 132 tilt (2.6.1) 99 - timeout (0.4.3) 133 + timeout (0.4.4) 134 + tsort (0.2.0) 100 135 tzinfo (2.0.6) 101 136 concurrent-ruby (~> 1.0) 137 + websocket-driver (0.8.0) 138 + base64 139 + websocket-extensions (>= 0.1.0) 140 + websocket-extensions (0.1.5) 102 141 103 142 PLATFORMS 104 143 aarch64-linux ··· 112 151 DEPENDENCIES 113 152 activerecord (~> 7.2) 114 153 base58 154 + bcrypt_pbkdf (>= 1.0, < 2.0) 
155 + capistrano (~> 2.0) 115 156 didkit (~> 0.2)! 157 + dotenv 158 + ed25519 (>= 1.2, < 2.0) 116 159 irb 117 160 jwt 118 161 minisky (~> 0.5) 119 162 pg 120 - puma 121 163 rackup 122 164 rainbow 123 165 rake 124 166 sinatra 125 167 sinatra-activerecord (~> 2.0) 168 + skyfall (~> 0.6) 169 + thin 126 170 127 171 BUNDLED WITH 128 172 2.7.0
+3
Procfile
··· 1 + server: bin/server 2 + firehose: bin/firehose 3 + worker: bin/worker
+110
README.md
··· 1 + # Lycan ๐Ÿบ 2 + 3 + A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and allows you to search in that archive. 4 + 5 + 6 + ## How it works 7 + 8 + Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network AppView, it only indexes posts and likes on demand from people who request to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anything between a few minutes and an hour, depending on how much data there is to download. After that, new likes are being indexed live from the firehose. 9 + 10 + At the moment, Lycan indexes four types of content: 11 + 12 + - posts you've liked 13 + - posts you've reposted 14 + - posts you've quoted 15 + - your old-style bookmarks (using the ๐Ÿ“Œ emoji method) 16 + 17 + New bookmarks are private data, so at the moment they can't be imported until support for OAuth is added. 18 + 19 + Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API โ€“ the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)). 
20 + 21 + The service consists of three separate components: 22 + 23 + - a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is/has been imported 24 + - a **background worker**, which runs the import process 25 + - an **HTTP server**, which serves the XRPC endpoints (currently there are 3: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying 26 + 27 + 28 + ## Setting up on localhost 29 + 30 + This app should run on any somewhat recent version of Ruby, though of course it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install it with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need to have some familiarity with the Ruby ecosystem in order to set it up and run it. 31 + 32 + A Postgres database is also required (again, any non-ancient version should work). 33 + 34 + Download or clone the repository, then install the dependencies: 35 + 36 + ``` 37 + bundle install 38 + ``` 39 + 40 + Next, create the database โ€“ the configuration is defined in [`config/database.yml`](config/database.yml), for development it's `lycan_development`. 
Create it either manually, or with a rake task: 41 + 42 + ``` 43 + bundle exec rake db:create 44 + ``` 45 + 46 + Then, run the migrations: 47 + 48 + ``` 49 + bundle exec rake db:migrate 50 + ``` 51 + 52 + To run an import, you will need to run three separate processes, probably in separate terminal tabs: 53 + 54 + 1) the firehose client, [`bin/firehose`](bin/firehose) 55 + 2) the background worker, [`bin/worker`](bin/worker) 56 + 3) the Sinatra HTTP server, [`bin/server`](bin/server) 57 + 58 + The UI can be accessed through Skythread, either on the official site on [skythread.mackuba.eu](https://skythread.mackuba.eu), or a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu โ€“ but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL. 59 + 60 + You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.) 
61 + 62 + 63 + ## Configuration 64 + 65 + There's a few things you can configure through ENV variables: 66 + 67 + - `RELAY_HOST` โ€“ hostname of the relay to use for the firehose (default: `bsky.network`) 68 + - `JETSTREAM_HOST` โ€“ alternatively, instead of `RELAY_HOST`, set this to a hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance 69 + - `FIREHOSE_USER_AGENT` โ€“ when running in production, it's recommended that you set this to some name that identifies who is running the service 70 + - `APPVIEW_HOST` โ€“ hostname of the AppView used to download posts (default: `public.api.bsky.app`) 71 + - `SERVER_HOSTNAME` โ€“ hostname of the server on which you're running the service in production 72 + 73 + 74 + ## Rake tasks 75 + 76 + Some Rake tasks that might be useful: 77 + 78 + ``` 79 + bundle exec rake enqueue_user DID=did:plc:qweqwe 80 + ``` 81 + 82 + - request an import of the given account (to be handled by firehose + worker) 83 + 84 + ``` 85 + bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all 86 + ``` 87 + 88 + - run a complete import synchronously 89 + 90 + ``` 91 + bundle exec rake process_posts 92 + ``` 93 + 94 + - process all previously queued and unfinished or failed items 95 + 96 + 97 + ## Running in production 98 + 99 + This will probably heavily depend on where and how you prefer to run it, I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config for your specific setup. 100 + 101 + On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option to do this (which I'm using) may be writing a `systemd` service config file and adding it to `/etc/systemd/system`. 
To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server โ€“ my recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose). 102 + 103 + 104 + ## Credits 105 + 106 + Copyright ยฉ 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)). 107 + 108 + The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT). 109 + 110 + Bug reports and pull requests are welcome ๐Ÿ˜Ž
+1
Rakefile
··· 1 1 require 'bundler/setup' 2 2 require 'sinatra/activerecord' 3 3 require 'sinatra/activerecord/rake' 4 + require 'dotenv/load' 4 5 5 6 Rake.add_rakelib File.join(__dir__, 'lib', 'tasks') 6 7
+44
app/at_uri.rb
··· 1 + class AT_URI 2 + class InvalidURIError 3 + end 4 + 5 + TID_PATTERN = /^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$/ 6 + 7 + attr_reader :repo, :collection, :rkey 8 + 9 + def initialize(uri) 10 + uri = uri.to_s 11 + raise InvalidURIError, "Invalid AT URI: #{uri}" if uri.include?(' ') || !uri.start_with?('at://') 12 + 13 + parts = uri.split('/') 14 + parts.each(&:freeze) 15 + 16 + raise InvalidURIError, "Invalid AT URI: #{uri}" if parts.length != 5 || parts[2..4].any?(&:empty?) 17 + 18 + @uri = uri 19 + @repo, @collection, @rkey = parts[2..4] 20 + end 21 + 22 + def to_ary 23 + [@repo, @collection, @rkey] 24 + end 25 + 26 + alias split to_ary 27 + 28 + def to_s 29 + @uri 30 + end 31 + 32 + def has_valid_tid? 33 + @rkey =~ TID_PATTERN 34 + end 35 + 36 + def is_post? 37 + @collection == 'app.bsky.feed.post' && has_valid_tid? 38 + end 39 + end 40 + 41 + def AT_URI(uri) 42 + return uri if uri.is_a?(AT_URI) 43 + AT_URI.new(uri) 44 + end
+67
app/authenticator.rb
require 'base58'
require 'base64'
require 'didkit'
require 'json'
require 'jwt'
require 'openssl'

# Verifies inter-service auth JWTs (PDS proxying): decodes the token payload,
# checks it's addressed to this service and endpoint, resolves the issuer's
# signing key from their DID document, and verifies the ES256K signature.
class Authenticator
  class InvalidTokenError < StandardError
  end

  # hostname: the hostname of this service's did:web identity;
  # incoming tokens must have aud == "did:web:<hostname>".
  def initialize(hostname:)
    # process-wide cache of resolved signing keys, shared between instances
    @@pkey_cache ||= {}
    @hostname = hostname
  end

  # Returns the issuer DID if the token is valid and addressed to the given
  # lexicon endpoint; nil when the header isn't a Bearer token or the
  # aud/lxm claims don't match. Raises InvalidTokenError on malformed tokens,
  # and lets JWT verification errors propagate.
  def decode_user_from_jwt(auth_header, endpoint)
    return nil unless auth_header.start_with?('Bearer ')

    token = auth_header.gsub(/\ABearer /, '')
    parts = token.split('.')
    raise InvalidTokenError, "Invalid JWT token" if parts.length != 3

    begin
      # JWT segments are base64url-encoded *without* padding (RFC 7515);
      # Base64.decode64 silently drops the url-safe '-'/'_' characters and
      # would corrupt such payloads, so decode with the urlsafe variant,
      # restoring the stripped '=' padding first.
      payload = parts[1]
      decoded_data = Base64.urlsafe_decode64(payload + '=' * (-payload.length % 4))
      data = JSON.parse(decoded_data)
    rescue StandardError
      raise InvalidTokenError, "Invalid JWT token"
    end

    did = data['iss']
    return nil if did.nil? || data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint

    pkey = pkey_for_user(did)

    # verify the signature against the user's published secp256k1 key
    decoded = JWT.decode(token, pkey, true, { algorithm: 'ES256K' })
    decoded[0] && decoded[0]['iss']
  end

  # Resolves and caches the secp256k1 public key for a DID by reading the
  # Multikey entry from its DID document and rebuilding an OpenSSL key from
  # the multibase-encoded compressed point. Returns nil if no Multikey found.
  def pkey_for_user(did)
    # I have no idea what this does, but it seems to be working ¯\_(ツ)_/¯

    if pkey = @@pkey_cache[did]
      return pkey
    end

    doc = DID.new(did).document.json
    key_obj = (doc['verificationMethod'] || []).detect { |x| x['type'] == 'Multikey' }
    key_multi = key_obj&.dig('publicKeyMultibase')
    return nil unless key_multi

    # drop the multibase prefix char, base58-decode, then drop the 2-byte multicodec prefix
    key_decoded = Base58.base58_to_binary(key_multi[1..], :bitcoin)
    comp_key = key_decoded[2..-1]

    # wrap the compressed EC point in a DER SubjectPublicKeyInfo for secp256k1
    alg_id = OpenSSL::ASN1::Sequence([
      OpenSSL::ASN1::ObjectId('id-ecPublicKey'),
      OpenSSL::ASN1::ObjectId('secp256k1')
    ])

    der = OpenSSL::ASN1::Sequence([alg_id, OpenSSL::ASN1::BitString(comp_key)]).to_der
    pkey = OpenSSL::PKey.read(der)

    @@pkey_cache[did] = pkey

    pkey
  end
end
+2
app/errors.rb
# Raised by importers when a fetched record is malformed (e.g. an
# unparseable createdAt — see BaseImporter#created_at) so the record
# can be skipped without aborting the whole import.
class InvalidRecordError < StandardError
end
+230
app/firehose_client.rb
require 'skyfall'

require_relative 'init'
require_relative 'models/import_job'
require_relative 'models/post'
require_relative 'models/subscription'
require_relative 'models/user'

# Streams events from a relay firehose (or a Jetstream instance) via Skyfall
# and records likes/reposts/quotes/pins for users whose data is imported.
# The stream position (cursor) is persisted in the Subscription table so the
# process can resume where it left off after a restart.
class FirehoseClient
  attr_accessor :start_cursor, :service

  DEFAULT_RELAY = 'bsky.network'

  def initialize
    @env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym

    # RELAY_HOST takes precedence over JETSTREAM_HOST if both are set
    if ENV['RELAY_HOST']
      @service = ENV['RELAY_HOST']
    elsif ENV['JETSTREAM_HOST']
      @service = ENV['JETSTREAM_HOST']
      @jetstream = true
    else
      @service = DEFAULT_RELAY
    end
  end

  # Connects to the firehose and starts processing events; no-op if already
  # started. An explicitly assigned start_cursor overrides the saved position.
  def start
    return if @sky

    # did => User map of the users whose events we care about
    @active_users = load_users

    log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})"

    last_cursor = load_or_init_cursor
    cursor = @start_cursor || last_cursor

    @sky = if @jetstream
      Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
    else
      Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
    end

    @sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
    @sky.check_heartbeat = true

    @sky.on_message do |m|
      start_time = Time.now
      diff = start_time - @last_update

      # log when the stream resumes after a noticeable gap
      if diff > 30
        log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
      end

      @last_update = start_time
      process_message(m)
    end

    @sky.on_connecting { |u| log "Connecting to #{u}..." }

    @sky.on_connect {
      log "Connected ✓"

      @replaying = true
      @last_update = Time.now

      # watchdog: warn every 20s if no message has arrived for >30s
      @live_check_timer ||= EM::PeriodicTimer.new(20) do
        now = Time.now
        diff = now - @last_update

        if diff > 30
          log "Timer: last update #{sprintf('%.1f', diff)}s ago"
        end
      end

      # poll for queued import jobs: register the user as active, expand the
      # job into per-collection Import rows, then remove the job row
      @jobs_timer ||= EM::PeriodicTimer.new(3) do
        ImportJob.all.each do |job|
          @active_users[job.user.did] = job.user
          job.create_imports
          job.destroy
        end
      end
    }

    @sky.on_disconnect {
      log "Disconnected."
    }

    @sky.on_reconnect {
      log "Connection lost, reconnecting..."

      # NOTE(review): @timer is never assigned anywhere in this class
      # (the timers above are @live_check_timer / @jobs_timer) — possibly
      # leftover from an earlier version; confirm whether those timers
      # should be cancelled here instead.
      @timer&.cancel
      @timer = nil
    }

    @sky.on_timeout {
      log "Trying to reconnect..."
    }

    @sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }

    @sky.connect
  end

  # Persists the current cursor and disconnects; safe to call when not started.
  def stop
    save_cursor(@sky.cursor) unless @sky.nil?

    @sky&.disconnect
    @sky = nil
  end

  # Returns the saved cursor for this service, or nil (after creating the
  # Subscription row) on first run so the stream starts from "now".
  def load_or_init_cursor
    if sub = Subscription.find_by(service: @service)
      sub.cursor
    else
      Subscription.create!(service: @service, cursor: 0)
      nil
    end
  end

  def save_cursor(cursor)
    Subscription.where(service: @service).update_all(cursor: cursor)
  end

  # Loads active users keyed by DID for O(1) lookup per commit event.
  def load_users
    User.active.map { |u| [u.did, u] }.then { |list| Hash[list] }
  end

  # Dispatches one firehose message; checkpoints the cursor every 1000 events.
  def process_message(msg)
    save_cursor(msg.seq) if msg.seq % 1000 == 0

    case msg.type
    when :info
      log "InfoMessage: #{msg}"
    when :account
      process_account_event(msg)
    when :commit
      # first commit after (re)connect tells us how far behind we are
      if @replaying
        log "Replaying events since #{msg.time.getlocal} -->"
        @replaying = false
      end

      # nil unless the repo belongs to one of our registered users
      @current_user = @active_users[msg.repo]

      msg.operations.each do |op|
        case op.type
        when :bsky_like
          process_like(msg, op)
        when :bsky_repost
          process_repost(msg, op)
        when :bsky_post
          process_post(msg, op)
        end
      end
    end
  rescue CBOR::UnpackError
    # ignore invalid records
  end

  # Removes a user (and drops them from the active map) when their account
  # is reported as deleted.
  def process_account_event(msg)
    if msg.status == :deleted
      if user = User.find_by(did: msg.repo)
        user.destroy
        @active_users.delete_if { |k, u| u.id == user.id }
      end
    end
  end

  # Records or deletes a like for a tracked user. Errors are logged and
  # swallowed so one bad event doesn't kill the stream; DB connection
  # failures additionally pause for 5s before continuing.
  def process_like(msg, op)
    return unless @current_user

    if op.action == :create
      return if op.raw_record.nil?
      @current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.likes.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}"
    log e.backtrace.reject { |x| x.include?('/ruby/') }
    sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  # Same as #process_like, but for reposts.
  def process_repost(msg, op)
    return unless @current_user

    if op.action == :create
      return if op.raw_record.nil?
      @current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
    elsif op.action == :delete
      @current_user.reposts.where(rkey: op.rkey).delete_all
    end
  rescue StandardError => e
    log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}"
    log e.backtrace.reject { |x| x.include?('/ruby/') }
    sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  # Posts feed two indexes (quotes and pins) for tracked users; deletions of
  # *any* post also remove our indexed copy of that post, tracked user or not.
  def process_post(msg, op)
    if op.action == :create
      return if op.raw_record.nil?

      if @current_user
        @current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
        @current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
      end
    elsif op.action == :delete
      if @current_user
        @current_user.quotes.where(rkey: op.rkey).delete_all
        @current_user.pins.where(rkey: op.rkey).delete_all
      end

      if post = Post.find_by_at_uri(op.uri)
        post.destroy
      end
    end
  rescue StandardError => e
    log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}"
    log e.backtrace.reject { |x| x.include?('/ruby/') }
    sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
  end

  def log(text)
    puts "[#{Time.now}] #{text}"
  end

  # Custom inspect that skips the (noisy) EM timer objects.
  def inspect
    vars = instance_variables - [:@jobs_timer, :@live_check_timer]
    values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
    "#<#{self.class}:0x#{object_id} #{values}>"
  end
end
+73
app/import_manager.rb
require_relative 'importers/likes_importer'
require_relative 'importers/posts_importer'
require_relative 'importers/reposts_importer'
require_relative 'item_queue'
require_relative 'post_downloader'

# Coordinates a full import for one user: one thread per collection importer
# pushing pending items onto a shared queue, plus one downloader thread
# draining that queue. #start blocks until everything finishes.
class ImportManager
  attr_accessor :report, :time_limit, :logger, :log_status_updates

  def initialize(user)
    @user = user
  end

  # sets - a collection name ('likes'/'reposts'/'posts'/'all') or an array
  #        of such names. Raises on unknown names.
  def start(sets)
    importers = []
    sets = [sets] unless sets.is_a?(Array)

    sets.each do |set|
      case set
      when 'likes'
        importers << LikesImporter.new(@user)
      when 'reposts'
        importers << RepostsImporter.new(@user)
      when 'posts'
        importers << PostsImporter.new(@user)
      when 'all'
        importers += [
          LikesImporter.new(@user),
          RepostsImporter.new(@user),
          PostsImporter.new(@user)
        ]
      else
        raise "Invalid collection: #{set}"
      end
    end

    # seed the queue with items left over from a previous (interrupted) run,
    # newest first
    queued_items = @user.all_items_in_queue(:import).sort_by(&:time).reverse
    queue = ItemQueue.new(queued_items)

    downloader = PostDownloader.new
    downloader.report = @report
    downloader.logger = @logger

    download_thread = Thread.new do
      @logger&.info "Starting downloader thread for #{@user}" if @log_status_updates

      downloader.import_from_queue(queue)

      @logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
    end

    import_threads = importers.map do |import|
      import.item_queue = queue
      import.report = @report
      import.logger = @logger

      Thread.new do
        @logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates

        import.run_import(@time_limit)

        @logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
      end
    end

    # wait for all importers first — only then may the downloader stop once
    # the queue is empty (nothing new can be pushed anymore)
    import_threads.each { |i| i.join }

    @logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates

    downloader.stop_when_empty = true
    download_thread.join
  end
end
+94
app/import_worker.rb
require 'active_record'
require 'minisky'
require 'time'

require_relative 'init'
require_relative 'import_manager'
require_relative 'models/import'
require_relative 'post_downloader'
require_relative 'reports/basic_report'

# Long-running worker process: polls for users with unfinished imports and
# runs an ImportManager per user in its own thread, while a separate thread
# downloads items queued live from the firehose. #run never returns.
class ImportWorker
  attr_accessor :verbose, :logger

  # One import run for a single user, wrapped in a Thread subclass so the
  # worker loop can query which users are currently being processed.
  class UserThread < Thread
    def initialize(user, collections, logger, verbose = false)
      @user = user
      @verbose = verbose
      @logger = logger

      super { run(collections) }
    end

    def user_id
      @user.id
    end

    def run(collections)
      @logger&.info "Starting import thread for #{@user}"

      # backfill the account creation date on first import
      if @user.registered_at.nil?
        registration_time = get_registration_time(@user)
        @user.update!(registered_at: registration_time)
      end

      import = ImportManager.new(@user)

      if @logger
        import.report = BasicReport.new(@logger) if @verbose
        import.logger = @logger
        import.log_status_updates = true
      end

      import.start(collections)

      @logger&.info "Ended import thread for #{@user}"
    end

    # Fetches the account's createdAt from the AppView profile endpoint.
    def get_registration_time(user)
      sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
      profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })

      Time.parse(profile['createdAt'])
    end
  end

  # Main loop: every 5s, prune dead threads and spawn a UserThread for each
  # user with unfinished imports who isn't already being processed.
  def run
    @user_threads = []

    @firehose_thread = Thread.new { process_firehose_items }
    @downloader = PostDownloader.new
    @downloader.logger = @logger

    loop do
      @user_threads.delete_if { |t| !t.alive? }

      users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a

      users.each do |user|
        collections = user.imports.unfinished.map(&:collection)
        thread = UserThread.new(user, collections, @logger, @verbose)
        @user_threads << thread
      end

      # possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
      sleep 5
    end
  end

  # Drains items queued by the firehose process in batches of 25, taking the
  # oldest items across all four item types; sleeps when there's nothing to do.
  def process_firehose_items
    loop do
      items = [Like, Repost, Pin, Quote]
        .map { |type| type.in_queue(:firehose).order('time').limit(25) }
        .flatten
        .sort_by(&:time)
        .first(25)

      if items.length > 0
        @downloader.process_items(items)
      else
        sleep 5
      end
    end
  end
end
-100
app/importer.rb
··· 1 - require 'didkit' 2 - require 'minisky' 3 - 4 - require_relative 'models/import' 5 - require_relative 'models/like' 6 - require_relative 'models/post' 7 - require_relative 'models/user' 8 - 9 - class Importer 10 - attr_accessor :like_queue, :report 11 - 12 - def initialize(user_did) 13 - @did = DID.new(user_did) 14 - @user = User.find_or_create_by!(did: user_did) 15 - 16 - @uid_cache = { user_did => @user.id } 17 - end 18 - 19 - def run_import(time_limit = nil) 20 - @minisky = Minisky.new(@did.document.pds_host, nil) 21 - 22 - import_likes(time_limit) 23 - end 24 - 25 - def import_likes(time_limit = nil) 26 - import = @user.import || @user.create_import! 27 - 28 - params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 } 29 - 30 - if import.cursor 31 - params[:cursor] = import.cursor 32 - else 33 - import.update!(started_from: Time.now) 34 - end 35 - 36 - count = 0 37 - time_limit ||= import.last_completed 38 - 39 - puts "Fetching until: #{time_limit}" if time_limit 40 - 41 - loop do 42 - response = @minisky.get_request('com.atproto.repo.listRecords', params) 43 - 44 - records = response['records'] 45 - cursor = response['cursor'] 46 - 47 - count += records.length 48 - @report&.update(importer: { imported_likes: count }) 49 - @report&.update(importer: { oldest_date: Time.parse(records.last['value']['createdAt']) }) unless records.empty? 50 - 51 - process_likes(records) 52 - params[:cursor] = cursor 53 - 54 - import.update!(cursor: cursor) 55 - 56 - break if !cursor 57 - break if time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < time_limit } 58 - end 59 - 60 - import.update!(cursor: nil, started_from: nil, last_completed: import.started_from) 61 - @report&.update(importer: { finished: true }) 62 - end 63 - 64 - def process_likes(likes) 65 - likes.each do |record| 66 - begin 67 - like_rkey = record['uri'].split('/').last 68 - next if @user.likes.where(rkey: like_rkey).exists? 
69 - 70 - like_time = Time.parse(record['value']['createdAt']) 71 - 72 - post_uri = record['value']['subject']['uri'] 73 - parts = post_uri.split('/') 74 - next if parts[3] != 'app.bsky.feed.post' 75 - 76 - post_did, _, post_rkey = parts[2..4] 77 - 78 - if @uid_cache[post_did].nil? 79 - post_author = User.find_or_create_by!(did: post_did) 80 - @uid_cache[post_did] = post_author.id 81 - end 82 - 83 - post = Post.find_by(user_id: @uid_cache[post_did], rkey: post_rkey) 84 - 85 - if post 86 - @user.likes.create!(rkey: like_rkey, time: like_time, post: post) 87 - else 88 - like_stub = @user.likes.create!(rkey: like_rkey, time: like_time, post_uri: post_uri) 89 - 90 - if @like_queue 91 - @like_queue.push(like_stub) 92 - @report&.update(queue: { length: @like_queue.length }) 93 - end 94 - end 95 - rescue StandardError => e 96 - puts "Error in Importer#process_likes: #{record['uri']}: #{e}" 97 - end 98 - end 99 - end 100 - end
+54
app/importers/base_importer.rb
require 'didkit'
require 'minisky'
require 'time'

require_relative '../at_uri'
require_relative '../errors'
require_relative '../models/post'
require_relative '../models/user'

# Common scaffolding for the per-collection importers (likes/reposts/posts):
# manages the Import row that tracks cursor/resume state and the time window,
# and leaves the actual record fetching to subclasses via #import_items.
class BaseImporter
  attr_accessor :item_queue, :report, :logger

  def initialize(user)
    @did = DID.new(user.did)
    @user = user
    @imported_count = 0
  end

  def importer_name
    self.class.name
  end

  # Derives the Import row's collection key from the class name,
  # e.g. LikesImporter => "likes".
  def collection
    importer_name.gsub(/Importer$/, '').downcase
  end

  # Runs the import against the user's own PDS, resuming from a saved cursor
  # if present. With no explicit time limit, fetches back to the point the
  # previous completed run started from (incremental import), and records
  # this run's start time as the new checkpoint on success.
  def run_import(requested_time_limit = nil)
    @minisky = Minisky.new(@did.document.pds_host, nil)
    @import = @user.imports.find_by(collection: collection) || @user.imports.create!(collection: collection)

    if @import.cursor.nil?
      # fresh (non-resumed) full run: remember when it started
      @import.update!(started_from: Time.now) unless requested_time_limit
    end

    @time_limit = requested_time_limit || @import.last_completed
    @logger&.info "Fetching until: #{@time_limit}" if @time_limit

    import_items

    # success: promote this run's start time to the completed checkpoint,
    # then clear the resume state
    @import.update!(last_completed: @import.started_from) unless requested_time_limit
    @import.update!(cursor: nil, started_from: nil, fetched_until: nil)
    @report&.update(importers: { importer_name => { :finished => true }})
  end

  # Subclasses implement the paginated fetch loop.
  def import_items
    raise NotImplementedError
  end

  # Parses a record's createdAt; converts any parse failure into
  # InvalidRecordError so callers can skip the record.
  def created_at(record)
    Time.parse(record['createdAt'])
  rescue StandardError
    raise InvalidRecordError
  end
end
+42
app/importers/likes_importer.rb
require_relative 'base_importer'

# Pages through the user's app.bsky.feed.like records on their PDS and
# imports them, queueing likes whose posts still need downloading.
# NOTE(review): this loop is nearly identical to Reposts/PostsImporter —
# candidate for extraction into BaseImporter.
class LikesImporter < BaseImporter
  def import_items
    params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 }
    params[:cursor] = @import.cursor if @import.cursor

    loop do
      response = @minisky.get_request('com.atproto.repo.listRecords', params)

      records = response['records']
      cursor = response['cursor']
      oldest_date = nil

      records.each do |record|
        begin
          like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)

          # track the oldest record seen on this page (for resume + time limit)
          record_date = like&.time || created_at(record['value'])
          oldest_date = [oldest_date, record_date].compact.min

          # likes whose post isn't downloaded yet go to the downloader queue
          if like && like.pending? && @item_queue
            @item_queue.push(like)
            @report&.update(queue: { length: @item_queue.length })
          end
        rescue InvalidRecordError => e
          @logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
        end
      end

      @imported_count += records.length
      @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
      @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date

      # persist resume state only after the page has been processed
      params[:cursor] = cursor
      @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
      break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end
end
+50
app/importers/posts_importer.rb
require_relative 'base_importer'

# Pages through the user's app.bsky.feed.post records on their PDS; each post
# record can yield both a quote and a pin (old-style 📌 bookmark), so both
# associations are fed from the same record.
# NOTE(review): this loop is nearly identical to Likes/RepostsImporter —
# candidate for extraction into BaseImporter.
class PostsImporter < BaseImporter
  def import_items
    params = { repo: @did, collection: 'app.bsky.feed.post', limit: 100 }
    params[:cursor] = @import.cursor if @import.cursor

    loop do
      response = @minisky.get_request('com.atproto.repo.listRecords', params)

      records = response['records']
      cursor = response['cursor']
      oldest_date = nil

      records.each do |record|
        begin
          quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
          pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)

          # track the oldest record seen on this page (for resume + time limit)
          record_date = quote&.time || pin&.time || created_at(record['value'])
          oldest_date = [oldest_date, record_date].compact.min

          # items whose referenced post isn't downloaded yet go to the downloader queue
          if @item_queue
            if quote && quote.pending?
              @item_queue.push(quote)
            end

            if pin && pin.pending?
              @item_queue.push(pin)
            end

            @report&.update(queue: { length: @item_queue.length })
          end
        rescue InvalidRecordError => e
          @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
        end
      end

      @imported_count += records.length
      @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
      @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date

      # persist resume state only after the page has been processed
      params[:cursor] = cursor
      @import.update!(cursor: cursor, fetched_until: oldest_date)

      break if !cursor
      break if @time_limit && oldest_date && oldest_date < @time_limit
    end
  end
end
+42
app/importers/reposts_importer.rb
require_relative 'base_importer'

# Imports a user's app.bsky.feed.repost records from their repo.
class RepostsImporter < BaseImporter
  def import_items
    request_params = { repo: @did, collection: 'app.bsky.feed.repost', limit: 100 }
    request_params[:cursor] = @import.cursor if @import.cursor

    loop do
      page = @minisky.get_request('com.atproto.repo.listRecords', request_params)
      page_records = page['records']
      next_cursor = page['cursor']
      page_oldest = nil

      page_records.each do |record|
        begin
          repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)

          date = repost&.time || created_at(record['value'])
          page_oldest = [page_oldest, date].compact.min

          next unless repost && repost.pending? && @item_queue

          @item_queue.push(repost)
          @report&.update(queue: { length: @item_queue.length })
        rescue InvalidRecordError => e
          @logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
        end
      end

      @imported_count += page_records.length
      @report&.update(importers: { importer_name => { :imported_items => @imported_count }})
      @report&.update(importers: { importer_name => { :oldest_date => page_oldest }}) if page_oldest

      request_params[:cursor] = next_cursor
      @import.update!(cursor: next_cursor, fetched_until: page_oldest)

      break if next_cursor.nil?
      break if @time_limit && page_oldest && page_oldest < @time_limit
    end
  end
end
+1
app/init.rb
··· 1 1 require 'sinatra/activerecord' 2 + require 'dotenv/load' 2 3 3 4 ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.datetime_type = :timestamptz 4 5 RubyVM::YJIT.enable
+28
app/item_queue.rb
# A thread-safe FIFO buffer of items waiting to be processed, consumed
# in fixed-size batches.
class ItemQueue
  BATCH_SIZE = 25

  def initialize(items = [])
    @lock = Mutex.new
    @items = items
  end

  # Appends a single item at the end of the queue.
  def push(item)
    @lock.synchronize { @items << item }
  end

  # Removes and returns up to BATCH_SIZE items from the front of the queue;
  # returns an empty array when the queue is empty.
  def pop_batch
    @lock.synchronize { @items.shift(BATCH_SIZE) }
  end

  # Current number of queued items.
  def length
    @lock.synchronize { @items.length }
  end
end
-28
app/like_queue.rb
··· 1 - class LikeQueue 2 - BATCH_SIZE = 25 3 - 4 - def initialize(likes = []) 5 - @mutex = Mutex.new 6 - @queue = likes 7 - end 8 - 9 - def push(like) 10 - @mutex.synchronize { 11 - @queue << like 12 - } 13 - end 14 - 15 - def pop_batch 16 - @mutex.synchronize { 17 - batch = @queue[0...BATCH_SIZE] 18 - @queue = @queue[BATCH_SIZE..-1] || [] 19 - batch 20 - } 21 - end 22 - 23 - def length 24 - @mutex.synchronize { 25 - @queue.length 26 - } 27 - end 28 - end
+31
app/models/import.rb
# Tracks the progress of importing one record collection (likes, reposts
# or posts) for a given user.
class Import < ActiveRecord::Base
  belongs_to :user

  validates_inclusion_of :collection, in: %w(likes reposts posts)
  validates_uniqueness_of :collection, scope: :user_id

  scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }

  # Sentinel timestamp meaning "everything has been imported".
  IMPORT_END = Time.at(0)

  # Which item groups each imported collection produces.
  ITEM_GROUPS = {
    'likes' => [:likes],
    'reposts' => [:reposts],
    'posts' => [:pins, :quotes]
  }.freeze

  # Returns the point in time up to which items have been fully processed:
  # the newest item still waiting in the import queue, or the position the
  # importer has fetched records down to, or IMPORT_END when the import is
  # complete. Returns nil if the import hasn't started yet.
  def imported_until
    return nil if cursor.nil? && last_completed.nil?

    newest_queued = ITEM_GROUPS[collection]
      .map { |group| user.send(group).where(queue: :import).order(:time).last }
      .compact
      .max_by(&:time)

    if newest_queued
      newest_queued.time
    elsif fetched_until
      fetched_until
    else
      IMPORT_END
    end
  end
end
+13
app/models/import_job.rb
require 'active_record'

require_relative 'user'

# A pending request to import a user's historical data; when picked up, it
# creates one Import row per record collection.
class ImportJob < ActiveRecord::Base
  belongs_to :user

  COLLECTIONS = %w(likes reposts posts)

  # Ensures an Import row exists for every supported collection.
  def create_imports
    COLLECTIONS.each do |name|
      user.imports.find_or_create_by!(collection: name)
    end
  end
end
+35
app/models/importable.rb
require 'active_support/concern'

require_relative '../at_uri'
require_relative 'post'

# Shared behavior for records that reference a post which may not have been
# downloaded yet (likes, reposts, quotes, pins). While the post is missing,
# the item keeps its post_uri and sits in a processing queue.
module Importable
  extend ActiveSupport::Concern

  included do
    scope :pending, -> { where(post: nil) }
    scope :in_queue, ->(q) { where(queue: q) }

    enum :queue, { firehose: 0, import: 1 }

    validates_presence_of :post_uri, if: -> { post_id.nil? }
    validate :check_queue

    # An item stays pending until its referenced post has been downloaded
    # and the URI reference is cleared.
    def pending?
      !post_uri.nil?
    end

    # A processed item (one with a resolved post) must not remain in a queue.
    def check_queue
      errors.add(:queue, 'must be nil if already processed') if queue && post
    end

    # Validates that the referenced URI points at a post, then saves the item
    # with the given extra attributes; returns nil for non-post URIs.
    def import_item!(args = {})
      parsed_uri = AT_URI(self.post_uri)
      return nil unless parsed_uri.is_post?

      assign_attributes(args)
      save!
      self
    end
  end
end
+16 -3
app/models/like.rb
require 'active_record'
require 'time'

require_relative '../at_uri'
require_relative 'post'
require_relative 'importable'
require_relative 'searchable'
require_relative 'user'

# A user's like of a post, imported either from the firehose or from a
# repo backfill.
class Like < ActiveRecord::Base
  include Searchable
  include Importable

  validates_presence_of :time, :rkey
  validates_length_of :rkey, is: 13

  belongs_to :user, foreign_key: 'actor_id'
  belongs_to :post, optional: true

  # Builds an unsaved Like from a raw app.bsky.feed.like record.
  def self.new_from_record(uri, record)
    new(
      rkey: AT_URI(uri).rkey,
      time: Time.parse(record['createdAt']),
      post_uri: record['subject']['uri']
    )
  end
end
+32
app/models/pin.rb
require 'active_record'
require 'time'

require_relative '../at_uri'
require_relative 'importable'
require_relative 'post'
require_relative 'searchable'
require_relative 'user'

# A "pin" — a reply posted by the user containing the ๐Ÿ“Œ emoji, treated as
# a bookmark of the parent post.
class Pin < ActiveRecord::Base
  include Searchable
  # Fix: Importable was referenced without being required, relying on
  # another model file having loaded it first.
  include Importable

  PIN_SIGN = '๐Ÿ“Œ'

  validates_presence_of :time, :rkey
  validates_length_of :rkey, is: 13
  validates :pin_text, length: { minimum: 0, maximum: 1000, allow_nil: false }

  belongs_to :user, foreign_key: 'actor_id'
  belongs_to :post, optional: true

  # Builds an unsaved Pin from a raw post record; returns nil when the post
  # isn't a reply or doesn't contain the pin emoji.
  def self.new_from_record(uri, record)
    return nil unless record['reply'] && record['text'].include?(PIN_SIGN)

    self.new(
      rkey: AT_URI(uri).rkey,
      time: Time.parse(record['createdAt']),
      post_uri: record['reply']['parent']['uri'],
      pin_text: record['text']
    )
  end
end
+20 -1
app/models/post.rb
# A downloaded Bluesky post that imported items (likes, reposts, pins,
# quotes) point at.
class Post < ActiveRecord::Base
  validates_presence_of :time, :data, :rkey
  validates :text, length: { minimum: 0, allow_nil: false }

  validates_length_of :rkey, is: 13
  validates_length_of :text, maximum: 1000
  validates_length_of :data, maximum: 10000

  validate :check_null_bytes

  belongs_to :user

  has_many :likes, dependent: :delete_all
  has_many :reposts, dependent: :delete_all
  has_many :pins, dependent: :delete_all
  has_many :quotes, dependent: :delete_all

  # Looks up a post by its at:// URI; returns nil for non-post URIs.
  def self.find_by_at_uri(uri)
    uri = AT_URI(uri) unless uri.is_a?(AT_URI)
    return nil unless uri.is_post?

    Post.joins(:user).find_by(user: { did: uri.repo }, rkey: uri.rkey)
  end

  # The canonical at:// URI of this post.
  def at_uri
    "at://#{user.did}/app.bsky.feed.post/#{rkey}"
  end

  def check_null_bytes
    # Postgres doesn't allow null bytes in strings.
    # Fix: guard with &. — text may still be nil when validations run (its
    # absence is reported by the length validation above), and nil.include?
    # would raise NoMethodError and abort valid? entirely.
    errors.add(:text, 'must not contain a null byte') if text&.include?("\u0000")
  end
end
+39
app/models/quote.rb
require 'active_record'
require 'time'

require_relative '../at_uri'
require_relative 'importable'
require_relative 'post'
require_relative 'searchable'
require_relative 'user'

# A quote post by the user — a post that embeds another post.
class Quote < ActiveRecord::Base
  include Searchable
  # Fix: Importable was referenced without being required, relying on
  # another model file having loaded it first.
  include Importable

  validates_presence_of :time, :rkey
  validates_length_of :rkey, is: 13
  validates :quote_text, length: { minimum: 0, maximum: 1000, allow_nil: false }

  belongs_to :user, foreign_key: 'actor_id'
  belongs_to :post, optional: true

  # Builds an unsaved Quote from a raw post record; returns nil when the
  # post doesn't embed another post.
  def self.new_from_record(uri, record)
    embed = record['embed']
    return nil if embed.nil?

    quoted_post_uri =
      case embed['$type']
      when 'app.bsky.embed.record'
        embed['record']['uri']
      when 'app.bsky.embed.recordWithMedia'
        embed['record']['record']['uri']
      else
        return nil
      end

    self.new(
      rkey: AT_URI(uri).rkey,
      time: Time.parse(record['createdAt']),
      post_uri: quoted_post_uri,
      quote_text: record['text']
    )
  end
end
+26
app/models/repost.rb
require 'active_record'
require 'time'

require_relative '../at_uri'
require_relative 'importable'
require_relative 'post'
require_relative 'searchable'
require_relative 'user'

# A user's repost of a post.
class Repost < ActiveRecord::Base
  include Searchable
  # Fix: Importable was referenced without being required, relying on
  # another model file having loaded it first.
  include Importable

  validates_presence_of :time, :rkey
  validates_length_of :rkey, is: 13

  belongs_to :user, foreign_key: 'actor_id'
  belongs_to :post, optional: true

  # Builds an unsaved Repost from a raw app.bsky.feed.repost record.
  def self.new_from_record(uri, record)
    self.new(
      rkey: AT_URI(uri).rkey,
      time: Time.parse(record['createdAt']),
      post_uri: record['subject']['uri']
    )
  end
end
+48
app/models/searchable.rb
require 'active_support/concern'

# Query helpers shared by the searchable item models (likes, reposts,
# quotes, pins): keyset pagination plus regex-based term matching against
# the joined posts table.
module Searchable
  extend ActiveSupport::Concern

  included do
    scope :reverse_chronologically, -> { order(time: :desc, id: :desc) }

    # Keyset pagination: items strictly older than the position encoded in
    # the cursor ("<time as float>:<id>"), with id as tie-breaker.
    scope :after_cursor, ->(cursor) {
      return self if cursor.nil?

      table = arel_table
      timestamp, id = cursor.split(':')
      # NOTE(review): Time.at(float) may lose sub-microsecond precision vs.
      # the stored timestamp — confirm cursors round-trip exactly.
      time = Time.at(timestamp.to_f)

      older = table[:time].lt(time)
      same_time_lower_id = table[:time].eq(time).and(table[:id].lt(id))

      where(older.or(same_time_lower_id))
    }

    # Posts whose text contains ALL given terms as whole words
    # (case-insensitive, \y = Postgres word boundary); whitespace inside a
    # term matches any run of whitespace.
    scope :matching_terms, ->(terms) {
      return self if terms.empty?

      where(
        (["(posts.text ~* ?)"] * (terms.length)).join(" AND "),
        *(terms.map { |x| "\\y" + x.gsub(/\s+/, "\\s+") + "\\y" })
      )
    }

    # Posts whose text contains NONE of the given terms.
    scope :excluding_terms, ->(terms) {
      return self if terms.empty?

      where(
        (["(posts.text !~* ?)"] * (terms.length)).join(" AND "),
        *(terms.map { |x| "\\y" + x.gsub(/\s+/, "\\s+") + "\\y" })
      )
    }

    # Encodes this item's position for use with after_cursor.
    def cursor
      "#{time.to_f}:#{id}"
    end
  end
end
+5
app/models/subscription.rb
require 'active_record'

# A named upstream service together with its saved cursor position.
class Subscription < ActiveRecord::Base
  validates :service, :cursor, presence: true
end
+71 -2
app/models/user.rb
require 'active_record'

require_relative 'import'
require_relative 'import_job'
require_relative 'like'
require_relative 'quote'
require_relative 'pin'
require_relative 'post'
require_relative 'repost'
require_relative 'user_importable'

# An account known to the service, identified by its DID. Owns downloaded
# posts plus the imported item groups (likes, reposts, quotes, pins) and
# the bookkeeping rows for historical imports.
class User < ActiveRecord::Base
  # The four item groups an import can produce.
  ITEM_GROUPS = [:likes, :reposts, :quotes, :pins].freeze

  validates_presence_of :did
  validates_length_of :did, maximum: 260
  validates_format_of :did, with: /\Adid:(plc:[0-9a-z]{24}|web:[0-9a-z\-]+(\.[0-9a-z\-]+)+)\Z/

  has_many :posts
  has_many :imports, dependent: :delete_all
  has_one :import_job, dependent: :delete

  before_destroy :delete_posts_cascading

  has_many :likes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
  has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
  has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
  has_many :pins, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable

  # Users that have at least one import row.
  def self.active
    joins(:imports).distinct
  end

  # Users whose import still has work to do: an unfinished Import row, or
  # leftover items waiting in the import queue.
  def self.with_unfinished_import
    where(id: Import.unfinished.select('user_id').distinct)
      .or(where(id: Like.in_queue(:import).select('actor_id').distinct))
      .or(where(id: Repost.in_queue(:import).select('actor_id').distinct))
      .or(where(id: Quote.in_queue(:import).select('actor_id').distinct))
      .or(where(id: Pin.in_queue(:import).select('actor_id').distinct))
  end

  def active?
    imports.exists?
  end

  # All items across the four groups that still reference an undownloaded post.
  def all_pending_items
    ITEM_GROUPS.map { |group| send(group).pending.to_a }.reduce(&:+)
  end

  # All items across the four groups assigned to the given queue.
  def all_items_in_queue(queue)
    ITEM_GROUPS.map { |group| send(group).in_queue(queue).to_a }.reduce(&:+)
  end

  # The overall import position: the newest of the per-collection positions,
  # or nil if any collection hasn't started yet.
  def imported_until
    positions = imports.map(&:imported_until)
    return nil if positions.empty? || positions.any?(&:nil?)

    positions.max
  end

  # Removes all imported items and import bookkeeping for this user.
  def erase_imports!
    ITEM_GROUPS.each { |group| send(group).delete_all }

    import_job&.destroy
    imports.delete_all
  end

  # Deletes this user's posts together with all items (from any user) that
  # point at those posts; used instead of DB-level cascades.
  def delete_posts_cascading
    posts_subquery = posts.select(:id)

    [Like, Repost, Pin, Quote].each do |model|
      model.where(post_id: posts_subquery).delete_all
    end

    Post.where(user: self).delete_all
  end

  def to_s
    %(<User id: #{id}, did: "#{did}">)
  end
end
+20
app/models/user_importable.rb
require_relative '../errors'

# Extension methods for the per-user item associations (likes, reposts,
# quotes, pins), adding import-from-raw-record behavior.
module UserImportable
  # Builds and saves an item from a raw repo record; returns the saved item,
  # or nil when the record doesn't produce an item or was already imported.
  # Raises InvalidRecordError for malformed records.
  def import_from_record(item_uri, record, **args)
    item = try_build_from_record(item_uri, record)
    return nil if item.nil? || already_imported?(item)

    item.import_item!(args)
  end

  # Wraps the model's record parser, converting any parsing failure into
  # InvalidRecordError.
  # Fix: keep the original error's message — callers log the exception via
  # string interpolation, and a bare InvalidRecordError hid the actual cause.
  def try_build_from_record(item_uri, record)
    self.new_from_record(item_uri, record)
  rescue StandardError => e
    raise InvalidRecordError, e.message
  end

  # True if an item with this rkey was already saved for this user.
  def already_imported?(item)
    self.where(rkey: item.rkey).exists?
  end
end
+133 -25
app/post_downloader.rb
··· 1 1 require 'didkit' 2 2 require 'minisky' 3 3 4 + require_relative 'at_uri' 4 5 require_relative 'models/post' 5 6 require_relative 'models/user' 6 7 7 8 class PostDownloader 8 - attr_accessor :report, :stop_when_empty 9 + attr_accessor :report, :logger, :stop_when_empty 9 10 10 11 def initialize 11 - @sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil) 12 + @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil) 13 + 14 + @total_count = 0 15 + @oldest_imported = Time.now 16 + @account_status_cache = {} 12 17 end 13 18 14 19 def import_from_queue(queue) 15 - count = 0 16 - oldest = Time.now 17 - 18 20 loop do 19 - likes = queue.pop_batch 21 + items = queue.pop_batch 20 22 21 - if likes.empty? 23 + if items.empty? 22 24 if @stop_when_empty 23 25 return 24 26 else ··· 29 31 30 32 @report&.update(queue: { length: queue.length }) 31 33 32 - begin 33 - response = @sky.get_request('app.bsky.feed.getPosts', { uris: likes.map(&:post_uri) }) 34 + process_items(items) 35 + end 36 + end 34 37 35 - response['posts'].each do |data| 36 - begin 37 - like = likes.detect { |x| x.post_uri == data['uri'] } 38 - likes.delete(like) 38 + def process_items(items) 39 + existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a 40 + 41 + items.dup.each do |item| 42 + if post = existing_posts.detect { |post| post.at_uri == item.post_uri } 43 + update_item(item, post) 44 + items.delete(item) 45 + end 46 + end 47 + 48 + return if items.empty? 
49 + 50 + begin 51 + response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq }) 52 + 53 + response['posts'].each do |data| 54 + current_items = items.select { |x| x.post_uri == data['uri'] } 55 + items -= current_items 39 56 40 - post = save_post(data['uri'], data['record']) 41 - like.update!(post: post, post_uri: nil) 42 - count += 1 43 - oldest = [oldest, like.time].min 57 + begin 58 + post = save_post(data['uri'], data['record']) 44 59 45 - @report&.update(downloader: { downloaded_posts: count, oldest_date: oldest }) 46 - rescue StandardError => e 47 - puts "Error in PostDownloader: #{like.post_uri}: #{e}" 60 + if post.valid? 61 + current_items.each { |i| update_item(i, post) } 62 + else 63 + @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}" 64 + current_items.each { |i| invalidate_item(i) } 48 65 end 66 + rescue InvalidRecordError => e 67 + @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}" 68 + current_items.each { |i| i.update!(queue: nil) } 49 69 end 50 - rescue StandardError => e 51 - puts "Error in PostDownloader: #{e}" 52 70 end 71 + 72 + check_missing_items(items) 73 + rescue StandardError => e 74 + @logger&.warn "Error in PostDownloader: #{e.class}: #{e}" 53 75 end 54 76 end 55 77 56 78 def save_post(post_uri, record) 57 - did, _, rkey = post_uri.split('/')[2..4] 79 + did, _, rkey = AT_URI(post_uri) 80 + 81 + begin 82 + author = User.find_or_create_by!(did: did) 83 + rescue ActiveRecord::RecordInvalid => e 84 + raise InvalidRecordError 85 + end 58 86 87 + if post = Post.find_by(user: author, rkey: rkey) 88 + return post 89 + else 90 + post = build_post(author, rkey, record) 91 + post.save 92 + post 93 + end 94 + end 95 + 96 + def build_post(author, rkey, record) 59 97 text = record.delete('text') 60 98 created = record.delete('createdAt') 61 99 62 - author = User.find_or_create_by!(did: did) 100 + record.delete('$type') 63 101 64 - Post.create!( 102 + Post.new( 65 
103 user: author, 66 104 rkey: rkey, 67 105 time: Time.parse(created), 68 106 text: text, 69 107 data: JSON.generate(record) 70 108 ) 109 + rescue StandardError 110 + raise InvalidRecordError 111 + end 112 + 113 + def update_item(item, post) 114 + item.update!(post: post, post_uri: nil, queue: nil) 115 + 116 + @total_count += 1 117 + @oldest_imported = [@oldest_imported, item.time].min 118 + 119 + @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported }) 120 + end 121 + 122 + def invalidate_item(item) 123 + @total_count += 1 124 + @oldest_imported = [@oldest_imported, item.time].min 125 + 126 + @report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported }) 127 + 128 + item.destroy 129 + end 130 + 131 + def check_missing_items(items) 132 + return if items.empty? 133 + 134 + dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq 135 + response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids }) 136 + active_dids = response['profiles'].map { |x| x['did'] } 137 + 138 + items.each do |item| 139 + did = AT_URI(item.post_uri).repo 140 + did_obj = DID.new(did) 141 + 142 + if active_dids.include?(did) 143 + # account exists but post doesn't, delete the post reference 144 + item.destroy 145 + else 146 + begin 147 + status = if @account_status_cache.has_key?(did) # don't retry if status was nil 148 + @account_status_cache[did] 149 + else 150 + @account_status_cache[did] ||= did_obj.account_status 151 + end 152 + 153 + case status 154 + when :active 155 + # account is active but wasn't returned in getProfiles, probably was suspended on the AppView 156 + # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down" 157 + item.destroy 158 + when nil 159 + # account was deleted, so all posts were deleted too 160 + # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted" 161 + item.destroy 162 + else 163 + # account is 
inactive/suspended, but could come back, so leave it for now 164 + # puts "#{item.post_uri}: account #{did} is inactive: #{status}" 165 + end 166 + rescue StandardError => e 167 + hostname = did_obj.document.pds_host rescue "???" 168 + @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}" 169 + 170 + # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023) 171 + item.destroy if hostname == 'bsky.social' 172 + end 173 + end 174 + 175 + if !item.destroyed? 176 + item.update!(queue: nil) 177 + end 178 + end 71 179 end 72 180 end
+31
app/query_parser.rb
# Splits a raw search query into positive terms and exclusions.
#
# Supports quoted phrases ("foo bar"), negated words (-foo) and negated
# phrases (-"foo bar"); everything else is split on non-word characters
# (hyphens inside words are preserved).
class QueryParser
  attr_reader :terms, :exclusions

  def initialize(query)
    @terms = []
    @exclusions = []

    # Pull out quoted phrases first (optionally prefixed with '-'),
    # replacing each with a space so the remainder can be word-split.
    remainder = query.gsub(/\-?".+?"/) do |match|
      negative = match.start_with?('-')
      phrase = match.sub(/\A\-/, '').gsub(/^"|"$/, '').gsub(/[^\w\-]+/, ' ').strip

      (negative ? @exclusions : @terms) << phrase
      ' '
    end

    words = remainder.gsub(/[^\w\-]+/, ' ').strip.split(/ +/)
    negative, positive = words.partition { |word| word.start_with?('-') }

    @terms.concat(positive)
    @exclusions.concat(negative.map { |word| word[1..-1] }.reject(&:empty?))
  end
end
+17
app/reports/basic_report.rb
# A progress reporter that writes occasional summary lines to a logger:
# queue updates are dropped entirely, downloader updates are logged only
# every 100 downloaded posts, everything else is logged as-is.
class BasicReport
  def initialize(logger)
    @logger = logger
  end

  # Accepts a partial progress update (a hash of section => data).
  def update(data)
    data.each do |key, value|
      case key
      when :queue
        next
      when :downloader
        count = value[:downloaded_posts]
        @logger.info({ key => value }.inspect) if count && count % 100 == 0
      else
        @logger.info({ key => value }.inspect)
      end
    end
  end
end
+43
app/reports/console_report.rb
# Renders live import progress to the terminal, redrawing the same lines
# in place using ANSI cursor movement.
class ConsoleReport
  # Clears the current terminal line before rewriting it.
  BLANK = " " * 80 + "\r"

  def initialize
    @data = {}
    @start = Time.now
    @mutex = Mutex.new
  end

  # Merges a partial update into the accumulated state and redraws.
  def update(data)
    @mutex.synchronize { deep_merge(@data, data) }
    render
  end

  # Recursively merges +updates+ into +target+ in place.
  def deep_merge(target, updates)
    updates.each do |key, value|
      if value.is_a?(Hash)
        target[key] ||= {}
        deep_merge(target[key], value)
      else
        target[key] = value
      end
    end
  end

  def render
    print BLANK
    puts "Elapsed time: #{(Time.now - @start).to_i} s"

    importers = @data[:importers] || {}

    importers.each do |name, data|
      print BLANK
      puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
    end

    print BLANK
    puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"

    print BLANK
    puts "Queue size: #{@data.dig(:queue, :length) || 0}"

    # Move the cursor back up so the next render overwrites this frame.
    print "\e[#{3 + importers.length}A"
  end
end
+9
app/reports/simple_logger.rb
require 'logger'

# A Logger writing to standard output with a minimal "[timestamp] message"
# line format (severity and program name are omitted).
class SimpleLogger < Logger
  def initialize
    super(STDOUT)

    self.formatter = proc { |_severity, time, _progname, msg| "[#{time}] #{msg}\n" }
  end
end
+122 -76
app/server.rb
··· 1 - require 'base58' 2 - require 'base64' 3 - require 'didkit' 4 1 require 'json' 5 - require 'jwt' 6 - require 'openssl' 7 2 require 'sinatra/base' 8 3 9 4 require_relative 'init' 10 - require_relative 'models/like' 5 + require_relative 'authenticator' 11 6 require_relative 'models/user' 7 + require_relative 'query_parser' 12 8 13 9 class Server < Sinatra::Application 14 - class PKeyCache 15 - def self.get(did) 16 - @cache ||= {} 17 - @cache[did] 18 - end 19 - 20 - def self.set(did, pkey) 21 - @cache ||= {} 22 - @cache[did] = pkey 23 - end 24 - end 25 - 26 10 register Sinatra::ActiveRecordExtension 27 - set :port, 3000 11 + set :port, ENV['PORT'] || 3000 28 12 29 13 PAGE_LIMIT = 25 30 - HOSTNAME = 'lycan.feeds.blue' 14 + HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue' 31 15 32 16 helpers do 33 17 def json_response(data) ··· 40 24 [status, JSON.generate({ error: name, message: message })] 41 25 end 42 26 43 - def decode_user_from_jwt(auth_header, endpoint) 44 - return nil unless auth_header.start_with?('Bearer ') 27 + def get_user_did 28 + if settings.development? 29 + did = if request.post? 30 + json = JSON.parse(request.body.read) 31 + request.body.rewind 32 + json['user'] 33 + else 34 + params[:user] 35 + end 45 36 46 - token = auth_header.gsub(/\ABearer /, '') 47 - data = JSON.parse(Base64.decode64(token.split('.')[1])) 48 - did = data['iss'] 49 - return nil if data['aud'] != "did:web:#{HOSTNAME}" || data['lxm'] != endpoint 37 + if did 38 + return did 39 + else 40 + halt json_error('AuthMissing', 'Missing "user" parameter', status: 401) 41 + end 42 + else 43 + auth_header = env['HTTP_AUTHORIZATION'] 44 + if auth_header.to_s.strip.empty? 
45 + halt json_error('AuthMissing', 'Missing authentication header', status: 401) 46 + end 50 47 51 - pkey = pkey_for_user(did) 48 + begin 49 + method = request.path.split('/')[2] 50 + did = @authenticator.decode_user_from_jwt(auth_header, method) 51 + rescue StandardError => e 52 + halt json_error('InvalidToken', e.message, status: 401) 53 + end 52 54 53 - decoded = JWT.decode(token, pkey, true, { algorithm: 'ES256K' }) 54 - decoded[0] && decoded[0]['iss'] 55 + if did 56 + return did 57 + else 58 + halt json_error('InvalidToken', "Invalid JWT token", status: 401) 59 + end 60 + end 55 61 end 56 62 57 - def pkey_for_user(did) 58 - # I have no idea what this does, but it seems to be working ยฏ\_(ใƒ„)_/ยฏ 63 + def load_user 64 + did = get_user_did 59 65 60 - if pkey = PKeyCache.get(did) 61 - return pkey 66 + if user = User.find_by(did: did) 67 + return user 68 + else 69 + halt json_error('AccountNotFound', 'Account not found', status: 401) 62 70 end 71 + end 72 + end 63 73 64 - doc = DID.new(did).document.json 65 - key_obj = (doc['verificationMethod'] || []).detect { |x| x['type'] == 'Multikey' } 66 - key_multi = key_obj&.dig('publicKeyMultibase') 67 - return nil unless key_multi 74 + before do 75 + @authenticator = Authenticator.new(hostname: HOSTNAME) 68 76 69 - key_decoded = Base58.base58_to_binary(key_multi[1..], :bitcoin) 70 - comp_key = key_decoded[2..-1] 77 + if settings.development? 78 + headers['Access-Control-Allow-Origin'] = '*' 79 + end 80 + end 71 81 72 - alg_id = OpenSSL::ASN1::Sequence([ 73 - OpenSSL::ASN1::ObjectId('id-ecPublicKey'), 74 - OpenSSL::ASN1::ObjectId('secp256k1') 75 - ]) 82 + get '/' do 83 + "Awoo ๐Ÿบ" 84 + end 76 85 77 - der = OpenSSL::ASN1::Sequence([alg_id, OpenSSL::ASN1::BitString(comp_key)]).to_der 78 - pkey = OpenSSL::PKey.read(der) 86 + get '/xrpc/blue.feeds.lycan.searchPosts' do 87 + @user = load_user 79 88 80 - PKeyCache.set(did, pkey) 89 + if params[:query].to_s.strip.empty? 
90 + return json_error('MissingParameter', 'Missing "query" parameter') 91 + end 81 92 82 - pkey 93 + if params[:collection].to_s.strip.empty? 94 + return json_error('MissingParameter', 'Missing "collection" parameter') 83 95 end 84 - end 96 + 97 + query = QueryParser.new(params[:query]) 85 98 86 - get '/xrpc/blue.feeds.lycan.searchPosts' do 87 - headers['access-control-allow-origin'] = '*' 99 + collection = case params[:collection] 100 + when 'likes' then @user.likes 101 + when 'pins' then @user.pins 102 + when 'quotes' then @user.quotes 103 + when 'reposts' then @user.reposts 104 + else return json_error('InvalidParameter', 'Invalid search collection') 105 + end 88 106 89 - if settings.development? 90 - user = User.find_by(did: params[:user]) 91 - return json_error('UserNotFound', 'Missing "user" parameter') if user.nil? 107 + if query.terms.empty? 108 + return json_response(terms: [], posts: [], cursor: nil) 109 + end 110 + 111 + items = collection 112 + .joins(:post) 113 + .includes(:post => :user) 114 + .matching_terms(query.terms) 115 + .excluding_terms(query.exclusions) 116 + .reverse_chronologically 117 + .after_cursor(params[:cursor]) 118 + .limit(PAGE_LIMIT) 119 + 120 + post_uris = case params[:collection] 121 + when 'quotes' 122 + items.map { |x| "at://#{@user.did}/app.bsky.feed.post/#{x.rkey}" } 92 123 else 93 - begin 94 - did = decode_user_from_jwt(env['HTTP_AUTHORIZATION'], 'blue.feeds.lycan.searchPosts') 95 - rescue StandardError => e 96 - p e 97 - end 124 + items.map(&:post).map(&:at_uri) 125 + end 98 126 99 - user = did && User.find_by(did: did) 100 - return json_error('UserNotFound', 'Missing authentication header') if user.nil? 127 + json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor) 128 + end 129 + 130 + if settings.development? 
131 + options '/xrpc/blue.feeds.lycan.startImport' do 132 + headers['Access-Control-Allow-Origin'] = '*' 133 + headers['Access-Control-Allow-Headers'] = 'content-type' 101 134 end 135 + end 102 136 103 - if params[:query] 104 - query = params[:query].gsub('%', "\\%") 137 + post '/xrpc/blue.feeds.lycan.startImport' do 138 + did = get_user_did 139 + user = User.find_or_create_by(did: did) 105 140 106 - likes = user.likes 107 - .joins(:post) 108 - .includes(:post => :user) 109 - .where("text ILIKE ?", "%#{query}%") 110 - .order('likes.time DESC, likes.id DESC') 111 - .limit(PAGE_LIMIT) 141 + if !user.valid? 142 + json_error('InvalidRequest', 'Invalid DID') 143 + elsif user.import_job || user.active? 144 + json_response(message: "Import has already started") 145 + else 146 + user.create_import_job! 112 147 113 - if params[:cursor] 114 - timestamp, id = params[:cursor].split(':') 115 - time = Time.at(timestamp.to_f) 116 - likes = likes.where("likes.time < ? OR (likes.time = ? AND likes.id < ?)", time, time, id) 117 - end 148 + status 202 149 + json_response(message: "Import has been scheduled") 150 + end 151 + end 118 152 119 - post_uris = likes.map(&:post).map(&:at_uri) 153 + get '/xrpc/blue.feeds.lycan.getImportStatus' do 154 + did = get_user_did 155 + user = User.find_by(did: did) 156 + until_date = user&.imported_until 120 157 121 - json_response(posts: post_uris, cursor: likes.last && "#{likes.last.time.to_f}:#{likes.last.id}") 158 + case until_date 159 + when nil 160 + if user.import_job || user.imports.exists? 
161 + json_response(status: 'scheduled') 162 + else 163 + json_response(status: 'not_started') 164 + end 165 + when Import::IMPORT_END 166 + json_response(status: 'finished') 122 167 else 123 - json_error('MissingParameter', 'Missing "query" parameter') 168 + progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at) 169 + json_response(status: 'in_progress', position: until_date.iso8601, progress: progress) 124 170 end 125 171 end 126 172
+56
bin/firehose
#!/usr/bin/env ruby

# Entry point for the firehose client process.
# Usage: bin/firehose [-r<cursor>]; set ARLOG=1 for ActiveRecord query logs.

$LOAD_PATH.unshift(File.expand_path('..', __dir__))

require 'bundler/setup'
require 'app/firehose_client'

$stdout.sync = true

# ARLOG=1 enables ActiveRecord query logging to stdout.
if ENV['ARLOG'] == '1'
  ActiveRecord::Base.logger = Logger.new(STDOUT)
else
  ActiveRecord::Base.logger = nil
end

def print_help
  puts "Usage: #{$0} [options...]"
  puts "Options:"
  puts " -r12345 = start from cursor 12345"
end

firehose = FirehoseClient.new

args = ARGV.dup

while arg = args.shift
  case arg
  when /^\-r(\d+)$/
    # resume streaming from an explicit firehose cursor position
    firehose.start_cursor = $1.to_i
  when '-h', '--help'
    print_help
    exit 0
  else
    puts "Unrecognized option: #{arg}"
    print_help
    exit 1
  end
end

# On SIGINT/SIGTERM, defer the shutdown into the EventMachine reactor with
# a zero-delay timer instead of stopping from inside the trap handler.
trap("SIGINT") {
  firehose.log "Stopping..."

  EM.add_timer(0) {
    firehose.stop
  }
}

trap("SIGTERM") {
  firehose.log "Shutting down the service..."

  EM.add_timer(0) {
    firehose.stop
  }
}

firehose.start
+1
bin/server
··· 4 4 5 5 require 'bundler/setup' 6 6 require 'app/server' 7 + require 'thin' 7 8 8 9 Server.run!
+53
bin/worker
#!/usr/bin/env ruby

# Entry point for the background import worker process.
# Usage: bin/worker [-v]; set ARLOG=1 for ActiveRecord query logs.

require 'bundler/setup'
require_relative '../app/import_worker'
require_relative '../app/reports/simple_logger'

$stdout.sync = true

# ARLOG=1 enables ActiveRecord query logging to stdout.
if ENV['ARLOG'] == '1'
  ActiveRecord::Base.logger = Logger.new(STDOUT)
else
  ActiveRecord::Base.logger = nil
end

def print_help
  puts "Usage: #{$0} [options...]"
  puts "Options:"
  puts " -v = verbose"
end

logger = SimpleLogger.new
worker = ImportWorker.new
worker.logger = logger

args = ARGV.dup

while arg = args.shift
  case arg
  when '-v', '--verbose'
    worker.verbose = true
  when '-h', '--help'
    print_help
    exit 0
  else
    puts "Unrecognized option: #{arg}"
    print_help
    exit 1
  end
end

trap("SIGINT") {
  puts
  puts "[#{Time.now}] Stopping..."
  exit
}

trap("SIGTERM") {
  puts "[#{Time.now}] Shutting down the service..."
  exit
}

puts "[#{Time.now}] Starting background worker..."
worker.run
+41
config/deploy.rb
# TODO: migrate to capistrano3 bundler integration
require 'bundler/capistrano'

# bundler options are applied via `bundle config` in deploy:set_bundler_options
# below, so the flags passed on the command line are kept minimal
set :bundle_dir, ''
set :bundle_flags, '--quiet'
set :bundle_without, []

set :application, "lycan"
set :repository, "https://tangled.sh/@mackuba.eu/lycan"
set :scm, :git
set :keep_releases, 10                      # releases kept by deploy:cleanup
set :use_sudo, false
set :deploy_to, "/var/www/lycan"
set :deploy_via, :remote_cache              # reuse a cached checkout on the server
set :migrate_env, "RACK_ENV=production"
set :public_children, []                    # no public/ subdirectories to symlink
set :shared_children, ['log', 'tmp/pids']   # shared dirs linked into each release

# single-server setup: one host plays all roles
server "lycan.feeds.blue", :app, :web, :db, :primary => true

# bundler must be configured per release before `bundle install` runs
before 'bundle:install', 'deploy:set_bundler_options'

after 'deploy', 'deploy:cleanup'
after 'deploy:migrations', 'deploy:cleanup'
after 'deploy:update_code', 'deploy:link_shared'

namespace :deploy do
  # restart the app by touching the restart marker file
  # NOTE(review): tmp/restart.txt is the Phusion Passenger convention — confirm
  # that's the app server in use
  task :restart, :roles => :web do
    run "touch #{current_path}/tmp/restart.txt"
  end

  # configure bundler for deployment mode, with gems installed into a shared
  # path and dev/test groups excluded
  task :set_bundler_options do
    run "cd #{release_path} && bundle config set --local deployment 'true'"
    run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
    run "cd #{release_path} && bundle config set --local without 'development test'"
  end

  # link the shared environment file into the new release as .env
  task :link_shared do
    run "ln -s #{shared_path}/env #{release_path}/.env"
  end
end
+22
db/migrate/20250903215014_add_import_collections.rb
class AddImportCollections < ActiveRecord::Migration[7.2]
  # Switches imports from one-per-user to one-per-(user, collection).
  # NOTE: statement order matters — on rollback the steps run in reverse, so
  # the down-only DELETE is registered last in order to execute first.
  def change
    # add as nullable first so existing rows don't violate the constraint
    add_column :imports, :collection, :string, limit: 20, null: true

    reversible do |dir|
      dir.up do
        # backfill: every pre-existing import row tracked likes
        execute "UPDATE imports SET collection = 'likes'"
      end
    end

    change_column_null :imports, :collection, false

    # uniqueness is now scoped per (user, collection) instead of per user
    remove_index :imports, :user_id, unique: true
    add_index :imports, [:user_id, :collection], unique: true

    reversible do |dir|
      dir.down do
        # on rollback, drop non-like imports first so the single-per-user
        # unique index can be restored without duplicates
        execute "DELETE FROM imports WHERE collection != 'likes'"
      end
    end
  end
end
+14
db/migrate/20250906005748_add_reposts.rb
class AddReposts < ActiveRecord::Migration[7.2]
  # Creates the reposts table: one row per repost record by an actor.
  def change
    create_table :reposts do |t|
      t.integer  :actor_id, null: false
      t.string   :rkey, limit: 13, null: false
      t.datetime :time, null: false
      t.bigint   :post_id   # NOTE(review): nullable — presumably resolved later; confirm against import code
      t.string   :post_uri

      # newest-first paging per actor; (actor_id, rkey) is the natural key
      t.index [:actor_id, :time, :id], order: { time: :desc, id: :desc }
      t.index [:actor_id, :rkey], unique: true
    end
  end
end
+27
db/migrate/20250906233017_add_quotes_and_pins.rb
class AddQuotesAndPins < ActiveRecord::Migration[7.2]
  # Creates the quotes and pins tables. Both share an identical layout apart
  # from the name of the text column, so they are built from one template.
  def change
    { quotes: :quote_text, pins: :pin_text }.each do |table_name, text_column|
      create_table table_name do |t|
        t.integer  :actor_id, null: false
        t.string   :rkey, limit: 13, null: false
        t.datetime :time, null: false
        t.text     text_column, null: false
        t.bigint   :post_id
        t.string   :post_uri

        # newest-first paging per actor; (actor_id, rkey) is the natural key
        t.index [:actor_id, :time, :id], order: { time: :desc, id: :desc }
        t.index [:actor_id, :rkey], unique: true
      end
    end
  end
end
+8
db/migrate/20250918024627_add_subscriptions.rb
class AddSubscriptions < ActiveRecord::Migration[7.2]
  # One row per upstream service with the last processed event cursor.
  # NOTE(review): presumably used to resume the firehose stream — confirm.
  def change
    create_table :subscriptions do |t|
      t.string :service, null: false
      t.bigint :cursor, null: false
    end
  end
end
+9
db/migrate/20250920182018_add_import_jobs.rb
class AddImportJobs < ActiveRecord::Migration[7.2]
  # Pending import jobs — at most one queued job per user, enforced by a
  # unique index declared inline on the column.
  def change
    create_table :import_jobs do |t|
      t.integer :user_id, null: false, index: { unique: true }
    end
  end
end
+7
db/migrate/20250923014702_add_queued_field.rb
class AddQueuedField < ActiveRecord::Migration[7.2]
  # Adds an optional smallint queue slot to each of the four item tables.
  def change
    %i[likes reposts quotes pins].each do |table_name|
      add_column table_name, :queue, :smallint, null: true
    end
  end
end
+5
db/migrate/20250923180153_add_user_created_at.rb
class AddUserCreatedAt < ActiveRecord::Migration[7.2]
  # NOTE(review): presumably when the user registered with the service;
  # nullable since pre-existing rows have no known registration time.
  def change
    # datetime columns are nullable by default, same effect as null: true
    add_column :users, :registered_at, :datetime
  end
end
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
class AddFetchedUntilToImports < ActiveRecord::Migration[7.2]
  # NOTE(review): presumably tracks how far back an import has fetched so it
  # can resume — confirm against the import manager code.
  def change
    add_column(:imports, :fetched_until, :datetime)
  end
end
+51 -2
db/schema.rb
··· 10 10 # 11 11 # It's strongly recommended that you check this file into your version control system. 12 12 13 - ActiveRecord::Schema[7.2].define(version: 2025_08_31_210930) do 13 + ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do 14 14 # These are extensions that must be enabled in order to support this database 15 15 enable_extension "plpgsql" 16 + 17 + create_table "import_jobs", force: :cascade do |t| 18 + t.integer "user_id", null: false 19 + t.index ["user_id"], name: "index_import_jobs_on_user_id", unique: true 20 + end 16 21 17 22 create_table "imports", force: :cascade do |t| 18 23 t.integer "user_id", null: false 19 24 t.string "cursor" 20 25 t.datetime "started_from" 21 26 t.datetime "last_completed" 22 - t.index ["user_id"], name: "index_imports_on_user_id", unique: true 27 + t.string "collection", limit: 20, null: false 28 + t.datetime "fetched_until" 29 + t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true 23 30 end 24 31 25 32 create_table "likes", force: :cascade do |t| ··· 28 35 t.datetime "time", null: false 29 36 t.bigint "post_id" 30 37 t.string "post_uri" 38 + t.integer "queue", limit: 2 31 39 t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true 32 40 t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc } 33 41 end 34 42 43 + create_table "pins", force: :cascade do |t| 44 + t.integer "actor_id", null: false 45 + t.string "rkey", limit: 13, null: false 46 + t.datetime "time", null: false 47 + t.text "pin_text", null: false 48 + t.bigint "post_id" 49 + t.string "post_uri" 50 + t.integer "queue", limit: 2 51 + t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true 52 + t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc } 53 + end 54 + 35 55 create_table "posts", force: :cascade do |t| 36 56 
t.integer "user_id", null: false 37 57 t.string "rkey", limit: 13, null: false ··· 42 62 t.index ["user_id", "time", "id"], name: "index_posts_on_user_id_and_time_and_id", order: { time: :desc, id: :desc } 43 63 end 44 64 65 + create_table "quotes", force: :cascade do |t| 66 + t.integer "actor_id", null: false 67 + t.string "rkey", limit: 13, null: false 68 + t.datetime "time", null: false 69 + t.text "quote_text", null: false 70 + t.bigint "post_id" 71 + t.string "post_uri" 72 + t.integer "queue", limit: 2 73 + t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true 74 + t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc } 75 + end 76 + 77 + create_table "reposts", force: :cascade do |t| 78 + t.integer "actor_id", null: false 79 + t.string "rkey", limit: 13, null: false 80 + t.datetime "time", null: false 81 + t.bigint "post_id" 82 + t.string "post_uri" 83 + t.integer "queue", limit: 2 84 + t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true 85 + t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc } 86 + end 87 + 88 + create_table "subscriptions", force: :cascade do |t| 89 + t.string "service", null: false 90 + t.bigint "cursor", null: false 91 + end 92 + 45 93 create_table "users", id: :serial, force: :cascade do |t| 46 94 t.string "did", limit: 260, null: false 95 + t.datetime "registered_at" 47 96 t.index ["did"], name: "index_users_on_did", unique: true 48 97 end 49 98 end
+32 -46
lib/tasks/import.rake
··· 1 - require_relative '../../app/importer' 2 - require_relative '../../app/like_queue' 1 + require_relative '../../app/import_manager' 2 + require_relative '../../app/item_queue' 3 + require_relative '../../app/models/user' 3 4 require_relative '../../app/post_downloader' 5 + require_relative '../../app/reports/console_report' 6 + require_relative '../../app/reports/simple_logger' 4 7 5 - class ImportReport 6 - def initialize 7 - @data = {} 8 - @start = Time.now 8 + task :enqueue_user do 9 + unless ENV['DID'] 10 + raise "Required DID parameter missing" 9 11 end 10 12 11 - def update(data) 12 - data.each do |k, v| 13 - @data[k] ||= {} 14 - @data[k].update(v) 15 - end 13 + user = User.find_or_create_by!(did: ENV['DID']) 16 14 17 - render 18 - end 19 - 20 - def render 21 - print " " * 80 + "\r" 22 - puts "Elapsed time: #{(Time.now - @start).to_i} s" 23 - 24 - print " " * 80 + "\r" 25 - puts "Importer: imported likes = #{@data.dig(:importer, :imported_likes) || 0} (until: #{@data.dig(:importer, :oldest_date)})" + 26 - "#{" (DONE)" if @data.dig(:importer, :finished)}" 27 - 28 - print " " * 80 + "\r" 29 - puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})" 30 - 31 - print " " * 80 + "\r" 32 - puts "Queue size: #{@data.dig(:queue, :length) || 0}" 33 - 34 - print "\e[4A" 15 + if user.import_job 16 + puts "Import for #{user.did} is already scheduled." 17 + elsif user.active? 18 + puts "Import for #{user.did} has already started." 19 + else 20 + user.create_import_job! 
21 + puts "Import for #{user.did} scheduled ✓" 35 22 end 36 23 end 37 24 38 25 task :import_user do 39 - unless ENV['USER'] 40 - raise "Required USER parameter missing" 26 + unless ENV['DID'] 27 + raise "Required DID parameter missing" 41 28 end 42 29 43 - queue = LikeQueue.new(Like.where(post: nil).to_a) 44 - report = ImportReport.new 30 + user = User.find_or_create_by!(did: ENV['DID']) 45 31 46 - importer = Importer.new(ENV['USER']) 47 - importer.like_queue = queue 48 - importer.report = report 32 + unless ENV['COLLECTION'] 33 + raise "Required COLLECTION parameter missing" 34 + end 49 35 50 - downloader = PostDownloader.new 51 - downloader.report = report 52 - 53 - download_thread = Thread.new { downloader.import_from_queue(queue) } 36 + import = ImportManager.new(user) 37 + import.report = ConsoleReport.new 38 + import.logger = SimpleLogger.new 39 + import.time_limit = ENV['UNTIL'] 54 40 55 41 trap("SIGINT") { 56 42 puts "\n\n\n\n\n" 57 43 exit 58 44 } 59 45 60 - importer.run_import(ENV['UNTIL']) 61 - 62 - downloader.stop_when_empty = true 63 - download_thread.join 46 + import.start(ENV['COLLECTION']) 64 47 65 48 puts "\n\n\n\n\n" 66 49 end 67 50 68 51 task :process_posts do 69 - queue = LikeQueue.new(Like.where(post: nil).to_a) 70 - report = ImportReport.new 52 + queue = ItemQueue.new( 53 + [Like, Repost, Quote, Pin].map { |x| x.pending.to_a }.reduce(&:+) 54 + ) 55 + 56 + report = ConsoleReport.new 71 57 72 58 trap("SIGINT") { 73 59 puts "\n\n\n\n\n"