+10
.env.example
+10
.env.example
···
···
1
+
# hostname on which you're running the Lycan server
2
+
SERVER_HOSTNAME=lycan.feeds.blue
3
+
4
+
# customizing servers we connect to
5
+
RELAY_HOST=bsky.network
6
+
# or: JETSTREAM_HOST=jetstream2.us-east.bsky.network
7
+
APPVIEW_HOST=public.api.bsky.app
8
+
9
+
# put your handle here
10
+
# FIREHOSE_USER_AGENT="Lycan (@my.handle)"
+5
-3
Gemfile
+5
-3
Gemfile
···
2
3
gem 'activerecord', '~> 7.2'
4
gem 'sinatra-activerecord', '~> 2.0'
5
gem 'pg'
6
gem 'rake'
7
gem 'irb'
8
-
gem 'rainbow'
9
10
-
gem 'sinatra'
11
12
gem 'minisky', '~> 0.5'
13
gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
14
15
gem 'base58'
16
gem 'jwt'
17
18
group :development do
19
-
gem 'puma'
20
gem 'rackup'
21
gem 'capistrano', '~> 2.0'
22
···
2
3
gem 'activerecord', '~> 7.2'
4
gem 'sinatra-activerecord', '~> 2.0'
5
+
gem 'sinatra'
6
gem 'pg'
7
gem 'rake'
8
gem 'irb'
9
10
+
gem 'rainbow'
11
+
gem 'dotenv'
12
13
gem 'minisky', '~> 0.5'
14
gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
15
+
gem 'skyfall', '~> 0.6'
16
17
gem 'base58'
18
gem 'jwt'
19
20
group :development do
21
+
gem 'thin'
22
gem 'rackup'
23
gem 'capistrano', '~> 2.0'
24
+58
-34
Gemfile.lock
+58
-34
Gemfile.lock
···
7
GEM
8
remote: https://rubygems.org/
9
specs:
10
-
activemodel (7.2.2.2)
11
-
activesupport (= 7.2.2.2)
12
-
activerecord (7.2.2.2)
13
-
activemodel (= 7.2.2.2)
14
-
activesupport (= 7.2.2.2)
15
timeout (>= 0.4.0)
16
-
activesupport (7.2.2.2)
17
base64
18
benchmark (>= 0.3)
19
bigdecimal
···
25
minitest (>= 5.1)
26
securerandom (>= 0.3)
27
tzinfo (~> 2.0, >= 2.0.5)
28
base58 (0.2.3)
29
base64 (0.3.0)
30
bcrypt_pbkdf (1.1.1)
31
-
benchmark (0.4.1)
32
-
bigdecimal (3.2.2)
33
capistrano (2.15.11)
34
highline
35
net-scp (>= 1.0.0)
36
net-sftp (>= 2.0.0)
37
net-ssh (>= 2.0.14)
38
net-ssh-gateway (>= 1.1.0)
39
concurrent-ruby (1.3.5)
40
-
connection_pool (2.5.3)
41
-
date (3.4.1)
42
drb (2.2.3)
43
ed25519 (1.4.0)
44
-
erb (5.0.2)
45
highline (3.1.2)
46
reline
47
i18n (1.14.7)
48
concurrent-ruby (~> 1.0)
49
io-console (0.8.1)
50
-
irb (1.15.2)
51
pp (>= 0.6.0)
52
rdoc (>= 4.0.0)
53
reline (>= 0.4.2)
···
56
logger (1.7.0)
57
minisky (0.5.0)
58
base64 (~> 0.1)
59
-
minitest (5.25.5)
60
mustermann (3.0.4)
61
ruby2_keywords (~> 0.0.1)
62
net-scp (4.1.0)
···
66
net-ssh (7.3.0)
67
net-ssh-gateway (2.0.0)
68
net-ssh (>= 4.0.0)
69
-
nio4r (2.7.4)
70
-
pg (1.6.1)
71
-
pg (1.6.1-aarch64-linux)
72
-
pg (1.6.1-aarch64-linux-musl)
73
-
pg (1.6.1-arm64-darwin)
74
-
pg (1.6.1-x86_64-darwin)
75
-
pg (1.6.1-x86_64-linux)
76
-
pg (1.6.1-x86_64-linux-musl)
77
-
pp (0.6.2)
78
prettyprint
79
prettyprint (0.2.0)
80
psych (5.2.6)
81
date
82
stringio
83
-
puma (6.6.1)
84
-
nio4r (~> 2.0)
85
-
rack (3.2.0)
86
-
rack-protection (4.1.1)
87
base64 (>= 0.1.0)
88
logger (>= 1.6.0)
89
rack (>= 3.0.0, < 4)
···
93
rackup (2.2.1)
94
rack (>= 3)
95
rainbow (3.1.1)
96
-
rake (13.3.0)
97
-
rdoc (6.14.2)
98
erb
99
psych (>= 4.0.0)
100
-
reline (0.6.2)
101
io-console (~> 0.5)
102
ruby2_keywords (0.0.5)
103
securerandom (0.4.1)
104
-
sinatra (4.1.1)
105
logger (>= 1.6.0)
106
mustermann (~> 3.0)
107
rack (>= 3.0.0, < 4)
108
-
rack-protection (= 4.1.1)
109
rack-session (>= 2.0.0, < 3)
110
tilt (~> 2.0)
111
sinatra-activerecord (2.0.28)
112
activerecord (>= 4.1)
113
sinatra (>= 1.0)
114
-
stringio (3.1.7)
115
tilt (2.6.1)
116
-
timeout (0.4.3)
117
tzinfo (2.0.6)
118
concurrent-ruby (~> 1.0)
119
120
PLATFORMS
121
aarch64-linux
···
132
bcrypt_pbkdf (>= 1.0, < 2.0)
133
capistrano (~> 2.0)
134
didkit (~> 0.2)!
135
ed25519 (>= 1.2, < 2.0)
136
irb
137
jwt
138
minisky (~> 0.5)
139
pg
140
-
puma
141
rackup
142
rainbow
143
rake
144
sinatra
145
sinatra-activerecord (~> 2.0)
146
147
BUNDLED WITH
148
2.7.0
···
7
GEM
8
remote: https://rubygems.org/
9
specs:
10
+
activemodel (7.2.3)
11
+
activesupport (= 7.2.3)
12
+
activerecord (7.2.3)
13
+
activemodel (= 7.2.3)
14
+
activesupport (= 7.2.3)
15
timeout (>= 0.4.0)
16
+
activesupport (7.2.3)
17
base64
18
benchmark (>= 0.3)
19
bigdecimal
···
25
minitest (>= 5.1)
26
securerandom (>= 0.3)
27
tzinfo (~> 2.0, >= 2.0.5)
28
+
base32 (0.3.4)
29
base58 (0.2.3)
30
base64 (0.3.0)
31
bcrypt_pbkdf (1.1.1)
32
+
benchmark (0.5.0)
33
+
bigdecimal (3.3.1)
34
capistrano (2.15.11)
35
highline
36
net-scp (>= 1.0.0)
37
net-sftp (>= 2.0.0)
38
net-ssh (>= 2.0.14)
39
net-ssh-gateway (>= 1.1.0)
40
+
cbor (0.5.10.1)
41
concurrent-ruby (1.3.5)
42
+
connection_pool (2.5.4)
43
+
daemons (1.4.1)
44
+
date (3.5.0)
45
+
dotenv (3.1.8)
46
drb (2.2.3)
47
ed25519 (1.4.0)
48
+
erb (6.0.0)
49
+
eventmachine (1.2.7)
50
+
faye-websocket (0.12.0)
51
+
eventmachine (>= 0.12.0)
52
+
websocket-driver (>= 0.8.0)
53
highline (3.1.2)
54
reline
55
i18n (1.14.7)
56
concurrent-ruby (~> 1.0)
57
io-console (0.8.1)
58
+
irb (1.15.3)
59
pp (>= 0.6.0)
60
rdoc (>= 4.0.0)
61
reline (>= 0.4.2)
···
64
logger (1.7.0)
65
minisky (0.5.0)
66
base64 (~> 0.1)
67
+
minitest (5.26.1)
68
mustermann (3.0.4)
69
ruby2_keywords (~> 0.0.1)
70
net-scp (4.1.0)
···
74
net-ssh (7.3.0)
75
net-ssh-gateway (2.0.0)
76
net-ssh (>= 4.0.0)
77
+
pg (1.6.2)
78
+
pg (1.6.2-aarch64-linux)
79
+
pg (1.6.2-aarch64-linux-musl)
80
+
pg (1.6.2-arm64-darwin)
81
+
pg (1.6.2-x86_64-darwin)
82
+
pg (1.6.2-x86_64-linux)
83
+
pg (1.6.2-x86_64-linux-musl)
84
+
pp (0.6.3)
85
prettyprint
86
prettyprint (0.2.0)
87
psych (5.2.6)
88
date
89
stringio
90
+
rack (3.2.4)
91
+
rack-protection (4.2.1)
92
base64 (>= 0.1.0)
93
logger (>= 1.6.0)
94
rack (>= 3.0.0, < 4)
···
98
rackup (2.2.1)
99
rack (>= 3)
100
rainbow (3.1.1)
101
+
rake (13.3.1)
102
+
rdoc (6.15.1)
103
erb
104
psych (>= 4.0.0)
105
+
tsort
106
+
reline (0.6.3)
107
io-console (~> 0.5)
108
ruby2_keywords (0.0.5)
109
securerandom (0.4.1)
110
+
sinatra (4.2.1)
111
logger (>= 1.6.0)
112
mustermann (~> 3.0)
113
rack (>= 3.0.0, < 4)
114
+
rack-protection (= 4.2.1)
115
rack-session (>= 2.0.0, < 3)
116
tilt (~> 2.0)
117
sinatra-activerecord (2.0.28)
118
activerecord (>= 4.1)
119
sinatra (>= 1.0)
120
+
skyfall (0.6.0)
121
+
base32 (~> 0.3, >= 0.3.4)
122
+
base64 (~> 0.1)
123
+
cbor (~> 0.5, >= 0.5.9.6)
124
+
eventmachine (~> 1.2, >= 1.2.7)
125
+
faye-websocket (~> 0.12)
126
+
stringio (3.1.8)
127
+
thin (2.0.1)
128
+
daemons (~> 1.0, >= 1.0.9)
129
+
eventmachine (~> 1.0, >= 1.0.4)
130
+
logger
131
+
rack (>= 1, < 4)
132
tilt (2.6.1)
133
+
timeout (0.4.4)
134
+
tsort (0.2.0)
135
tzinfo (2.0.6)
136
concurrent-ruby (~> 1.0)
137
+
websocket-driver (0.8.0)
138
+
base64
139
+
websocket-extensions (>= 0.1.0)
140
+
websocket-extensions (0.1.5)
141
142
PLATFORMS
143
aarch64-linux
···
154
bcrypt_pbkdf (>= 1.0, < 2.0)
155
capistrano (~> 2.0)
156
didkit (~> 0.2)!
157
+
dotenv
158
ed25519 (>= 1.2, < 2.0)
159
irb
160
jwt
161
minisky (~> 0.5)
162
pg
163
rackup
164
rainbow
165
rake
166
sinatra
167
sinatra-activerecord (~> 2.0)
168
+
skyfall (~> 0.6)
169
+
thin
170
171
BUNDLED WITH
172
2.7.0
+110
README.md
+110
README.md
···
···
1
+
# Lycan 🐺
2
+
3
+
A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and allows you to search in that archive.
4
+
5
+
6
+
## How it works
7
+
8
+
Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network AppView, it only indexes posts and likes on demand from people who request to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anything between a few minutes and an hour, depending on how much data there is to download. After that, new likes are being indexed live from the firehose.
9
+
10
+
At the moment, Lycan indexes four types of content:
11
+
12
+
- posts you've liked
13
+
- posts you've reposted
14
+
- posts you've quoted
15
+
- your old-style bookmarks (using the 📌 emoji method)
16
+
17
+
New bookmarks are private data, so at the moment they can't be imported until support for OAuth is added.
18
+
19
+
Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API – the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)).
20
+
21
+
The service consists of three separate components:
22
+
23
+
- a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is/has been imported
24
+
- a **background worker**, which runs the import process
25
+
- an **HTTP server**, which serves the XRPC endpoints (currently there are 3: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying
26
+
27
+
28
+
## Setting up on localhost
29
+
30
+
This app should run on any somewhat recent version of Ruby, though of course it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install it with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need to have some familiarity with the Ruby ecosystem in order to set it up and run it.
31
+
32
+
A Postgres database is also required (again, any non-ancient version should work).
33
+
34
+
Download or clone the repository, then install the dependencies:
35
+
36
+
```
37
+
bundle install
38
+
```
39
+
40
+
Next, create the database – the configuration is defined in [`config/database.yml`](config/database.yml), for development it's `lycan_development`. Create it either manually, or with a rake task:
41
+
42
+
```
43
+
bundle exec rake db:create
44
+
```
45
+
46
+
Then, run the migrations:
47
+
48
+
```
49
+
bundle exec rake db:migrate
50
+
```
51
+
52
+
To run an import, you will need to run three separate processes, probably in separate terminal tabs:
53
+
54
+
1) the firehose client, [`bin/firehose`](bin/firehose)
55
+
2) the background worker, [`bin/worker`](bin/worker)
56
+
3) the Sinatra HTTP server, [`bin/server`](bin/server)
57
+
58
+
The UI can be accessed through Skythread, either on the official site on [skythread.mackuba.eu](https://skythread.mackuba.eu), or a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu – but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL.
59
+
60
+
You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.)
61
+
62
+
63
+
## Configuration
64
+
65
+
There's a few things you can configure through ENV variables:
66
+
67
+
- `RELAY_HOST` – hostname of the relay to use for the firehose (default: `bsky.network`)
68
+
- `JETSTREAM_HOST` – alternatively, instead of `RELAY_HOST`, set this to a hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance
69
+
- `FIREHOSE_USER_AGENT` – when running in production, it's recommended that you set this to some name that identifies who is running the service
70
+
- `APPVIEW_HOST` – hostname of the AppView used to download posts (default: `public.api.bsky.app`)
71
+
- `SERVER_HOSTNAME` – hostname of the server on which you're running the service in production
72
+
73
+
74
+
## Rake tasks
75
+
76
+
Some Rake tasks that might be useful:
77
+
78
+
```
79
+
bundle exec rake enqueue_user DID=did:plc:qweqwe
80
+
```
81
+
82
+
- request an import of the given account (to be handled by firehose + worker)
83
+
84
+
```
85
+
bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all
86
+
```
87
+
88
+
- run a complete import synchronously
89
+
90
+
```
91
+
bundle exec rake process_posts
92
+
```
93
+
94
+
- process all previously queued and unfinished or failed items
95
+
96
+
97
+
## Running in production
98
+
99
+
This will probably heavily depend on where and how you prefer to run it, I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config for your specific setup.
100
+
101
+
On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option to do this (which I'm using) may be writing a `systemd` service config file and adding it to `/etc/systemd/system`. To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server – my recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose).
102
+
103
+
104
+
## Credits
105
+
106
+
Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
107
+
108
+
The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
109
+
110
+
Bug reports and pull requests are welcome 😎
+1
Rakefile
+1
Rakefile
+44
app/at_uri.rb
+44
app/at_uri.rb
···
···
1
+
class AT_URI
2
+
class InvalidURIError
3
+
end
4
+
5
+
TID_PATTERN = /^[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}$/
6
+
7
+
attr_reader :repo, :collection, :rkey
8
+
9
+
def initialize(uri)
10
+
uri = uri.to_s
11
+
raise InvalidURIError, "Invalid AT URI: #{uri}" if uri.include?(' ') || !uri.start_with?('at://')
12
+
13
+
parts = uri.split('/')
14
+
parts.each(&:freeze)
15
+
16
+
raise InvalidURIError, "Invalid AT URI: #{uri}" if parts.length != 5 || parts[2..4].any?(&:empty?)
17
+
18
+
@uri = uri
19
+
@repo, @collection, @rkey = parts[2..4]
20
+
end
21
+
22
+
def to_ary
23
+
[@repo, @collection, @rkey]
24
+
end
25
+
26
+
alias split to_ary
27
+
28
+
def to_s
29
+
@uri
30
+
end
31
+
32
+
def has_valid_tid?
33
+
@rkey =~ TID_PATTERN
34
+
end
35
+
36
+
def is_post?
37
+
@collection == 'app.bsky.feed.post' && has_valid_tid?
38
+
end
39
+
end
40
+
41
+
def AT_URI(uri)
42
+
return uri if uri.is_a?(AT_URI)
43
+
AT_URI.new(uri)
44
+
end
+67
app/authenticator.rb
+67
app/authenticator.rb
···
···
1
+
require 'base58'
2
+
require 'base64'
3
+
require 'didkit'
4
+
require 'json'
5
+
require 'jwt'
6
+
require 'openssl'
7
+
8
+
class Authenticator
9
+
class InvalidTokenError < StandardError
10
+
end
11
+
12
+
def initialize(hostname:)
13
+
@@pkey_cache ||= {}
14
+
@hostname = hostname
15
+
end
16
+
17
+
def decode_user_from_jwt(auth_header, endpoint)
18
+
return nil unless auth_header.start_with?('Bearer ')
19
+
20
+
token = auth_header.gsub(/\ABearer /, '')
21
+
parts = token.split('.')
22
+
raise InvalidTokenError, "Invalid JWT token" if parts.length != 3
23
+
24
+
begin
25
+
decoded_data = Base64.decode64(parts[1])
26
+
data = JSON.parse(decoded_data)
27
+
rescue StandardError => e
28
+
raise InvalidTokenError, "Invalid JWT token"
29
+
end
30
+
31
+
did = data['iss']
32
+
return nil if did.nil? || data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint
33
+
34
+
pkey = pkey_for_user(did)
35
+
36
+
decoded = JWT.decode(token, pkey, true, { algorithm: 'ES256K' })
37
+
decoded[0] && decoded[0]['iss']
38
+
end
39
+
40
+
def pkey_for_user(did)
41
+
# I have no idea what this does, but it seems to be working ¯\_(ツ)_/¯
42
+
43
+
if pkey = @@pkey_cache[did]
44
+
return pkey
45
+
end
46
+
47
+
doc = DID.new(did).document.json
48
+
key_obj = (doc['verificationMethod'] || []).detect { |x| x['type'] == 'Multikey' }
49
+
key_multi = key_obj&.dig('publicKeyMultibase')
50
+
return nil unless key_multi
51
+
52
+
key_decoded = Base58.base58_to_binary(key_multi[1..], :bitcoin)
53
+
comp_key = key_decoded[2..-1]
54
+
55
+
alg_id = OpenSSL::ASN1::Sequence([
56
+
OpenSSL::ASN1::ObjectId('id-ecPublicKey'),
57
+
OpenSSL::ASN1::ObjectId('secp256k1')
58
+
])
59
+
60
+
der = OpenSSL::ASN1::Sequence([alg_id, OpenSSL::ASN1::BitString(comp_key)]).to_der
61
+
pkey = OpenSSL::PKey.read(der)
62
+
63
+
@@pkey_cache[did] = pkey
64
+
65
+
pkey
66
+
end
67
+
end
+230
app/firehose_client.rb
+230
app/firehose_client.rb
···
···
1
+
require 'skyfall'
2
+
3
+
require_relative 'init'
4
+
require_relative 'models/import_job'
5
+
require_relative 'models/post'
6
+
require_relative 'models/subscription'
7
+
require_relative 'models/user'
8
+
9
+
class FirehoseClient
10
+
attr_accessor :start_cursor, :service
11
+
12
+
DEFAULT_RELAY = 'bsky.network'
13
+
14
+
def initialize
15
+
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
16
+
17
+
if ENV['RELAY_HOST']
18
+
@service = ENV['RELAY_HOST']
19
+
elsif ENV['JETSTREAM_HOST']
20
+
@service = ENV['JETSTREAM_HOST']
21
+
@jetstream = true
22
+
else
23
+
@service = DEFAULT_RELAY
24
+
end
25
+
end
26
+
27
+
def start
28
+
return if @sky
29
+
30
+
@active_users = load_users
31
+
32
+
log "Starting firehose process (YJIT = #{RubyVM::YJIT.enabled? ? 'on' : 'off'})"
33
+
34
+
last_cursor = load_or_init_cursor
35
+
cursor = @start_cursor || last_cursor
36
+
37
+
@sky = if @jetstream
38
+
Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
39
+
else
40
+
Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
41
+
end
42
+
43
+
@sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
44
+
@sky.check_heartbeat = true
45
+
46
+
@sky.on_message do |m|
47
+
start_time = Time.now
48
+
diff = start_time - @last_update
49
+
50
+
if diff > 30
51
+
log "Receiving messages again after #{sprintf('%.1f', diff)}s, starting from #{m.time.getlocal}"
52
+
end
53
+
54
+
@last_update = start_time
55
+
process_message(m)
56
+
end
57
+
58
+
@sky.on_connecting { |u| log "Connecting to #{u}..." }
59
+
@sky.on_connect {
60
+
log "Connected ✓"
61
+
62
+
@replaying = true
63
+
@last_update = Time.now
64
+
65
+
@live_check_timer ||= EM::PeriodicTimer.new(20) do
66
+
now = Time.now
67
+
diff = now - @last_update
68
+
69
+
if diff > 30
70
+
log "Timer: last update #{sprintf('%.1f', diff)}s ago"
71
+
end
72
+
end
73
+
74
+
@jobs_timer ||= EM::PeriodicTimer.new(3) do
75
+
ImportJob.all.each do |job|
76
+
@active_users[job.user.did] = job.user
77
+
job.create_imports
78
+
job.destroy
79
+
end
80
+
end
81
+
}
82
+
83
+
@sky.on_disconnect {
84
+
log "Disconnected."
85
+
}
86
+
87
+
@sky.on_reconnect {
88
+
log "Connection lost, reconnecting..."
89
+
90
+
@timer&.cancel
91
+
@timer = nil
92
+
}
93
+
94
+
@sky.on_timeout {
95
+
log "Trying to reconnect..."
96
+
}
97
+
98
+
@sky.on_error { |e| log "ERROR: #{e.class} #{e.message}" }
99
+
100
+
@sky.connect
101
+
end
102
+
103
+
def stop
104
+
save_cursor(@sky.cursor) unless @sky.nil?
105
+
106
+
@sky&.disconnect
107
+
@sky = nil
108
+
end
109
+
110
+
def load_or_init_cursor
111
+
if sub = Subscription.find_by(service: @service)
112
+
sub.cursor
113
+
else
114
+
Subscription.create!(service: @service, cursor: 0)
115
+
nil
116
+
end
117
+
end
118
+
119
+
def save_cursor(cursor)
120
+
Subscription.where(service: @service).update_all(cursor: cursor)
121
+
end
122
+
123
+
def load_users
124
+
User.active.map { |u| [u.did, u] }.then { |list| Hash[list] }
125
+
end
126
+
127
+
def process_message(msg)
128
+
save_cursor(msg.seq) if msg.seq % 1000 == 0
129
+
130
+
case msg.type
131
+
when :info
132
+
log "InfoMessage: #{msg}"
133
+
when :account
134
+
process_account_event(msg)
135
+
when :commit
136
+
if @replaying
137
+
log "Replaying events since #{msg.time.getlocal} -->"
138
+
@replaying = false
139
+
end
140
+
141
+
@current_user = @active_users[msg.repo]
142
+
143
+
msg.operations.each do |op|
144
+
case op.type
145
+
when :bsky_like
146
+
process_like(msg, op)
147
+
when :bsky_repost
148
+
process_repost(msg, op)
149
+
when :bsky_post
150
+
process_post(msg, op)
151
+
end
152
+
end
153
+
end
154
+
rescue CBOR::UnpackError
155
+
# ignore invalid records
156
+
end
157
+
158
+
def process_account_event(msg)
159
+
if msg.status == :deleted
160
+
if user = User.find_by(did: msg.repo)
161
+
user.destroy
162
+
@active_users.delete_if { |k, u| u.id == user.id }
163
+
end
164
+
end
165
+
end
166
+
167
+
def process_like(msg, op)
168
+
return unless @current_user
169
+
170
+
if op.action == :create
171
+
return if op.raw_record.nil?
172
+
@current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
173
+
elsif op.action == :delete
174
+
@current_user.likes.where(rkey: op.rkey).delete_all
175
+
end
176
+
rescue StandardError => e
177
+
log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}"
178
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
179
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
180
+
end
181
+
182
+
def process_repost(msg, op)
183
+
return unless @current_user
184
+
185
+
if op.action == :create
186
+
return if op.raw_record.nil?
187
+
@current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
188
+
elsif op.action == :delete
189
+
@current_user.reposts.where(rkey: op.rkey).delete_all
190
+
end
191
+
rescue StandardError => e
192
+
log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}"
193
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
194
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
195
+
end
196
+
197
+
def process_post(msg, op)
198
+
if op.action == :create
199
+
return if op.raw_record.nil?
200
+
201
+
if @current_user
202
+
@current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
203
+
@current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
204
+
end
205
+
elsif op.action == :delete
206
+
if @current_user
207
+
@current_user.quotes.where(rkey: op.rkey).delete_all
208
+
@current_user.pins.where(rkey: op.rkey).delete_all
209
+
end
210
+
211
+
if post = Post.find_by_at_uri(op.uri)
212
+
post.destroy
213
+
end
214
+
end
215
+
rescue StandardError => e
216
+
log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}"
217
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
218
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
219
+
end
220
+
221
+
def log(text)
222
+
puts "[#{Time.now}] #{text}"
223
+
end
224
+
225
+
def inspect
226
+
vars = instance_variables - [:@jobs_timer, :@live_check_timer]
227
+
values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
228
+
"#<#{self.class}:0x#{object_id} #{values}>"
229
+
end
230
+
end
+73
app/import_manager.rb
+73
app/import_manager.rb
···
···
1
+
require_relative 'importers/likes_importer'
2
+
require_relative 'importers/posts_importer'
3
+
require_relative 'importers/reposts_importer'
4
+
require_relative 'item_queue'
5
+
require_relative 'post_downloader'
6
+
7
+
class ImportManager
8
+
attr_accessor :report, :time_limit, :logger, :log_status_updates
9
+
10
+
def initialize(user)
11
+
@user = user
12
+
end
13
+
14
+
def start(sets)
15
+
importers = []
16
+
sets = [sets] unless sets.is_a?(Array)
17
+
18
+
sets.each do |set|
19
+
case set
20
+
when 'likes'
21
+
importers << LikesImporter.new(@user)
22
+
when 'reposts'
23
+
importers << RepostsImporter.new(@user)
24
+
when 'posts'
25
+
importers << PostsImporter.new(@user)
26
+
when 'all'
27
+
importers += [
28
+
LikesImporter.new(@user),
29
+
RepostsImporter.new(@user),
30
+
PostsImporter.new(@user)
31
+
]
32
+
else
33
+
raise "Invalid collection: #{set}"
34
+
end
35
+
end
36
+
37
+
queued_items = @user.all_items_in_queue(:import).sort_by(&:time).reverse
38
+
queue = ItemQueue.new(queued_items)
39
+
40
+
downloader = PostDownloader.new
41
+
downloader.report = @report
42
+
downloader.logger = @logger
43
+
44
+
download_thread = Thread.new do
45
+
@logger&.info "Starting downloader thread for #{@user}" if @log_status_updates
46
+
47
+
downloader.import_from_queue(queue)
48
+
49
+
@logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
50
+
end
51
+
52
+
import_threads = importers.map do |import|
53
+
import.item_queue = queue
54
+
import.report = @report
55
+
import.logger = @logger
56
+
57
+
Thread.new do
58
+
@logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates
59
+
60
+
import.run_import(@time_limit)
61
+
62
+
@logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
63
+
end
64
+
end
65
+
66
+
import_threads.each { |i| i.join }
67
+
68
+
@logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates
69
+
70
+
downloader.stop_when_empty = true
71
+
download_thread.join
72
+
end
73
+
end
+94
app/import_worker.rb
+94
app/import_worker.rb
···
···
1
+
require 'active_record'
2
+
require 'minisky'
3
+
require 'time'
4
+
5
+
require_relative 'init'
6
+
require_relative 'import_manager'
7
+
require_relative 'models/import'
8
+
require_relative 'post_downloader'
9
+
require_relative 'reports/basic_report'
10
+
11
+
class ImportWorker
12
+
attr_accessor :verbose, :logger
13
+
14
+
class UserThread < Thread
15
+
def initialize(user, collections, logger, verbose = false)
16
+
@user = user
17
+
@verbose = verbose
18
+
@logger = logger
19
+
20
+
super { run(collections) }
21
+
end
22
+
23
+
def user_id
24
+
@user.id
25
+
end
26
+
27
+
def run(collections)
28
+
@logger&.info "Starting import thread for #{@user}"
29
+
30
+
if @user.registered_at.nil?
31
+
registration_time = get_registration_time(@user)
32
+
@user.update!(registered_at: registration_time)
33
+
end
34
+
35
+
import = ImportManager.new(@user)
36
+
37
+
if @logger
38
+
import.report = BasicReport.new(@logger) if @verbose
39
+
import.logger = @logger
40
+
import.log_status_updates = true
41
+
end
42
+
43
+
import.start(collections)
44
+
45
+
@logger&.info "Ended import thread for #{@user}"
46
+
end
47
+
48
+
def get_registration_time(user)
49
+
sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
50
+
profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })
51
+
52
+
Time.parse(profile['createdAt'])
53
+
end
54
+
end
55
+
56
+
def run
57
+
@user_threads = []
58
+
59
+
@firehose_thread = Thread.new { process_firehose_items }
60
+
@downloader = PostDownloader.new
61
+
@downloader.logger = @logger
62
+
63
+
loop do
64
+
@user_threads.delete_if { |t| !t.alive? }
65
+
66
+
users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a
67
+
68
+
users.each do |user|
69
+
collections = user.imports.unfinished.map(&:collection)
70
+
thread = UserThread.new(user, collections, @logger, @verbose)
71
+
@user_threads << thread
72
+
end
73
+
74
+
# possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
75
+
sleep 5
76
+
end
77
+
end
78
+
79
+
def process_firehose_items
80
+
loop do
81
+
items = [Like, Repost, Pin, Quote]
82
+
.map { |type| type.in_queue(:firehose).order('time').limit(25) }
83
+
.flatten
84
+
.sort_by(&:time)
85
+
.first(25)
86
+
87
+
if items.length > 0
88
+
@downloader.process_items(items)
89
+
else
90
+
sleep 5
91
+
end
92
+
end
93
+
end
94
+
end
-102
app/importer.rb
-102
app/importer.rb
···
1
-
require 'didkit'
2
-
require 'minisky'
3
-
4
-
require_relative 'models/import'
5
-
require_relative 'models/like'
6
-
require_relative 'models/post'
7
-
require_relative 'models/user'
8
-
9
-
class Importer
10
-
attr_accessor :like_queue, :report
11
-
12
-
def initialize(user_did)
13
-
@did = DID.new(user_did)
14
-
@user = User.find_or_create_by!(did: user_did)
15
-
16
-
@uid_cache = { user_did => @user.id }
17
-
end
18
-
19
-
def run_import(time_limit = nil)
20
-
@minisky = Minisky.new(@did.document.pds_host, nil)
21
-
22
-
import_likes(time_limit)
23
-
end
24
-
25
-
def import_likes(requested_time_limit = nil)
26
-
import = @user.import || @user.create_import!
27
-
28
-
params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 }
29
-
30
-
if import.cursor
31
-
params[:cursor] = import.cursor
32
-
else
33
-
import.update!(started_from: Time.now) unless requested_time_limit
34
-
end
35
-
36
-
count = 0
37
-
time_limit = requested_time_limit || import.last_completed
38
-
39
-
puts "Fetching until: #{time_limit}" if time_limit
40
-
41
-
loop do
42
-
response = @minisky.get_request('com.atproto.repo.listRecords', params)
43
-
44
-
records = response['records']
45
-
cursor = response['cursor']
46
-
47
-
count += records.length
48
-
@report&.update(importer: { imported_likes: count })
49
-
@report&.update(importer: { oldest_date: Time.parse(records.last['value']['createdAt']) }) unless records.empty?
50
-
51
-
process_likes(records)
52
-
params[:cursor] = cursor
53
-
54
-
import.update!(cursor: cursor)
55
-
56
-
break if !cursor
57
-
break if time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < time_limit }
58
-
end
59
-
60
-
import.update!(cursor: nil, started_from: nil)
61
-
import.update!(last_completed: import.started_from) unless requested_time_limit
62
-
63
-
@report&.update(importer: { finished: true })
64
-
end
65
-
66
-
def process_likes(likes)
67
-
likes.each do |record|
68
-
begin
69
-
like_rkey = record['uri'].split('/').last
70
-
next if @user.likes.where(rkey: like_rkey).exists?
71
-
72
-
like_time = Time.parse(record['value']['createdAt'])
73
-
74
-
post_uri = record['value']['subject']['uri']
75
-
parts = post_uri.split('/')
76
-
next if parts[3] != 'app.bsky.feed.post'
77
-
78
-
post_did, _, post_rkey = parts[2..4]
79
-
80
-
if @uid_cache[post_did].nil?
81
-
post_author = User.find_or_create_by!(did: post_did)
82
-
@uid_cache[post_did] = post_author.id
83
-
end
84
-
85
-
post = Post.find_by(user_id: @uid_cache[post_did], rkey: post_rkey)
86
-
87
-
if post
88
-
@user.likes.create!(rkey: like_rkey, time: like_time, post: post)
89
-
else
90
-
like_stub = @user.likes.create!(rkey: like_rkey, time: like_time, post_uri: post_uri)
91
-
92
-
if @like_queue
93
-
@like_queue.push(like_stub)
94
-
@report&.update(queue: { length: @like_queue.length })
95
-
end
96
-
end
97
-
rescue StandardError => e
98
-
puts "Error in Importer#process_likes: #{record['uri']}: #{e}"
99
-
end
100
-
end
101
-
end
102
-
end
···
+54
app/importers/base_importer.rb
+54
app/importers/base_importer.rb
···
···
1
+
require 'didkit'
2
+
require 'minisky'
3
+
require 'time'
4
+
5
+
require_relative '../at_uri'
6
+
require_relative '../errors'
7
+
require_relative '../models/post'
8
+
require_relative '../models/user'
9
+
10
+
class BaseImporter
11
+
attr_accessor :item_queue, :report, :logger
12
+
13
+
def initialize(user)
14
+
@did = DID.new(user.did)
15
+
@user = user
16
+
@imported_count = 0
17
+
end
18
+
19
+
def importer_name
20
+
self.class.name
21
+
end
22
+
23
+
def collection
24
+
importer_name.gsub(/Importer$/, '').downcase
25
+
end
26
+
27
+
def run_import(requested_time_limit = nil)
28
+
@minisky = Minisky.new(@did.document.pds_host, nil)
29
+
@import = @user.imports.find_by(collection: collection) || @user.imports.create!(collection: collection)
30
+
31
+
if @import.cursor.nil?
32
+
@import.update!(started_from: Time.now) unless requested_time_limit
33
+
end
34
+
35
+
@time_limit = requested_time_limit || @import.last_completed
36
+
@logger&.info "Fetching until: #{@time_limit}" if @time_limit
37
+
38
+
import_items
39
+
40
+
@import.update!(last_completed: @import.started_from) unless requested_time_limit
41
+
@import.update!(cursor: nil, started_from: nil, fetched_until: nil)
42
+
@report&.update(importers: { importer_name => { :finished => true }})
43
+
end
44
+
45
+
def import_items
46
+
raise NotImplementedError
47
+
end
48
+
49
+
def created_at(record)
50
+
Time.parse(record['createdAt'])
51
+
rescue StandardError
52
+
raise InvalidRecordError
53
+
end
54
+
end
+42
app/importers/likes_importer.rb
+42
app/importers/likes_importer.rb
···
···
1
+
require_relative 'base_importer'
2
+
3
+
class LikesImporter < BaseImporter
4
+
def import_items
5
+
params = { repo: @did, collection: 'app.bsky.feed.like', limit: 100 }
6
+
params[:cursor] = @import.cursor if @import.cursor
7
+
8
+
loop do
9
+
response = @minisky.get_request('com.atproto.repo.listRecords', params)
10
+
11
+
records = response['records']
12
+
cursor = response['cursor']
13
+
oldest_date = nil
14
+
15
+
records.each do |record|
16
+
begin
17
+
like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)
18
+
19
+
record_date = like&.time || created_at(record['value'])
20
+
oldest_date = [oldest_date, record_date].compact.min
21
+
22
+
if like && like.pending? && @item_queue
23
+
@item_queue.push(like)
24
+
@report&.update(queue: { length: @item_queue.length })
25
+
end
26
+
rescue InvalidRecordError => e
27
+
@logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
28
+
end
29
+
end
30
+
31
+
@imported_count += records.length
32
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
33
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
34
+
35
+
params[:cursor] = cursor
36
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
37
+
38
+
break if !cursor
39
+
break if @time_limit && oldest_date && oldest_date < @time_limit
40
+
end
41
+
end
42
+
end
+50
app/importers/posts_importer.rb
+50
app/importers/posts_importer.rb
···
···
1
+
require_relative 'base_importer'
2
+
3
+
class PostsImporter < BaseImporter
4
+
def import_items
5
+
params = { repo: @did, collection: 'app.bsky.feed.post', limit: 100 }
6
+
params[:cursor] = @import.cursor if @import.cursor
7
+
8
+
loop do
9
+
response = @minisky.get_request('com.atproto.repo.listRecords', params)
10
+
11
+
records = response['records']
12
+
cursor = response['cursor']
13
+
oldest_date = nil
14
+
15
+
records.each do |record|
16
+
begin
17
+
quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
18
+
pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
19
+
20
+
record_date = quote&.time || pin&.time || created_at(record['value'])
21
+
oldest_date = [oldest_date, record_date].compact.min
22
+
23
+
if @item_queue
24
+
if quote && quote.pending?
25
+
@item_queue.push(quote)
26
+
end
27
+
28
+
if pin && pin.pending?
29
+
@item_queue.push(pin)
30
+
end
31
+
32
+
@report&.update(queue: { length: @item_queue.length })
33
+
end
34
+
rescue InvalidRecordError => e
35
+
@logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
36
+
end
37
+
end
38
+
39
+
@imported_count += records.length
40
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
41
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
42
+
43
+
params[:cursor] = cursor
44
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
45
+
46
+
break if !cursor
47
+
break if @time_limit && oldest_date && oldest_date < @time_limit
48
+
end
49
+
end
50
+
end
+42
app/importers/reposts_importer.rb
+42
app/importers/reposts_importer.rb
···
···
1
+
require_relative 'base_importer'
2
+
3
+
class RepostsImporter < BaseImporter
4
+
def import_items
5
+
params = { repo: @did, collection: 'app.bsky.feed.repost', limit: 100 }
6
+
params[:cursor] = @import.cursor if @import.cursor
7
+
8
+
loop do
9
+
response = @minisky.get_request('com.atproto.repo.listRecords', params)
10
+
11
+
records = response['records']
12
+
cursor = response['cursor']
13
+
oldest_date = nil
14
+
15
+
records.each do |record|
16
+
begin
17
+
repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)
18
+
19
+
record_date = repost&.time || created_at(record['value'])
20
+
oldest_date = [oldest_date, record_date].compact.min
21
+
22
+
if repost && repost.pending? && @item_queue
23
+
@item_queue.push(repost)
24
+
@report&.update(queue: { length: @item_queue.length })
25
+
end
26
+
rescue InvalidRecordError => e
27
+
@logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
28
+
end
29
+
end
30
+
31
+
@imported_count += records.length
32
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
33
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
34
+
35
+
params[:cursor] = cursor
36
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
37
+
38
+
break if !cursor
39
+
break if @time_limit && oldest_date && oldest_date < @time_limit
40
+
end
41
+
end
42
+
end
+1
app/init.rb
+1
app/init.rb
+28
app/item_queue.rb
+28
app/item_queue.rb
···
···
1
+
class ItemQueue
2
+
BATCH_SIZE = 25
3
+
4
+
def initialize(items = [])
5
+
@mutex = Mutex.new
6
+
@queue = items
7
+
end
8
+
9
+
def push(item)
10
+
@mutex.synchronize {
11
+
@queue << item
12
+
}
13
+
end
14
+
15
+
def pop_batch
16
+
@mutex.synchronize {
17
+
batch = @queue[0...BATCH_SIZE]
18
+
@queue = @queue[BATCH_SIZE..-1] || []
19
+
batch
20
+
}
21
+
end
22
+
23
+
def length
24
+
@mutex.synchronize {
25
+
@queue.length
26
+
}
27
+
end
28
+
end
-28
app/like_queue.rb
-28
app/like_queue.rb
···
1
-
class LikeQueue
2
-
BATCH_SIZE = 25
3
-
4
-
def initialize(likes = [])
5
-
@mutex = Mutex.new
6
-
@queue = likes
7
-
end
8
-
9
-
def push(like)
10
-
@mutex.synchronize {
11
-
@queue << like
12
-
}
13
-
end
14
-
15
-
def pop_batch
16
-
@mutex.synchronize {
17
-
batch = @queue[0...BATCH_SIZE]
18
-
@queue = @queue[BATCH_SIZE..-1] || []
19
-
batch
20
-
}
21
-
end
22
-
23
-
def length
24
-
@mutex.synchronize {
25
-
@queue.length
26
-
}
27
-
end
28
-
end
···
+31
app/models/import.rb
+31
app/models/import.rb
···
4
5
class Import < ActiveRecord::Base
6
belongs_to :user
7
+
8
+
validates_inclusion_of :collection, in: %w(likes reposts posts)
9
+
validates_uniqueness_of :collection, scope: :user_id
10
+
11
+
scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }
12
+
13
+
IMPORT_END = Time.at(0)
14
+
15
+
def imported_until
16
+
return nil if cursor.nil? && last_completed.nil?
17
+
18
+
groups = case collection
19
+
when 'likes'
20
+
[:likes]
21
+
when 'reposts'
22
+
[:reposts]
23
+
when 'posts'
24
+
[:pins, :quotes]
25
+
end
26
+
27
+
newest_queued_items = groups.map { |g| user.send(g).where(queue: :import).order(:time).last }
28
+
newest_queued = newest_queued_items.compact.sort_by(&:time).last
29
+
30
+
if newest_queued
31
+
newest_queued.time
32
+
elsif fetched_until
33
+
fetched_until
34
+
else
35
+
IMPORT_END
36
+
end
37
+
end
38
end
+13
app/models/import_job.rb
+13
app/models/import_job.rb
+35
app/models/importable.rb
+35
app/models/importable.rb
···
···
1
+
require 'active_support/concern'
2
+
3
+
require_relative '../at_uri'
4
+
require_relative 'post'
5
+
6
+
module Importable
7
+
extend ActiveSupport::Concern
8
+
9
+
included do
10
+
scope :pending, -> { where(post: nil) }
11
+
scope :in_queue, ->(q) { where(queue: q) }
12
+
13
+
enum :queue, { firehose: 0, import: 1 }
14
+
15
+
validates_presence_of :post_uri, if: -> { post_id.nil? }
16
+
validate :check_queue
17
+
18
+
def pending?
19
+
post_uri != nil
20
+
end
21
+
22
+
def check_queue
23
+
errors.add(:queue, 'must be nil if already processed') if queue && post
24
+
end
25
+
26
+
def import_item!(args = {})
27
+
post_uri = AT_URI(self.post_uri)
28
+
return nil if !post_uri.is_post?
29
+
30
+
self.assign_attributes(args)
31
+
self.save!
32
+
self
33
+
end
34
+
end
35
+
end
+16
-3
app/models/like.rb
+16
-3
app/models/like.rb
···
1
require 'active_record'
2
3
require_relative 'post'
4
require_relative 'user'
5
6
class Like < ActiveRecord::Base
7
validates_presence_of :time, :rkey
8
-
validates_length_of :rkey, maximum: 13
9
-
10
-
validates_presence_of :post_uri, if: -> { post_id.nil? }
11
12
belongs_to :user, foreign_key: 'actor_id'
13
belongs_to :post, optional: true
14
end
···
1
require 'active_record'
2
+
require 'time'
3
4
+
require_relative '../at_uri'
5
require_relative 'post'
6
+
require_relative 'importable'
7
+
require_relative 'searchable'
8
require_relative 'user'
9
10
class Like < ActiveRecord::Base
11
+
include Searchable
12
+
include Importable
13
+
14
validates_presence_of :time, :rkey
15
+
validates_length_of :rkey, is: 13
16
17
belongs_to :user, foreign_key: 'actor_id'
18
belongs_to :post, optional: true
19
+
20
+
def self.new_from_record(uri, record)
21
+
self.new(
22
+
rkey: AT_URI(uri).rkey,
23
+
time: Time.parse(record['createdAt']),
24
+
post_uri: record['subject']['uri']
25
+
)
26
+
end
27
end
+32
app/models/pin.rb
+32
app/models/pin.rb
···
···
1
+
require 'active_record'
2
+
require 'time'
3
+
4
+
require_relative '../at_uri'
5
+
require_relative 'post'
6
+
require_relative 'searchable'
7
+
require_relative 'user'
8
+
9
+
class Pin < ActiveRecord::Base
10
+
include Searchable
11
+
include Importable
12
+
13
+
PIN_SIGN = '📌'
14
+
15
+
validates_presence_of :time, :rkey
16
+
validates_length_of :rkey, is: 13
17
+
validates :pin_text, length: { minimum: 0, maximum: 1000, allow_nil: false }
18
+
19
+
belongs_to :user, foreign_key: 'actor_id'
20
+
belongs_to :post, optional: true
21
+
22
+
def self.new_from_record(uri, record)
23
+
return nil unless record['reply'] && record['text'].include?(PIN_SIGN)
24
+
25
+
self.new(
26
+
rkey: AT_URI(uri).rkey,
27
+
time: Time.parse(record['createdAt']),
28
+
post_uri: record['reply']['parent']['uri'],
29
+
pin_text: record['text']
30
+
)
31
+
end
32
+
end
+20
-1
app/models/post.rb
+20
-1
app/models/post.rb
···
6
validates_presence_of :time, :data, :rkey
7
validates :text, length: { minimum: 0, allow_nil: false }
8
9
-
validates_length_of :rkey, maximum: 13
10
validates_length_of :text, maximum: 1000
11
validates_length_of :data, maximum: 10000
12
13
belongs_to :user
14
15
def at_uri
16
"at://#{user.did}/app.bsky.feed.post/#{rkey}"
17
end
18
end
···
6
validates_presence_of :time, :data, :rkey
7
validates :text, length: { minimum: 0, allow_nil: false }
8
9
+
validates_length_of :rkey, is: 13
10
validates_length_of :text, maximum: 1000
11
validates_length_of :data, maximum: 10000
12
13
+
validate :check_null_bytes
14
+
15
belongs_to :user
16
17
+
has_many :likes, dependent: :delete_all
18
+
has_many :reposts, dependent: :delete_all
19
+
has_many :pins, dependent: :delete_all
20
+
has_many :quotes, dependent: :delete_all
21
+
22
+
def self.find_by_at_uri(uri)
23
+
uri = AT_URI(uri) unless uri.is_a?(AT_URI)
24
+
return nil unless uri.is_post?
25
+
26
+
Post.joins(:user).find_by(user: { did: uri.repo }, rkey: uri.rkey)
27
+
end
28
+
29
def at_uri
30
"at://#{user.did}/app.bsky.feed.post/#{rkey}"
31
+
end
32
+
33
+
def check_null_bytes
34
+
# Postgres doesn't allow null bytes in strings
35
+
errors.add(:text, 'must not contain a null byte') if text.include?("\u0000")
36
end
37
end
+39
app/models/quote.rb
+39
app/models/quote.rb
···
···
1
+
require 'active_record'
2
+
require 'time'
3
+
4
+
require_relative '../at_uri'
5
+
require_relative 'post'
6
+
require_relative 'searchable'
7
+
require_relative 'user'
8
+
9
+
class Quote < ActiveRecord::Base
10
+
include Searchable
11
+
include Importable
12
+
13
+
validates_presence_of :time, :rkey
14
+
validates_length_of :rkey, is: 13
15
+
validates :quote_text, length: { minimum: 0, maximum: 1000, allow_nil: false }
16
+
17
+
belongs_to :user, foreign_key: 'actor_id'
18
+
belongs_to :post, optional: true
19
+
20
+
def self.new_from_record(uri, record)
21
+
return nil unless record['embed']
22
+
23
+
quoted_post_uri = case record['embed']['$type']
24
+
when 'app.bsky.embed.record'
25
+
record['embed']['record']['uri']
26
+
when 'app.bsky.embed.recordWithMedia'
27
+
record['embed']['record']['record']['uri']
28
+
else
29
+
return nil
30
+
end
31
+
32
+
self.new(
33
+
rkey: AT_URI(uri).rkey,
34
+
time: Time.parse(record['createdAt']),
35
+
post_uri: quoted_post_uri,
36
+
quote_text: record['text']
37
+
)
38
+
end
39
+
end
+26
app/models/repost.rb
+26
app/models/repost.rb
···
···
1
+
require 'active_record'
2
+
require 'time'
3
+
4
+
require_relative '../at_uri'
5
+
require_relative 'post'
6
+
require_relative 'searchable'
7
+
require_relative 'user'
8
+
9
+
class Repost < ActiveRecord::Base
10
+
include Searchable
11
+
include Importable
12
+
13
+
validates_presence_of :time, :rkey
14
+
validates_length_of :rkey, is: 13
15
+
16
+
belongs_to :user, foreign_key: 'actor_id'
17
+
belongs_to :post, optional: true
18
+
19
+
def self.new_from_record(uri, record)
20
+
self.new(
21
+
rkey: AT_URI(uri).rkey,
22
+
time: Time.parse(record['createdAt']),
23
+
post_uri: record['subject']['uri']
24
+
)
25
+
end
26
+
end
+48
app/models/searchable.rb
+48
app/models/searchable.rb
···
···
1
+
require 'active_support/concern'
2
+
3
+
module Searchable
4
+
extend ActiveSupport::Concern
5
+
6
+
included do
7
+
scope :reverse_chronologically, -> { order(time: :desc, id: :desc) }
8
+
9
+
scope :after_cursor, ->(cursor) {
10
+
return self if cursor.nil?
11
+
12
+
t = arel_table
13
+
14
+
timestamp, id = cursor.split(':')
15
+
time = Time.at(timestamp.to_f)
16
+
17
+
where(
18
+
t[:time].lt(time)
19
+
.or(
20
+
t[:time].eq(time)
21
+
.and(t[:id].lt(id))
22
+
)
23
+
)
24
+
}
25
+
26
+
scope :matching_terms, ->(terms) {
27
+
return self if terms.empty?
28
+
29
+
where(
30
+
(["(posts.text ~* ?)"] * (terms.length)).join(" AND "),
31
+
*(terms.map { |x| "\\y" + x.gsub(/\s+/, "\\s+") + "\\y" })
32
+
)
33
+
}
34
+
35
+
scope :excluding_terms, ->(terms) {
36
+
return self if terms.empty?
37
+
38
+
where(
39
+
(["(posts.text !~* ?)"] * (terms.length)).join(" AND "),
40
+
*(terms.map { |x| "\\y" + x.gsub(/\s+/, "\\s+") + "\\y" })
41
+
)
42
+
}
43
+
44
+
def cursor
45
+
"#{self.time.to_f}:#{self.id}"
46
+
end
47
+
end
48
+
end
+5
app/models/subscription.rb
+5
app/models/subscription.rb
+71
-2
app/models/user.rb
+71
-2
app/models/user.rb
···
1
require 'active_record'
2
3
require_relative 'import'
4
require_relative 'like'
5
require_relative 'post'
6
7
class User < ActiveRecord::Base
8
validates_presence_of :did
9
validates_length_of :did, maximum: 260
10
11
has_many :posts
12
-
has_many :likes, foreign_key: 'actor_id'
13
-
has_one :import
14
end
···
1
require 'active_record'
2
3
require_relative 'import'
4
+
require_relative 'import_job'
5
require_relative 'like'
6
+
require_relative 'quote'
7
+
require_relative 'pin'
8
require_relative 'post'
9
+
require_relative 'repost'
10
+
require_relative 'user_importable'
11
12
class User < ActiveRecord::Base
13
validates_presence_of :did
14
validates_length_of :did, maximum: 260
15
+
validates_format_of :did, with: /\Adid:(plc:[0-9a-z]{24}|web:[0-9a-z\-]+(\.[0-9a-z\-]+)+)\Z/
16
17
has_many :posts
18
+
has_many :imports, dependent: :delete_all
19
+
has_one :import_job, dependent: :delete
20
+
21
+
before_destroy :delete_posts_cascading
22
+
23
+
has_many :likes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
24
+
has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
25
+
has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
26
+
has_many :pins, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
27
+
28
+
def self.active
29
+
self.joins(:imports).distinct
30
+
end
31
+
32
+
def self.with_unfinished_import
33
+
self.where(id: Import.unfinished.select('user_id').distinct)
34
+
.or(self.where(id: Like.in_queue(:import).select('actor_id').distinct))
35
+
.or(self.where(id: Repost.in_queue(:import).select('actor_id').distinct))
36
+
.or(self.where(id: Quote.in_queue(:import).select('actor_id').distinct))
37
+
.or(self.where(id: Pin.in_queue(:import).select('actor_id').distinct))
38
+
end
39
+
40
+
def active?
41
+
imports.exists?
42
+
end
43
+
44
+
def all_pending_items
45
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
46
+
end
47
+
48
+
def all_items_in_queue(queue)
49
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).in_queue(queue).to_a }.reduce(&:+)
50
+
end
51
+
52
+
def imported_until
53
+
import_positions = self.imports.map(&:imported_until)
54
+
55
+
if import_positions.empty? || import_positions.any? { |x| x.nil? }
56
+
nil
57
+
else
58
+
import_positions.sort.last
59
+
end
60
+
end
61
+
62
+
def erase_imports!
63
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).delete_all }
64
+
65
+
self.import_job&.destroy
66
+
self.imports.delete_all
67
+
end
68
+
69
+
def delete_posts_cascading
70
+
posts_subquery = self.posts.select(:id)
71
+
72
+
Like.where(post_id: posts_subquery).delete_all
73
+
Repost.where(post_id: posts_subquery).delete_all
74
+
Pin.where(post_id: posts_subquery).delete_all
75
+
Quote.where(post_id: posts_subquery).delete_all
76
+
77
+
Post.where(user: self).delete_all
78
+
end
79
+
80
+
def to_s
81
+
%(<User id: #{id}, did: "#{did}">)
82
+
end
83
end
+20
app/models/user_importable.rb
+20
app/models/user_importable.rb
···
···
1
+
require_relative '../errors'
2
+
3
+
module UserImportable
4
+
def import_from_record(item_uri, record, **args)
5
+
item = try_build_from_record(item_uri, record)
6
+
return nil if item.nil? || already_imported?(item)
7
+
8
+
item.import_item!(args)
9
+
end
10
+
11
+
def try_build_from_record(item_uri, record)
12
+
self.new_from_record(item_uri, record)
13
+
rescue StandardError
14
+
raise InvalidRecordError
15
+
end
16
+
17
+
def already_imported?(item)
18
+
self.where(rkey: item.rkey).exists?
19
+
end
20
+
end
+125
-22
app/post_downloader.rb
+125
-22
app/post_downloader.rb
···
1
require 'didkit'
2
require 'minisky'
3
4
require_relative 'models/post'
5
require_relative 'models/user'
6
7
class PostDownloader
8
-
attr_accessor :report, :stop_when_empty
9
10
def initialize
11
-
@sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)
12
13
@total_count = 0
14
@oldest_imported = Time.now
15
end
16
17
def import_from_queue(queue)
18
loop do
19
-
likes = queue.pop_batch
20
21
-
if likes.empty?
22
if @stop_when_empty
23
return
24
else
···
29
30
@report&.update(queue: { length: queue.length })
31
32
-
begin
33
-
response = @sky.get_request('app.bsky.feed.getPosts', { uris: likes.map(&:post_uri) })
34
35
-
response['posts'].each do |data|
36
-
begin
37
-
like = likes.detect { |x| x.post_uri == data['uri'] }
38
-
likes.delete(like)
39
40
-
post = save_post(data['uri'], data['record'])
41
-
update_like(like, post)
42
-
rescue StandardError => e
43
-
puts "Error in PostDownloader: #{like.post_uri}: #{e}"
44
end
45
end
46
-
rescue StandardError => e
47
-
puts "Error in PostDownloader: #{e}"
48
end
49
end
50
end
51
52
def save_post(post_uri, record)
53
-
did, _, rkey = post_uri.split('/')[2..4]
54
55
text = record.delete('text')
56
created = record.delete('createdAt')
57
58
-
author = User.find_or_create_by!(did: did)
59
60
-
Post.create!(
61
user: author,
62
rkey: rkey,
63
time: Time.parse(created),
64
text: text,
65
data: JSON.generate(record)
66
)
67
end
68
69
-
def update_like(like, post)
70
-
like.update!(post: post, post_uri: nil)
71
72
@total_count += 1
73
-
@oldest_imported = [@oldest_imported, like.time].min
74
75
@report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
76
end
77
end
···
1
require 'didkit'
2
require 'minisky'
3
4
+
require_relative 'at_uri'
5
require_relative 'models/post'
6
require_relative 'models/user'
7
8
class PostDownloader
9
+
attr_accessor :report, :logger, :stop_when_empty
10
11
def initialize
12
+
@sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
13
14
@total_count = 0
15
@oldest_imported = Time.now
16
+
@account_status_cache = {}
17
end
18
19
def import_from_queue(queue)
20
loop do
21
+
items = queue.pop_batch
22
23
+
if items.empty?
24
if @stop_when_empty
25
return
26
else
···
31
32
@report&.update(queue: { length: queue.length })
33
34
+
process_items(items)
35
+
end
36
+
end
37
+
38
+
def process_items(items)
39
+
existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a
40
+
41
+
items.dup.each do |item|
42
+
if post = existing_posts.detect { |post| post.at_uri == item.post_uri }
43
+
update_item(item, post)
44
+
items.delete(item)
45
+
end
46
+
end
47
48
+
return if items.empty?
49
+
50
+
begin
51
+
response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq })
52
53
+
response['posts'].each do |data|
54
+
current_items = items.select { |x| x.post_uri == data['uri'] }
55
+
items -= current_items
56
+
57
+
begin
58
+
post = save_post(data['uri'], data['record'])
59
+
60
+
if post.valid?
61
+
current_items.each { |i| update_item(i, post) }
62
+
else
63
+
@logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
64
+
current_items.each { |i| invalidate_item(i) }
65
end
66
+
rescue InvalidRecordError => e
67
+
@logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
68
+
current_items.each { |i| i.update!(queue: nil) }
69
end
70
end
71
+
72
+
check_missing_items(items)
73
+
rescue StandardError => e
74
+
@logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
75
end
76
end
77
78
def save_post(post_uri, record)
79
+
did, _, rkey = AT_URI(post_uri)
80
+
81
+
begin
82
+
author = User.find_or_create_by!(did: did)
83
+
rescue ActiveRecord::RecordInvalid => e
84
+
raise InvalidRecordError
85
+
end
86
+
87
+
if post = Post.find_by(user: author, rkey: rkey)
88
+
return post
89
+
else
90
+
post = build_post(author, rkey, record)
91
+
post.save
92
+
post
93
+
end
94
+
end
95
96
+
def build_post(author, rkey, record)
97
text = record.delete('text')
98
created = record.delete('createdAt')
99
100
+
record.delete('$type')
101
102
+
Post.new(
103
user: author,
104
rkey: rkey,
105
time: Time.parse(created),
106
text: text,
107
data: JSON.generate(record)
108
)
109
+
rescue StandardError
110
+
raise InvalidRecordError
111
end
112
113
+
def update_item(item, post)
114
+
item.update!(post: post, post_uri: nil, queue: nil)
115
+
116
+
@total_count += 1
117
+
@oldest_imported = [@oldest_imported, item.time].min
118
+
119
+
@report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
120
+
end
121
122
+
def invalidate_item(item)
123
@total_count += 1
124
+
@oldest_imported = [@oldest_imported, item.time].min
125
126
@report&.update(downloader: { downloaded_posts: @total_count, oldest_date: @oldest_imported })
127
+
128
+
item.destroy
129
+
end
130
+
131
+
def check_missing_items(items)
132
+
return if items.empty?
133
+
134
+
dids = items.map { |x| AT_URI(x.post_uri).repo }.uniq
135
+
response = @sky.get_request('app.bsky.actor.getProfiles', { actors: dids })
136
+
active_dids = response['profiles'].map { |x| x['did'] }
137
+
138
+
items.each do |item|
139
+
did = AT_URI(item.post_uri).repo
140
+
did_obj = DID.new(did)
141
+
142
+
if active_dids.include?(did)
143
+
# account exists but post doesn't, delete the post reference
144
+
item.destroy
145
+
else
146
+
begin
147
+
status = if @account_status_cache.has_key?(did) # don't retry if status was nil
148
+
@account_status_cache[did]
149
+
else
150
+
@account_status_cache[did] ||= did_obj.account_status
151
+
end
152
+
153
+
case status
154
+
when :active
155
+
# account is active but wasn't returned in getProfiles, probably was suspended on the AppView
156
+
# puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
157
+
item.destroy
158
+
when nil
159
+
# account was deleted, so all posts were deleted too
160
+
# puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
161
+
item.destroy
162
+
else
163
+
# account is inactive/suspended, but could come back, so leave it for now
164
+
# puts "#{item.post_uri}: account #{did} is inactive: #{status}"
165
+
end
166
+
rescue StandardError => e
167
+
hostname = did_obj.document.pds_host rescue "???"
168
+
@logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
169
+
170
+
# delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
171
+
item.destroy if hostname == 'bsky.social'
172
+
end
173
+
end
174
+
175
+
if !item.destroyed?
176
+
item.update!(queue: nil)
177
+
end
178
+
end
179
end
180
end
+31
app/query_parser.rb
+31
app/query_parser.rb
···
···
1
+
class QueryParser
2
+
attr_reader :terms, :exclusions
3
+
4
+
def initialize(query)
5
+
@terms = []
6
+
@exclusions = []
7
+
8
+
query = query.clone
9
+
10
+
while match = query.match(/\-?".+?"/)
11
+
range = match.begin(0)...match.end(0)
12
+
phrase = query[range]
13
+
query[range] = ' '
14
+
15
+
negative = phrase.start_with?('-')
16
+
phrase = phrase.gsub(/^\-/, '').gsub(/^"|"$/, '').gsub(/[^\w\-]+/, ' ').strip
17
+
18
+
if negative
19
+
@exclusions << phrase
20
+
else
21
+
@terms << phrase
22
+
end
23
+
end
24
+
25
+
terms = query.gsub(/[^\w\-]+/, ' ').strip.split(/ +/)
26
+
negative, positive = terms.partition { |x| x.start_with?('-') }
27
+
28
+
@terms += positive
29
+
@exclusions += negative.map { |x| x[1..-1] }.reject(&:empty?)
30
+
end
31
+
end
+17
app/reports/basic_report.rb
+17
app/reports/basic_report.rb
···
···
1
+
class BasicReport
2
+
def initialize(logger)
3
+
@logger = logger
4
+
end
5
+
6
+
def update(data)
7
+
data.each do |k, v|
8
+
if k == :downloader
9
+
@logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
10
+
elsif k == :queue
11
+
next
12
+
else
13
+
@logger.info({ k => v}.inspect)
14
+
end
15
+
end
16
+
end
17
+
end
+43
app/reports/console_report.rb
+43
app/reports/console_report.rb
···
···
1
+
class ConsoleReport
2
+
def initialize
3
+
@data = {}
4
+
@start = Time.now
5
+
@mutex = Mutex.new
6
+
end
7
+
8
+
def update(data)
9
+
@mutex.synchronize { deep_merge(@data, data) }
10
+
render
11
+
end
12
+
13
+
def deep_merge(target, updates)
14
+
updates.each do |k, v|
15
+
if v.is_a?(Hash)
16
+
target[k] ||= {}
17
+
deep_merge(target[k], v)
18
+
else
19
+
target[k] = v
20
+
end
21
+
end
22
+
end
23
+
24
+
def render
25
+
print " " * 80 + "\r"
26
+
puts "Elapsed time: #{(Time.now - @start).to_i} s"
27
+
28
+
importers = @data[:importers] || {}
29
+
30
+
importers.each do |name, data|
31
+
print " " * 80 + "\r"
32
+
puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
33
+
end
34
+
35
+
print " " * 80 + "\r"
36
+
puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
37
+
38
+
print " " * 80 + "\r"
39
+
puts "Queue size: #{@data.dig(:queue, :length) || 0}"
40
+
41
+
print "\e[#{3 + importers.length}A"
42
+
end
43
+
end
+9
app/reports/simple_logger.rb
+9
app/reports/simple_logger.rb
+122
-80
app/server.rb
+122
-80
app/server.rb
···
1
-
require 'base58'
2
-
require 'base64'
3
-
require 'didkit'
4
require 'json'
5
-
require 'jwt'
6
-
require 'openssl'
7
require 'sinatra/base'
8
9
require_relative 'init'
10
-
require_relative 'models/like'
11
require_relative 'models/user'
12
13
class Server < Sinatra::Application
14
-
class PKeyCache
15
-
def self.get(did)
16
-
@cache ||= {}
17
-
@cache[did]
18
-
end
19
-
20
-
def self.set(did, pkey)
21
-
@cache ||= {}
22
-
@cache[did] = pkey
23
-
end
24
-
end
25
-
26
register Sinatra::ActiveRecordExtension
27
-
set :port, 3000
28
29
PAGE_LIMIT = 25
30
-
HOSTNAME = 'lycan.feeds.blue'
31
32
helpers do
33
def json_response(data)
···
40
[status, JSON.generate({ error: name, message: message })]
41
end
42
43
-
def decode_user_from_jwt(auth_header, endpoint)
44
-
return nil unless auth_header.start_with?('Bearer ')
45
46
-
token = auth_header.gsub(/\ABearer /, '')
47
-
data = JSON.parse(Base64.decode64(token.split('.')[1]))
48
-
did = data['iss']
49
-
return nil if data['aud'] != "did:web:#{HOSTNAME}" || data['lxm'] != endpoint
50
51
-
pkey = pkey_for_user(did)
52
53
-
decoded = JWT.decode(token, pkey, true, { algorithm: 'ES256K' })
54
-
decoded[0] && decoded[0]['iss']
55
end
56
57
-
def pkey_for_user(did)
58
-
# I have no idea what this does, but it seems to be working ¯\_(ツ)_/¯
59
60
-
if pkey = PKeyCache.get(did)
61
-
return pkey
62
end
63
64
-
doc = DID.new(did).document.json
65
-
key_obj = (doc['verificationMethod'] || []).detect { |x| x['type'] == 'Multikey' }
66
-
key_multi = key_obj&.dig('publicKeyMultibase')
67
-
return nil unless key_multi
68
69
-
key_decoded = Base58.base58_to_binary(key_multi[1..], :bitcoin)
70
-
comp_key = key_decoded[2..-1]
71
72
-
alg_id = OpenSSL::ASN1::Sequence([
73
-
OpenSSL::ASN1::ObjectId('id-ecPublicKey'),
74
-
OpenSSL::ASN1::ObjectId('secp256k1')
75
-
])
76
77
-
der = OpenSSL::ASN1::Sequence([alg_id, OpenSSL::ASN1::BitString(comp_key)]).to_der
78
-
pkey = OpenSSL::PKey.read(der)
79
80
-
PKeyCache.set(did, pkey)
81
82
-
pkey
83
end
84
-
end
85
86
-
get '/xrpc/blue.feeds.lycan.searchPosts' do
87
-
headers['access-control-allow-origin'] = '*'
88
89
-
if settings.development?
90
-
user = User.find_by(did: params[:user])
91
-
return json_error('UserNotFound', 'Missing "user" parameter') if user.nil?
92
else
93
-
begin
94
-
did = decode_user_from_jwt(env['HTTP_AUTHORIZATION'], 'blue.feeds.lycan.searchPosts')
95
-
rescue StandardError => e
96
-
p e
97
-
end
98
99
-
user = did && User.find_by(did: did)
100
-
return json_error('UserNotFound', 'Missing authentication header') if user.nil?
101
end
102
103
-
if params[:query]
104
-
query = params[:query].gsub('%', "\\%")
105
-
words = query.strip.split(/ +/)
106
107
-
likes = user.likes
108
-
.joins(:post)
109
-
.includes(:post => :user)
110
-
.where(
111
-
(["(text ~* ?)"] * (words.length)).join(" AND "),
112
-
*(words.map { |x| "\\y#{x}\\y" })
113
-
)
114
-
.order('likes.time DESC, likes.id DESC')
115
-
.limit(PAGE_LIMIT)
116
117
-
if params[:cursor]
118
-
timestamp, id = params[:cursor].split(':')
119
-
time = Time.at(timestamp.to_f)
120
-
likes = likes.where("likes.time < ? OR (likes.time = ? AND likes.id < ?)", time, time, id)
121
-
end
122
123
-
post_uris = likes.map(&:post).map(&:at_uri)
124
125
-
json_response(posts: post_uris, cursor: likes.last && "#{likes.last.time.to_f}:#{likes.last.id}")
126
else
127
-
json_error('MissingParameter', 'Missing "query" parameter')
128
end
129
end
130
···
1
require 'json'
2
require 'sinatra/base'
3
4
require_relative 'init'
5
+
require_relative 'authenticator'
6
require_relative 'models/user'
7
+
require_relative 'query_parser'
8
9
class Server < Sinatra::Application
10
register Sinatra::ActiveRecordExtension
11
+
set :port, ENV['PORT'] || 3000
12
13
PAGE_LIMIT = 25
14
+
HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue'
15
16
helpers do
17
def json_response(data)
···
24
[status, JSON.generate({ error: name, message: message })]
25
end
26
27
+
def get_user_did
28
+
if settings.development?
29
+
did = if request.post?
30
+
json = JSON.parse(request.body.read)
31
+
request.body.rewind
32
+
json['user']
33
+
else
34
+
params[:user]
35
+
end
36
37
+
if did
38
+
return did
39
+
else
40
+
halt json_error('AuthMissing', 'Missing "user" parameter', status: 401)
41
+
end
42
+
else
43
+
auth_header = env['HTTP_AUTHORIZATION']
44
+
if auth_header.to_s.strip.empty?
45
+
halt json_error('AuthMissing', 'Missing authentication header', status: 401)
46
+
end
47
48
+
begin
49
+
method = request.path.split('/')[2]
50
+
did = @authenticator.decode_user_from_jwt(auth_header, method)
51
+
rescue StandardError => e
52
+
halt json_error('InvalidToken', e.message, status: 401)
53
+
end
54
55
+
if did
56
+
return did
57
+
else
58
+
halt json_error('InvalidToken', "Invalid JWT token", status: 401)
59
+
end
60
+
end
61
end
62
63
+
def load_user
64
+
did = get_user_did
65
66
+
if user = User.find_by(did: did)
67
+
return user
68
+
else
69
+
halt json_error('AccountNotFound', 'Account not found', status: 401)
70
end
71
+
end
72
+
end
73
74
+
before do
75
+
@authenticator = Authenticator.new(hostname: HOSTNAME)
76
77
+
if settings.development?
78
+
headers['Access-Control-Allow-Origin'] = '*'
79
+
end
80
+
end
81
82
+
get '/' do
83
+
"Awoo 🐺"
84
+
end
85
86
+
# Searches one of the user's saved collections for posts matching the
# query, returning a page of at:// URIs plus a pagination cursor.
#
# Params:
#   query      - search string; QueryParser splits it into included
#                terms and exclusions
#   collection - which set to search: likes, pins, quotes or reposts
#   cursor     - cursor from a previous page (optional)
get '/xrpc/blue.feeds.lycan.searchPosts' do
  @user = load_user

  if params[:query].to_s.strip.empty?
    return json_error('MissingParameter', 'Missing "query" parameter')
  end

  if params[:collection].to_s.strip.empty?
    return json_error('MissingParameter', 'Missing "collection" parameter')
  end

  query = QueryParser.new(params[:query])

  collection = case params[:collection]
    when 'likes' then @user.likes
    when 'pins' then @user.pins
    when 'quotes' then @user.quotes
    when 'reposts' then @user.reposts
    else return json_error('InvalidParameter', 'Invalid search collection')
  end

  # a query that parsed to no positive terms can't match anything
  if query.terms.empty?
    return json_response(terms: [], posts: [], cursor: nil)
  end

  items = collection
    .joins(:post)
    .includes(:post => :user)
    .matching_terms(query.terms)
    .excluding_terms(query.exclusions)
    .reverse_chronologically
    .after_cursor(params[:cursor])
    .limit(PAGE_LIMIT)

  post_uris = case params[:collection]
    when 'quotes'
      # for quotes, return the user's own quote post, not the quoted post
      items.map { |x| "at://#{@user.did}/app.bsky.feed.post/#{x.rkey}" }
    else
      items.map(&:post).map(&:at_uri)
    end

  json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor)
end
129
130
+
# CORS preflight for startImport — only needed in development, where the
# frontend runs on a different origin.
if settings.development?
  options '/xrpc/blue.feeds.lycan.startImport' do
    headers['Access-Control-Allow-Origin'] = '*'
    headers['Access-Control-Allow-Headers'] = 'content-type'
  end
end
136
137
+
# Schedules an import of the user's historical data. Creates the user
# record on first contact; responds 202 when a new job is queued, or a
# plain message when an import is already queued or running.
post '/xrpc/blue.feeds.lycan.startImport' do
  did = get_user_did
  user = User.find_or_create_by(did: did)

  if !user.valid?
    json_error('InvalidRequest', 'Invalid DID')
  elsif user.import_job || user.active?
    json_response(message: "Import has already started")
  else
    user.create_import_job!

    status 202
    json_response(message: "Import has been scheduled")
  end
end
152
153
+
# Reports progress of the user's historical import.
#
# Statuses:
#   not_started - never requested (including a DID we've never seen)
#   scheduled   - a job exists but no time range is imported yet
#   in_progress - imported back to `position`; `progress` is a 0..1 ratio
#   finished    - the import reached Import::IMPORT_END
get '/xrpc/blue.feeds.lycan.getImportStatus' do
  did = get_user_did
  user = User.find_by(did: did)

  # Fix: an unknown DID used to fall into the `when nil` branch and crash
  # with NoMethodError on `user.import_job` — report not_started instead.
  return json_response(status: 'not_started') if user.nil?

  until_date = user.imported_until

  case until_date
  when nil
    if user.import_job || user.imports.exists?
      json_response(status: 'scheduled')
    else
      json_response(status: 'not_started')
    end
  when Import::IMPORT_END
    json_response(status: 'finished')
  else
    # fraction of the (registered_at .. now) span already covered
    progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at)
    json_response(status: 'in_progress', position: until_date.iso8601, progress: progress)
  end
end
172
+56
bin/firehose
+56
bin/firehose
···
···
1
+
#!/usr/bin/env ruby

# Runs the firehose consumer that streams events from the relay.
# Usage: bin/firehose [-r<cursor>]

$LOAD_PATH.unshift(File.expand_path('..', __dir__))

require 'bundler/setup'
require 'app/firehose_client'

# flush stdout immediately so logs appear in real time (e.g. under systemd)
$stdout.sync = true

# set ARLOG=1 to echo all ActiveRecord SQL to stdout
if ENV['ARLOG'] == '1'
  ActiveRecord::Base.logger = Logger.new(STDOUT)
else
  ActiveRecord::Base.logger = nil
end

def print_help
  puts "Usage: #{$0} [options...]"
  puts "Options:"
  puts " -r12345 = start from cursor 12345"
end

firehose = FirehoseClient.new

args = ARGV.dup

while arg = args.shift
  case arg
  when /^\-r(\d+)$/
    # resume the stream from an explicit cursor position
    firehose.start_cursor = $1.to_i
  when '-h', '--help'
    print_help
    exit 0
  else
    puts "Unrecognized option: #{arg}"
    print_help
    exit 1
  end
end

# Graceful shutdown on Ctrl+C / service stop. The stop call is deferred
# via EM.add_timer(0) so it executes on the EventMachine reactor loop
# rather than directly inside the signal trap context.
trap("SIGINT") {
  firehose.log "Stopping..."

  EM.add_timer(0) {
    firehose.stop
  }
}

trap("SIGTERM") {
  firehose.log "Shutting down the service..."

  EM.add_timer(0) {
    firehose.stop
  }
}

firehose.start
+1
bin/server
+1
bin/server
+53
bin/worker
+53
bin/worker
···
···
1
+
#!/usr/bin/env ruby

# Runs the background import worker that processes queued import jobs.
# Usage: bin/worker [-v]

require 'bundler/setup'
require_relative '../app/import_worker'
require_relative '../app/reports/simple_logger'

# flush stdout immediately so logs appear in real time (e.g. under systemd)
$stdout.sync = true

# set ARLOG=1 to echo all ActiveRecord SQL to stdout
if ENV['ARLOG'] == '1'
  ActiveRecord::Base.logger = Logger.new(STDOUT)
else
  ActiveRecord::Base.logger = nil
end

def print_help
  puts "Usage: #{$0} [options...]"
  puts "Options:"
  puts " -v = verbose"
end

logger = SimpleLogger.new
worker = ImportWorker.new
worker.logger = logger

args = ARGV.dup

while arg = args.shift
  case arg
  when '-v', '--verbose'
    worker.verbose = true
  when '-h', '--help'
    print_help
    exit 0
  else
    puts "Unrecognized option: #{arg}"
    print_help
    exit 1
  end
end

# exit cleanly on Ctrl+C / service stop
trap("SIGINT") {
  puts
  puts "[#{Time.now}] Stopping..."
  exit
}

trap("SIGTERM") {
  puts "[#{Time.now}] Shutting down the service..."
  exit
}

puts "[#{Time.now}] Starting background worker..."
worker.run
+5
config/deploy.rb
+5
config/deploy.rb
···
22
23
after 'deploy', 'deploy:cleanup'
24
after 'deploy:migrations', 'deploy:cleanup'
25
26
namespace :deploy do
27
task :restart, :roles => :web do
···
32
run "cd #{release_path} && bundle config set --local deployment 'true'"
33
run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
34
run "cd #{release_path} && bundle config set --local without 'development test'"
35
end
36
end
···
22
23
after 'deploy', 'deploy:cleanup'
24
after 'deploy:migrations', 'deploy:cleanup'
25
+
after 'deploy:update_code', 'deploy:link_shared'
26
27
namespace :deploy do
28
task :restart, :roles => :web do
···
33
run "cd #{release_path} && bundle config set --local deployment 'true'"
34
run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
35
run "cd #{release_path} && bundle config set --local without 'development test'"
36
+
end
37
+
38
+
task :link_shared do
39
+
run "ln -s #{shared_path}/env #{release_path}/.env"
40
end
41
end
+22
db/migrate/20250903215014_add_import_collections.rb
+22
db/migrate/20250903215014_add_import_collections.rb
···
···
1
+
# Allows one import row per (user, collection) — likes, reposts, quotes,
# pins — instead of a single likes-only import per user.
class AddImportCollections < ActiveRecord::Migration[7.2]
  def change
    # added nullable first so existing rows don't violate the constraint
    add_column :imports, :collection, :string, limit: 20, null: true

    reversible do |dir|
      dir.up do
        # every pre-existing import was a likes import
        execute "UPDATE imports SET collection = 'likes'"
      end
    end

    change_column_null :imports, :collection, false

    # uniqueness moves from per-user to per (user, collection)
    remove_index :imports, :user_id, unique: true
    add_index :imports, [:user_id, :collection], unique: true

    reversible do |dir|
      dir.down do
        # on rollback, drop rows the old one-per-user unique index can't hold
        execute "DELETE FROM imports WHERE collection != 'likes'"
      end
    end
  end
end
+14
db/migrate/20250906005748_add_reposts.rb
+14
db/migrate/20250906005748_add_reposts.rb
···
···
1
+
# Creates the reposts table, structured like the existing likes table.
class AddReposts < ActiveRecord::Migration[7.2]
  def change
    create_table :reposts do |t|
      t.integer "actor_id", null: false        # id of the user who made the repost
      t.string "rkey", limit: 13, null: false  # AT Proto record key
      t.datetime "time", null: false
      t.bigint "post_id"                       # nil until the post is linked
      t.string "post_uri"
    end

    # supports reverse-chronological paging per actor (time/id descending)
    add_index :reposts, [:actor_id, :time, :id], order: { time: :desc, id: :desc }
    add_index :reposts, [:actor_id, :rkey], unique: true
  end
end
+27
db/migrate/20250906233017_add_quotes_and_pins.rb
+27
db/migrate/20250906233017_add_quotes_and_pins.rb
···
···
1
+
# Creates the quotes and pins tables; both mirror the likes/reposts
# layout but additionally store the text of the quoting/pinning post.
class AddQuotesAndPins < ActiveRecord::Migration[7.2]
  def change
    create_table :quotes do |t|
      t.integer "actor_id", null: false        # id of the quoting user
      t.string "rkey", limit: 13, null: false  # AT Proto record key
      t.datetime "time", null: false
      t.text "quote_text", null: false         # text of the quote post itself
      t.bigint "post_id"                       # nil until the quoted post is linked
      t.string "post_uri"
    end

    # supports reverse-chronological paging per actor (time/id descending)
    add_index :quotes, [:actor_id, :time, :id], order: { time: :desc, id: :desc }
    add_index :quotes, [:actor_id, :rkey], unique: true

    create_table :pins do |t|
      t.integer "actor_id", null: false
      t.string "rkey", limit: 13, null: false
      t.datetime "time", null: false
      t.text "pin_text", null: false
      t.bigint "post_id"
      t.string "post_uri"
    end

    add_index :pins, [:actor_id, :time, :id], order: { time: :desc, id: :desc }
    add_index :pins, [:actor_id, :rkey], unique: true
  end
end
+8
db/migrate/20250918024627_add_subscriptions.rb
+8
db/migrate/20250918024627_add_subscriptions.rb
+9
db/migrate/20250920182018_add_import_jobs.rb
+9
db/migrate/20250920182018_add_import_jobs.rb
+7
db/migrate/20250923014702_add_queued_field.rb
+7
db/migrate/20250923014702_add_queued_field.rb
+5
db/migrate/20250923180153_add_user_created_at.rb
+5
db/migrate/20250923180153_add_user_created_at.rb
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+51
-2
db/schema.rb
+51
-2
db/schema.rb
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
-
ActiveRecord::Schema[7.2].define(version: 2025_08_31_210930) do
14
# These are extensions that must be enabled in order to support this database
15
enable_extension "plpgsql"
16
17
create_table "imports", force: :cascade do |t|
18
t.integer "user_id", null: false
19
t.string "cursor"
20
t.datetime "started_from"
21
t.datetime "last_completed"
22
-
t.index ["user_id"], name: "index_imports_on_user_id", unique: true
23
end
24
25
create_table "likes", force: :cascade do |t|
···
28
t.datetime "time", null: false
29
t.bigint "post_id"
30
t.string "post_uri"
31
t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
32
t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
33
end
34
35
create_table "posts", force: :cascade do |t|
36
t.integer "user_id", null: false
37
t.string "rkey", limit: 13, null: false
···
42
t.index ["user_id", "time", "id"], name: "index_posts_on_user_id_and_time_and_id", order: { time: :desc, id: :desc }
43
end
44
45
create_table "users", id: :serial, force: :cascade do |t|
46
t.string "did", limit: 260, null: false
47
t.index ["did"], name: "index_users_on_did", unique: true
48
end
49
end
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
+
ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do
14
# These are extensions that must be enabled in order to support this database
15
enable_extension "plpgsql"
16
+
17
+
create_table "import_jobs", force: :cascade do |t|
18
+
t.integer "user_id", null: false
19
+
t.index ["user_id"], name: "index_import_jobs_on_user_id", unique: true
20
+
end
21
22
create_table "imports", force: :cascade do |t|
23
t.integer "user_id", null: false
24
t.string "cursor"
25
t.datetime "started_from"
26
t.datetime "last_completed"
27
+
t.string "collection", limit: 20, null: false
28
+
t.datetime "fetched_until"
29
+
t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
30
end
31
32
create_table "likes", force: :cascade do |t|
···
35
t.datetime "time", null: false
36
t.bigint "post_id"
37
t.string "post_uri"
38
+
t.integer "queue", limit: 2
39
t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
40
t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
41
end
42
43
+
create_table "pins", force: :cascade do |t|
44
+
t.integer "actor_id", null: false
45
+
t.string "rkey", limit: 13, null: false
46
+
t.datetime "time", null: false
47
+
t.text "pin_text", null: false
48
+
t.bigint "post_id"
49
+
t.string "post_uri"
50
+
t.integer "queue", limit: 2
51
+
t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
52
+
t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
53
+
end
54
+
55
create_table "posts", force: :cascade do |t|
56
t.integer "user_id", null: false
57
t.string "rkey", limit: 13, null: false
···
62
t.index ["user_id", "time", "id"], name: "index_posts_on_user_id_and_time_and_id", order: { time: :desc, id: :desc }
63
end
64
65
+
create_table "quotes", force: :cascade do |t|
66
+
t.integer "actor_id", null: false
67
+
t.string "rkey", limit: 13, null: false
68
+
t.datetime "time", null: false
69
+
t.text "quote_text", null: false
70
+
t.bigint "post_id"
71
+
t.string "post_uri"
72
+
t.integer "queue", limit: 2
73
+
t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
74
+
t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
75
+
end
76
+
77
+
create_table "reposts", force: :cascade do |t|
78
+
t.integer "actor_id", null: false
79
+
t.string "rkey", limit: 13, null: false
80
+
t.datetime "time", null: false
81
+
t.bigint "post_id"
82
+
t.string "post_uri"
83
+
t.integer "queue", limit: 2
84
+
t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
85
+
t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
86
+
end
87
+
88
+
create_table "subscriptions", force: :cascade do |t|
89
+
t.string "service", null: false
90
+
t.bigint "cursor", null: false
91
+
end
92
+
93
create_table "users", id: :serial, force: :cascade do |t|
94
t.string "did", limit: 260, null: false
95
+
t.datetime "registered_at"
96
t.index ["did"], name: "index_users_on_did", unique: true
97
end
98
end
+32
-46
lib/tasks/import.rake
+32
-46
lib/tasks/import.rake
···
1
-
require_relative '../../app/importer'
2
-
require_relative '../../app/like_queue'
3
require_relative '../../app/post_downloader'
4
5
-
class ImportReport
6
-
def initialize
7
-
@data = {}
8
-
@start = Time.now
9
end
10
11
-
def update(data)
12
-
data.each do |k, v|
13
-
@data[k] ||= {}
14
-
@data[k].update(v)
15
-
end
16
17
-
render
18
-
end
19
-
20
-
def render
21
-
print " " * 80 + "\r"
22
-
puts "Elapsed time: #{(Time.now - @start).to_i} s"
23
-
24
-
print " " * 80 + "\r"
25
-
puts "Importer: imported likes = #{@data.dig(:importer, :imported_likes) || 0} (until: #{@data.dig(:importer, :oldest_date)})" +
26
-
"#{" (DONE)" if @data.dig(:importer, :finished)}"
27
-
28
-
print " " * 80 + "\r"
29
-
puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
30
-
31
-
print " " * 80 + "\r"
32
-
puts "Queue size: #{@data.dig(:queue, :length) || 0}"
33
-
34
-
print "\e[4A"
35
end
36
end
37
38
task :import_user do
39
-
unless ENV['USER']
40
-
raise "Required USER parameter missing"
41
end
42
43
-
queue = LikeQueue.new(Like.where(post: nil).to_a)
44
-
report = ImportReport.new
45
46
-
importer = Importer.new(ENV['USER'])
47
-
importer.like_queue = queue
48
-
importer.report = report
49
50
-
downloader = PostDownloader.new
51
-
downloader.report = report
52
-
53
-
download_thread = Thread.new { downloader.import_from_queue(queue) }
54
55
trap("SIGINT") {
56
puts "\n\n\n\n\n"
57
exit
58
}
59
60
-
importer.run_import(ENV['UNTIL'])
61
-
62
-
downloader.stop_when_empty = true
63
-
download_thread.join
64
65
puts "\n\n\n\n\n"
66
end
67
68
task :process_posts do
69
-
queue = LikeQueue.new(Like.where(post: nil).to_a)
70
-
report = ImportReport.new
71
72
trap("SIGINT") {
73
puts "\n\n\n\n\n"
···
1
+
require_relative '../../app/import_manager'
2
+
require_relative '../../app/item_queue'
3
+
require_relative '../../app/models/user'
4
require_relative '../../app/post_downloader'
5
+
require_relative '../../app/reports/console_report'
6
+
require_relative '../../app/reports/simple_logger'
7
8
+
# Queues a user for import by the background worker.
# Usage: rake enqueue_user DID=did:plc:...
task :enqueue_user do
  unless ENV['DID']
    raise "Required DID parameter missing"
  end

  user = User.find_or_create_by!(did: ENV['DID'])

  if user.import_job
    puts "Import for #{user.did} is already scheduled."
  elsif user.active?
    puts "Import for #{user.did} has already started."
  else
    user.create_import_job!
    puts "Import for #{user.did} scheduled ✓"
  end
end
24
25
# Runs an import of one collection for one user in the foreground,
# with a live console progress report.
# Usage: rake import_user DID=did:plc:... COLLECTION=likes [UNTIL=...]
task :import_user do
  unless ENV['DID']
    raise "Required DID parameter missing"
  end

  user = User.find_or_create_by!(did: ENV['DID'])

  unless ENV['COLLECTION']
    raise "Required COLLECTION parameter missing"
  end

  import = ImportManager.new(user)
  import.report = ConsoleReport.new
  import.logger = SimpleLogger.new
  import.time_limit = ENV['UNTIL']

  # on Ctrl+C, print blank lines so the cursor moves below the
  # multi-line console report before exiting
  trap("SIGINT") {
    puts "\n\n\n\n\n"
    exit
  }

  import.start(ENV['COLLECTION'])

  puts "\n\n\n\n\n"
end
50
51
task :process_posts do
52
+
queue = ItemQueue.new(
53
+
[Like, Repost, Quote, Pin].map { |x| x.pending.to_a }.reduce(&:+)
54
+
)
55
+
56
+
report = ConsoleReport.new
57
58
trap("SIGINT") {
59
puts "\n\n\n\n\n"