+10
.env.example
+10
.env.example
···
···
1
+
# hostname on which you're running the Lycan server
2
+
SERVER_HOSTNAME=lycan.feeds.blue
3
+
4
+
# customizing servers we connect to
5
+
RELAY_HOST=bsky.network
6
+
# or: JETSTREAM_HOST=jetstream2.us-east.bsky.network
7
+
APPVIEW_HOST=public.api.bsky.app
8
+
9
+
# put your handle here
10
+
# FIREHOSE_USER_AGENT="Lycan (@my.handle)"
+4
-3
Gemfile
+4
-3
Gemfile
···
2
3
gem 'activerecord', '~> 7.2'
4
gem 'sinatra-activerecord', '~> 2.0'
5
gem 'pg'
6
gem 'rake'
7
gem 'irb'
8
gem 'rainbow'
9
-
10
-
gem 'sinatra'
11
12
gem 'minisky', '~> 0.5'
13
gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
···
17
gem 'jwt'
18
19
group :development do
20
-
gem 'puma'
21
gem 'rackup'
22
gem 'capistrano', '~> 2.0'
23
···
2
3
gem 'activerecord', '~> 7.2'
4
gem 'sinatra-activerecord', '~> 2.0'
5
+
gem 'sinatra'
6
gem 'pg'
7
gem 'rake'
8
gem 'irb'
9
+
10
gem 'rainbow'
11
+
gem 'dotenv'
12
13
gem 'minisky', '~> 0.5'
14
gem 'didkit', '~> 0.2', git: 'https://tangled.sh/@mackuba.eu/didkit'
···
18
gem 'jwt'
19
20
group :development do
21
+
gem 'thin'
22
gem 'rackup'
23
gem 'capistrano', '~> 2.0'
24
+41
-34
Gemfile.lock
+41
-34
Gemfile.lock
···
7
GEM
8
remote: https://rubygems.org/
9
specs:
10
-
activemodel (7.2.2.2)
11
-
activesupport (= 7.2.2.2)
12
-
activerecord (7.2.2.2)
13
-
activemodel (= 7.2.2.2)
14
-
activesupport (= 7.2.2.2)
15
timeout (>= 0.4.0)
16
-
activesupport (7.2.2.2)
17
base64
18
benchmark (>= 0.3)
19
bigdecimal
···
29
base58 (0.2.3)
30
base64 (0.3.0)
31
bcrypt_pbkdf (1.1.1)
32
-
benchmark (0.4.1)
33
-
bigdecimal (3.2.2)
34
capistrano (2.15.11)
35
highline
36
net-scp (>= 1.0.0)
···
39
net-ssh-gateway (>= 1.1.0)
40
cbor (0.5.10.1)
41
concurrent-ruby (1.3.5)
42
-
connection_pool (2.5.3)
43
-
date (3.4.1)
44
drb (2.2.3)
45
ed25519 (1.4.0)
46
-
erb (5.0.2)
47
eventmachine (1.2.7)
48
faye-websocket (0.12.0)
49
eventmachine (>= 0.12.0)
···
53
i18n (1.14.7)
54
concurrent-ruby (~> 1.0)
55
io-console (0.8.1)
56
-
irb (1.15.2)
57
pp (>= 0.6.0)
58
rdoc (>= 4.0.0)
59
reline (>= 0.4.2)
···
62
logger (1.7.0)
63
minisky (0.5.0)
64
base64 (~> 0.1)
65
-
minitest (5.25.5)
66
mustermann (3.0.4)
67
ruby2_keywords (~> 0.0.1)
68
net-scp (4.1.0)
···
72
net-ssh (7.3.0)
73
net-ssh-gateway (2.0.0)
74
net-ssh (>= 4.0.0)
75
-
nio4r (2.7.4)
76
-
pg (1.6.1)
77
-
pg (1.6.1-aarch64-linux)
78
-
pg (1.6.1-aarch64-linux-musl)
79
-
pg (1.6.1-arm64-darwin)
80
-
pg (1.6.1-x86_64-darwin)
81
-
pg (1.6.1-x86_64-linux)
82
-
pg (1.6.1-x86_64-linux-musl)
83
-
pp (0.6.2)
84
prettyprint
85
prettyprint (0.2.0)
86
psych (5.2.6)
87
date
88
stringio
89
-
puma (6.6.1)
90
-
nio4r (~> 2.0)
91
-
rack (3.2.0)
92
-
rack-protection (4.1.1)
93
base64 (>= 0.1.0)
94
logger (>= 1.6.0)
95
rack (>= 3.0.0, < 4)
···
99
rackup (2.2.1)
100
rack (>= 3)
101
rainbow (3.1.1)
102
-
rake (13.3.0)
103
-
rdoc (6.14.2)
104
erb
105
psych (>= 4.0.0)
106
-
reline (0.6.2)
107
io-console (~> 0.5)
108
ruby2_keywords (0.0.5)
109
securerandom (0.4.1)
110
-
sinatra (4.1.1)
111
logger (>= 1.6.0)
112
mustermann (~> 3.0)
113
rack (>= 3.0.0, < 4)
114
-
rack-protection (= 4.1.1)
115
rack-session (>= 2.0.0, < 3)
116
tilt (~> 2.0)
117
sinatra-activerecord (2.0.28)
···
123
cbor (~> 0.5, >= 0.5.9.6)
124
eventmachine (~> 1.2, >= 1.2.7)
125
faye-websocket (~> 0.12)
126
-
stringio (3.1.7)
127
tilt (2.6.1)
128
-
timeout (0.4.3)
129
tzinfo (2.0.6)
130
concurrent-ruby (~> 1.0)
131
websocket-driver (0.8.0)
···
148
bcrypt_pbkdf (>= 1.0, < 2.0)
149
capistrano (~> 2.0)
150
didkit (~> 0.2)!
151
ed25519 (>= 1.2, < 2.0)
152
irb
153
jwt
154
minisky (~> 0.5)
155
pg
156
-
puma
157
rackup
158
rainbow
159
rake
160
sinatra
161
sinatra-activerecord (~> 2.0)
162
skyfall (~> 0.6)
163
164
BUNDLED WITH
165
2.7.0
···
7
GEM
8
remote: https://rubygems.org/
9
specs:
10
+
activemodel (7.2.3)
11
+
activesupport (= 7.2.3)
12
+
activerecord (7.2.3)
13
+
activemodel (= 7.2.3)
14
+
activesupport (= 7.2.3)
15
timeout (>= 0.4.0)
16
+
activesupport (7.2.3)
17
base64
18
benchmark (>= 0.3)
19
bigdecimal
···
29
base58 (0.2.3)
30
base64 (0.3.0)
31
bcrypt_pbkdf (1.1.1)
32
+
benchmark (0.5.0)
33
+
bigdecimal (3.3.1)
34
capistrano (2.15.11)
35
highline
36
net-scp (>= 1.0.0)
···
39
net-ssh-gateway (>= 1.1.0)
40
cbor (0.5.10.1)
41
concurrent-ruby (1.3.5)
42
+
connection_pool (2.5.4)
43
+
daemons (1.4.1)
44
+
date (3.5.0)
45
+
dotenv (3.1.8)
46
drb (2.2.3)
47
ed25519 (1.4.0)
48
+
erb (6.0.0)
49
eventmachine (1.2.7)
50
faye-websocket (0.12.0)
51
eventmachine (>= 0.12.0)
···
55
i18n (1.14.7)
56
concurrent-ruby (~> 1.0)
57
io-console (0.8.1)
58
+
irb (1.15.3)
59
pp (>= 0.6.0)
60
rdoc (>= 4.0.0)
61
reline (>= 0.4.2)
···
64
logger (1.7.0)
65
minisky (0.5.0)
66
base64 (~> 0.1)
67
+
minitest (5.26.1)
68
mustermann (3.0.4)
69
ruby2_keywords (~> 0.0.1)
70
net-scp (4.1.0)
···
74
net-ssh (7.3.0)
75
net-ssh-gateway (2.0.0)
76
net-ssh (>= 4.0.0)
77
+
pg (1.6.2)
78
+
pg (1.6.2-aarch64-linux)
79
+
pg (1.6.2-aarch64-linux-musl)
80
+
pg (1.6.2-arm64-darwin)
81
+
pg (1.6.2-x86_64-darwin)
82
+
pg (1.6.2-x86_64-linux)
83
+
pg (1.6.2-x86_64-linux-musl)
84
+
pp (0.6.3)
85
prettyprint
86
prettyprint (0.2.0)
87
psych (5.2.6)
88
date
89
stringio
90
+
rack (3.2.4)
91
+
rack-protection (4.2.1)
92
base64 (>= 0.1.0)
93
logger (>= 1.6.0)
94
rack (>= 3.0.0, < 4)
···
98
rackup (2.2.1)
99
rack (>= 3)
100
rainbow (3.1.1)
101
+
rake (13.3.1)
102
+
rdoc (6.15.1)
103
erb
104
psych (>= 4.0.0)
105
+
tsort
106
+
reline (0.6.3)
107
io-console (~> 0.5)
108
ruby2_keywords (0.0.5)
109
securerandom (0.4.1)
110
+
sinatra (4.2.1)
111
logger (>= 1.6.0)
112
mustermann (~> 3.0)
113
rack (>= 3.0.0, < 4)
114
+
rack-protection (= 4.2.1)
115
rack-session (>= 2.0.0, < 3)
116
tilt (~> 2.0)
117
sinatra-activerecord (2.0.28)
···
123
cbor (~> 0.5, >= 0.5.9.6)
124
eventmachine (~> 1.2, >= 1.2.7)
125
faye-websocket (~> 0.12)
126
+
stringio (3.1.8)
127
+
thin (2.0.1)
128
+
daemons (~> 1.0, >= 1.0.9)
129
+
eventmachine (~> 1.0, >= 1.0.4)
130
+
logger
131
+
rack (>= 1, < 4)
132
tilt (2.6.1)
133
+
timeout (0.4.4)
134
+
tsort (0.2.0)
135
tzinfo (2.0.6)
136
concurrent-ruby (~> 1.0)
137
websocket-driver (0.8.0)
···
154
bcrypt_pbkdf (>= 1.0, < 2.0)
155
capistrano (~> 2.0)
156
didkit (~> 0.2)!
157
+
dotenv
158
ed25519 (>= 1.2, < 2.0)
159
irb
160
jwt
161
minisky (~> 0.5)
162
pg
163
rackup
164
rainbow
165
rake
166
sinatra
167
sinatra-activerecord (~> 2.0)
168
skyfall (~> 0.6)
169
+
thin
170
171
BUNDLED WITH
172
2.7.0
+110
README.md
+110
README.md
···
···
1
+
# Lycan 🐺
2
+
3
+
A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and allows you to search in that archive.
4
+
5
+
6
+
## How it works
7
+
8
+
Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network AppView, it only indexes posts and likes on demand from people who request to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anything between a few minutes and an hour, depending on how much data there is to download. After that, new likes are being indexed live from the firehose.
9
+
10
+
At the moment, Lycan indexes four types of content:
11
+
12
+
- posts you've liked
13
+
- posts you've reposted
14
+
- posts you've quoted
15
+
- your old-style bookmarks (using the 📌 emoji method)
16
+
17
+
New bookmarks are private data, so at the moment they can't be imported until support for OAuth is added.
18
+
19
+
Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API – the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)).
20
+
21
+
The service consists of three separate components:
22
+
23
+
- a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is/has been imported
24
+
- a **background worker**, which runs the import process
25
+
- an **HTTP server**, which serves the XRPC endpoints (currently there are 3: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying
26
+
27
+
28
+
## Setting up on localhost
29
+
30
+
This app should run on any somewhat recent version of Ruby, though of course it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install it with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need to have some familiarity with the Ruby ecosystem in order to set it up and run it.
31
+
32
+
A Postgres database is also required (again, any non-ancient version should work).
33
+
34
+
Download or clone the repository, then install the dependencies:
35
+
36
+
```
37
+
bundle install
38
+
```
39
+
40
+
Next, create the database – the configuration is defined in [`config/database.yml`](config/database.yml), for development it's `lycan_development`. Create it either manually, or with a rake task:
41
+
42
+
```
43
+
bundle exec rake db:create
44
+
```
45
+
46
+
Then, run the migrations:
47
+
48
+
```
49
+
bundle exec rake db:migrate
50
+
```
51
+
52
+
To run an import, you will need to run three separate processes, probably in separate terminal tabs:
53
+
54
+
1) the firehose client, [`bin/firehose`](bin/firehose)
55
+
2) the background worker, [`bin/worker`](bin/worker)
56
+
3) the Sinatra HTTP server, [`bin/server`](bin/server)
57
+
58
+
The UI can be accessed through Skythread, either on the official site on [skythread.mackuba.eu](https://skythread.mackuba.eu), or a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu – but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL.
59
+
60
+
You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.)
61
+
62
+
63
+
## Configuration
64
+
65
+
There's a few things you can configure through ENV variables:
66
+
67
+
- `RELAY_HOST` – hostname of the relay to use for the firehose (default: `bsky.network`)
68
+
- `JETSTREAM_HOST` – alternatively, instead of `RELAY_HOST`, set this to a hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance
69
+
- `FIREHOSE_USER_AGENT` – when running in production, it's recommended that you set this to some name that identifies who is running the service
70
+
- `APPVIEW_HOST` – hostname of the AppView used to download posts (default: `public.api.bsky.app`)
71
+
- `SERVER_HOSTNAME` – hostname of the server on which you're running the service in production
72
+
73
+
74
+
## Rake tasks
75
+
76
+
Some Rake tasks that might be useful:
77
+
78
+
```
79
+
bundle exec rake enqueue_user DID=did:plc:qweqwe
80
+
```
81
+
82
+
- request an import of the given account (to be handled by firehose + worker)
83
+
84
+
```
85
+
bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all
86
+
```
87
+
88
+
- run a complete import synchronously
89
+
90
+
```
91
+
bundle exec rake process_posts
92
+
```
93
+
94
+
- process all previously queued and unfinished or failed items
95
+
96
+
97
+
## Running in production
98
+
99
+
This will probably heavily depend on where and how you prefer to run it, I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config for your specific setup.
100
+
101
+
On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option to do this (which I'm using) may be writing a `systemd` service config file and adding it to `/etc/systemd/system`. To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server – my recommendation is Nginx with either Passenger (runs your app automatically from Nginx) or something like Puma (needs to be started by e.g. systemd like the firehose).
102
+
103
+
104
+
## Credits
105
+
106
+
Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).
107
+
108
+
The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).
109
+
110
+
Bug reports and pull requests are welcome 😎
+1
Rakefile
+1
Rakefile
+14
-2
app/authenticator.rb
+14
-2
app/authenticator.rb
···
6
require 'openssl'
7
8
class Authenticator
9
def initialize(hostname:)
10
@@pkey_cache ||= {}
11
@hostname = hostname
···
15
return nil unless auth_header.start_with?('Bearer ')
16
17
token = auth_header.gsub(/\ABearer /, '')
18
-
data = JSON.parse(Base64.decode64(token.split('.')[1]))
19
did = data['iss']
20
-
return nil if data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint
21
22
pkey = pkey_for_user(did)
23
···
6
require 'openssl'
7
8
class Authenticator
9
+
class InvalidTokenError < StandardError
10
+
end
11
+
12
def initialize(hostname:)
13
@@pkey_cache ||= {}
14
@hostname = hostname
···
18
return nil unless auth_header.start_with?('Bearer ')
19
20
token = auth_header.gsub(/\ABearer /, '')
21
+
parts = token.split('.')
22
+
raise InvalidTokenError, "Invalid JWT token" if parts.length != 3
23
+
24
+
begin
25
+
decoded_data = Base64.decode64(parts[1])
26
+
data = JSON.parse(decoded_data)
27
+
rescue StandardError => e
28
+
raise InvalidTokenError, "Invalid JWT token"
29
+
end
30
+
31
did = data['iss']
32
+
return nil if did.nil? || data['aud'] != "did:web:#{@hostname}" || data['lxm'] != endpoint
33
34
pkey = pkey_for_user(did)
35
-42
app/console_report.rb
-42
app/console_report.rb
···
1
-
class ConsoleReport
2
-
def initialize
3
-
@data = {}
4
-
@start = Time.now
5
-
end
6
-
7
-
def update(data)
8
-
deep_merge(@data, data)
9
-
render
10
-
end
11
-
12
-
def deep_merge(target, updates)
13
-
updates.each do |k, v|
14
-
if v.is_a?(Hash)
15
-
target[k] ||= {}
16
-
deep_merge(target[k], v)
17
-
else
18
-
target[k] = v
19
-
end
20
-
end
21
-
end
22
-
23
-
def render
24
-
print " " * 80 + "\r"
25
-
puts "Elapsed time: #{(Time.now - @start).to_i} s"
26
-
27
-
importers = @data[:importers] || {}
28
-
29
-
importers.each do |name, data|
30
-
print " " * 80 + "\r"
31
-
puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
32
-
end
33
-
34
-
print " " * 80 + "\r"
35
-
puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
36
-
37
-
print " " * 80 + "\r"
38
-
puts "Queue size: #{@data.dig(:queue, :length) || 0}"
39
-
40
-
print "\e[#{3 + importers.length}A"
41
-
end
42
-
end
···
+50
-9
app/firehose_client.rb
+50
-9
app/firehose_client.rb
···
1
require 'skyfall'
2
3
require_relative 'init'
4
require_relative 'models/post'
5
require_relative 'models/subscription'
6
require_relative 'models/user'
···
12
13
def initialize
14
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
15
-
@service = DEFAULT_RELAY
16
end
17
18
def start
···
25
last_cursor = load_or_init_cursor
26
cursor = @start_cursor || last_cursor
27
28
-
@sky = Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
29
-
@sky.user_agent = "Lycan (https://tangled.sh/@mackuba.eu/lycan) #{@sky.version_string}"
30
@sky.check_heartbeat = true
31
32
@sky.on_message do |m|
···
48
@replaying = true
49
@last_update = Time.now
50
51
-
@timer ||= EM::PeriodicTimer.new(20) do
52
now = Time.now
53
diff = now - @last_update
54
55
if diff > 30
56
log "Timer: last update #{sprintf('%.1f', diff)}s ago"
57
end
58
end
59
}
···
129
end
130
end
131
end
132
end
133
134
def process_account_event(msg)
135
if msg.status == :deleted
136
if user = User.find_by(did: msg.repo)
137
user.destroy
138
end
139
end
140
end
···
143
return unless @current_user
144
145
if op.action == :create
146
-
@current_user.likes.import_from_record(op.uri, op.raw_record)
147
elsif op.action == :delete
148
@current_user.likes.where(rkey: op.rkey).delete_all
149
end
150
end
151
152
def process_repost(msg, op)
153
return unless @current_user
154
155
if op.action == :create
156
-
@current_user.reposts.import_from_record(op.uri, op.raw_record)
157
elsif op.action == :delete
158
@current_user.reposts.where(rkey: op.rkey).delete_all
159
end
160
end
161
162
def process_post(msg, op)
163
if op.action == :create
164
if @current_user
165
-
@current_user.quotes.import_from_record(op.uri, op.raw_record)
166
-
@current_user.pins.import_from_record(op.uri, op.raw_record)
167
end
168
elsif op.action == :delete
169
if @current_user
···
175
post.destroy
176
end
177
end
178
end
179
180
def log(text)
···
182
end
183
184
def inspect
185
-
vars = instance_variables - [:@timer]
186
values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
187
"#<#{self.class}:0x#{object_id} #{values}>"
188
end
···
1
require 'skyfall'
2
3
require_relative 'init'
4
+
require_relative 'models/import_job'
5
require_relative 'models/post'
6
require_relative 'models/subscription'
7
require_relative 'models/user'
···
13
14
def initialize
15
@env = (ENV['APP_ENV'] || ENV['RACK_ENV'] || :development).to_sym
16
+
17
+
if ENV['RELAY_HOST']
18
+
@service = ENV['RELAY_HOST']
19
+
elsif ENV['JETSTREAM_HOST']
20
+
@service = ENV['JETSTREAM_HOST']
21
+
@jetstream = true
22
+
else
23
+
@service = DEFAULT_RELAY
24
+
end
25
end
26
27
def start
···
34
last_cursor = load_or_init_cursor
35
cursor = @start_cursor || last_cursor
36
37
+
@sky = if @jetstream
38
+
Skyfall::Jetstream.new(@service, { cursor: cursor, wanted_collections: [:bsky_post, :bsky_like, :bsky_repost] })
39
+
else
40
+
Skyfall::Firehose.new(@service, :subscribe_repos, cursor)
41
+
end
42
+
43
+
@sky.user_agent = (ENV['FIREHOSE_USER_AGENT'] || "Lycan (https://tangled.sh/@mackuba.eu/lycan)") + ' ' + @sky.version_string
44
@sky.check_heartbeat = true
45
46
@sky.on_message do |m|
···
62
@replaying = true
63
@last_update = Time.now
64
65
+
@live_check_timer ||= EM::PeriodicTimer.new(20) do
66
now = Time.now
67
diff = now - @last_update
68
69
if diff > 30
70
log "Timer: last update #{sprintf('%.1f', diff)}s ago"
71
+
end
72
+
end
73
+
74
+
@jobs_timer ||= EM::PeriodicTimer.new(3) do
75
+
ImportJob.all.each do |job|
76
+
@active_users[job.user.did] = job.user
77
+
job.create_imports
78
+
job.destroy
79
end
80
end
81
}
···
151
end
152
end
153
end
154
+
rescue CBOR::UnpackError
155
+
# ignore invalid records
156
end
157
158
def process_account_event(msg)
159
if msg.status == :deleted
160
if user = User.find_by(did: msg.repo)
161
user.destroy
162
+
@active_users.delete_if { |k, u| u.id == user.id }
163
end
164
end
165
end
···
168
return unless @current_user
169
170
if op.action == :create
171
+
return if op.raw_record.nil?
172
+
@current_user.likes.import_from_record(op.uri, op.raw_record, queue: :firehose)
173
elsif op.action == :delete
174
@current_user.likes.where(rkey: op.rkey).delete_all
175
end
176
+
rescue StandardError => e
177
+
log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}"
178
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
179
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
180
end
181
182
def process_repost(msg, op)
183
return unless @current_user
184
185
if op.action == :create
186
+
return if op.raw_record.nil?
187
+
@current_user.reposts.import_from_record(op.uri, op.raw_record, queue: :firehose)
188
elsif op.action == :delete
189
@current_user.reposts.where(rkey: op.rkey).delete_all
190
end
191
+
rescue StandardError => e
192
+
log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}"
193
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
194
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
195
end
196
197
def process_post(msg, op)
198
if op.action == :create
199
+
return if op.raw_record.nil?
200
+
201
if @current_user
202
+
@current_user.quotes.import_from_record(op.uri, op.raw_record, queue: :firehose)
203
+
@current_user.pins.import_from_record(op.uri, op.raw_record, queue: :firehose)
204
end
205
elsif op.action == :delete
206
if @current_user
···
212
post.destroy
213
end
214
end
215
+
rescue StandardError => e
216
+
log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}"
217
+
log e.backtrace.reject { |x| x.include?('/ruby/') }
218
+
sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed)
219
end
220
221
def log(text)
···
223
end
224
225
def inspect
226
+
vars = instance_variables - [:@jobs_timer, :@live_check_timer]
227
values = vars.map { |v| "#{v}=#{instance_variable_get(v).inspect}" }.join(", ")
228
"#<#{self.class}:0x#{object_id} #{values}>"
229
end
+21
-9
app/import_manager.rb
+21
-9
app/import_manager.rb
···
5
require_relative 'post_downloader'
6
7
class ImportManager
8
-
attr_accessor :report, :time_limit
9
10
def initialize(user)
11
@user = user
12
end
13
14
-
def start(sets, include_pending)
15
-
queued_items = []
16
importers = []
17
sets = [sets] unless sets.is_a?(Array)
18
19
sets.each do |set|
20
case set
21
when 'likes'
22
-
queued_items += @user.likes.pending.to_a if include_pending
23
importers << LikesImporter.new(@user)
24
when 'reposts'
25
-
queued_items += @user.reposts.pending.to_a if include_pending
26
importers << RepostsImporter.new(@user)
27
when 'posts'
28
-
queued_items += @user.quotes.pending.to_a + @user.pins.pending.to_a if include_pending
29
importers << PostsImporter.new(@user)
30
when 'all'
31
-
queued_items += @user.all_pending_items if include_pending
32
importers += [
33
LikesImporter.new(@user),
34
RepostsImporter.new(@user),
···
39
end
40
end
41
42
queue = ItemQueue.new(queued_items)
43
44
downloader = PostDownloader.new
45
downloader.report = @report
46
47
-
download_thread = Thread.new { downloader.import_from_queue(queue) }
48
49
import_threads = importers.map do |import|
50
import.item_queue = queue
51
import.report = @report
52
53
-
Thread.new { import.run_import(@time_limit) }
54
end
55
56
import_threads.each { |i| i.join }
57
58
downloader.stop_when_empty = true
59
download_thread.join
···
5
require_relative 'post_downloader'
6
7
class ImportManager
8
+
attr_accessor :report, :time_limit, :logger, :log_status_updates
9
10
def initialize(user)
11
@user = user
12
end
13
14
+
def start(sets)
15
importers = []
16
sets = [sets] unless sets.is_a?(Array)
17
18
sets.each do |set|
19
case set
20
when 'likes'
21
importers << LikesImporter.new(@user)
22
when 'reposts'
23
importers << RepostsImporter.new(@user)
24
when 'posts'
25
importers << PostsImporter.new(@user)
26
when 'all'
27
importers += [
28
LikesImporter.new(@user),
29
RepostsImporter.new(@user),
···
34
end
35
end
36
37
+
queued_items = @user.all_items_in_queue(:import).sort_by(&:time).reverse
38
queue = ItemQueue.new(queued_items)
39
40
downloader = PostDownloader.new
41
downloader.report = @report
42
+
downloader.logger = @logger
43
44
+
download_thread = Thread.new do
45
+
@logger&.info "Starting downloader thread for #{@user}" if @log_status_updates
46
+
47
+
downloader.import_from_queue(queue)
48
+
49
+
@logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
50
+
end
51
52
import_threads = importers.map do |import|
53
import.item_queue = queue
54
import.report = @report
55
+
import.logger = @logger
56
+
57
+
Thread.new do
58
+
@logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates
59
60
+
import.run_import(@time_limit)
61
+
62
+
@logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
63
+
end
64
end
65
66
import_threads.each { |i| i.join }
67
+
68
+
@logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates
69
70
downloader.stop_when_empty = true
71
download_thread.join
+94
app/import_worker.rb
+94
app/import_worker.rb
···
···
1
+
require 'active_record'
2
+
require 'minisky'
3
+
require 'time'
4
+
5
+
require_relative 'init'
6
+
require_relative 'import_manager'
7
+
require_relative 'models/import'
8
+
require_relative 'post_downloader'
9
+
require_relative 'reports/basic_report'
10
+
11
+
class ImportWorker
12
+
attr_accessor :verbose, :logger
13
+
14
+
class UserThread < Thread
15
+
def initialize(user, collections, logger, verbose = false)
16
+
@user = user
17
+
@verbose = verbose
18
+
@logger = logger
19
+
20
+
super { run(collections) }
21
+
end
22
+
23
+
def user_id
24
+
@user.id
25
+
end
26
+
27
+
def run(collections)
28
+
@logger&.info "Starting import thread for #{@user}"
29
+
30
+
if @user.registered_at.nil?
31
+
registration_time = get_registration_time(@user)
32
+
@user.update!(registered_at: registration_time)
33
+
end
34
+
35
+
import = ImportManager.new(@user)
36
+
37
+
if @logger
38
+
import.report = BasicReport.new(@logger) if @verbose
39
+
import.logger = @logger
40
+
import.log_status_updates = true
41
+
end
42
+
43
+
import.start(collections)
44
+
45
+
@logger&.info "Ended import thread for #{@user}"
46
+
end
47
+
48
+
def get_registration_time(user)
49
+
sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
50
+
profile = sky.get_request('app.bsky.actor.getProfile', { actor: user.did })
51
+
52
+
Time.parse(profile['createdAt'])
53
+
end
54
+
end
55
+
56
+
def run
57
+
@user_threads = []
58
+
59
+
@firehose_thread = Thread.new { process_firehose_items }
60
+
@downloader = PostDownloader.new
61
+
@downloader.logger = @logger
62
+
63
+
loop do
64
+
@user_threads.delete_if { |t| !t.alive? }
65
+
66
+
users = User.with_unfinished_import.where.not(id: @user_threads.map(&:user_id)).to_a
67
+
68
+
users.each do |user|
69
+
collections = user.imports.unfinished.map(&:collection)
70
+
thread = UserThread.new(user, collections, @logger, @verbose)
71
+
@user_threads << thread
72
+
end
73
+
74
+
# possible future enhancement: use LISTEN/UNLISTEN/NOTIFY and wait_for_notify
75
+
sleep 5
76
+
end
77
+
end
78
+
79
+
def process_firehose_items
80
+
loop do
81
+
items = [Like, Repost, Pin, Quote]
82
+
.map { |type| type.in_queue(:firehose).order('time').limit(25) }
83
+
.flatten
84
+
.sort_by(&:time)
85
+
.first(25)
86
+
87
+
if items.length > 0
88
+
@downloader.process_items(items)
89
+
else
90
+
sleep 5
91
+
end
92
+
end
93
+
end
94
+
end
+11
-3
app/importers/base_importer.rb
+11
-3
app/importers/base_importer.rb
···
1
require 'didkit'
2
require 'minisky'
3
4
require_relative '../at_uri'
5
require_relative '../models/post'
6
require_relative '../models/user'
7
8
class BaseImporter
9
-
attr_accessor :item_queue, :report
10
11
def initialize(user)
12
@did = DID.new(user.did)
···
31
end
32
33
@time_limit = requested_time_limit || @import.last_completed
34
-
puts "Fetching until: #{@time_limit}" if @time_limit
35
36
import_items
37
38
@import.update!(last_completed: @import.started_from) unless requested_time_limit
39
-
@import.update!(cursor: nil, started_from: nil)
40
@report&.update(importers: { importer_name => { :finished => true }})
41
end
42
43
def import_items
44
raise NotImplementedError
45
end
46
end
···
1
require 'didkit'
2
require 'minisky'
3
+
require 'time'
4
5
require_relative '../at_uri'
6
+
require_relative '../errors'
7
require_relative '../models/post'
8
require_relative '../models/user'
9
10
class BaseImporter
11
+
attr_accessor :item_queue, :report, :logger
12
13
def initialize(user)
14
@did = DID.new(user.did)
···
33
end
34
35
@time_limit = requested_time_limit || @import.last_completed
36
+
@logger&.info "Fetching until: #{@time_limit}" if @time_limit
37
38
import_items
39
40
@import.update!(last_completed: @import.started_from) unless requested_time_limit
41
+
@import.update!(cursor: nil, started_from: nil, fetched_until: nil)
42
@report&.update(importers: { importer_name => { :finished => true }})
43
end
44
45
def import_items
46
raise NotImplementedError
47
+
end
48
+
49
+
def created_at(record)
50
+
Time.parse(record['createdAt'])
51
+
rescue StandardError
52
+
raise InvalidRecordError
53
end
54
end
+13
-10
app/importers/likes_importer.rb
+13
-10
app/importers/likes_importer.rb
···
1
-
require 'time'
2
require_relative 'base_importer'
3
4
class LikesImporter < BaseImporter
···
11
12
records = response['records']
13
cursor = response['cursor']
14
-
15
-
@imported_count += records.length
16
-
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
17
-
@report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
18
19
records.each do |record|
20
begin
21
-
like = @user.likes.import_from_record(record['uri'], record['value'])
22
23
if like && like.pending? && @item_queue
24
@item_queue.push(like)
25
@report&.update(queue: { length: @item_queue.length })
26
end
27
-
rescue StandardError => e
28
-
puts "Error in LikesImporter: #{record['uri']}: #{e}"
29
end
30
end
31
32
params[:cursor] = cursor
33
-
@import.update!(cursor: cursor)
34
35
break if !cursor
36
-
break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
37
end
38
end
39
end
···
1
require_relative 'base_importer'
2
3
class LikesImporter < BaseImporter
···
10
11
records = response['records']
12
cursor = response['cursor']
13
+
oldest_date = nil
14
15
records.each do |record|
16
begin
17
+
like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import)
18
+
19
+
record_date = like&.time || created_at(record['value'])
20
+
oldest_date = [oldest_date, record_date].compact.min
21
22
if like && like.pending? && @item_queue
23
@item_queue.push(like)
24
@report&.update(queue: { length: @item_queue.length })
25
end
26
+
rescue InvalidRecordError => e
27
+
@logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
28
end
29
end
30
31
+
@imported_count += records.length
32
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
33
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
34
+
35
params[:cursor] = cursor
36
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
37
38
break if !cursor
39
+
break if @time_limit && oldest_date && oldest_date < @time_limit
40
end
41
end
42
end
+14
-11
app/importers/posts_importer.rb
+14
-11
app/importers/posts_importer.rb
···
1
-
require 'time'
2
require_relative 'base_importer'
3
4
class PostsImporter < BaseImporter
···
11
12
records = response['records']
13
cursor = response['cursor']
14
-
15
-
@imported_count += records.length
16
-
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
17
-
@report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
18
19
records.each do |record|
20
begin
21
-
quote = @user.quotes.import_from_record(record['uri'], record['value'])
22
-
pin = @user.pins.import_from_record(record['uri'], record['value'])
23
24
if @item_queue
25
if quote && quote.pending?
···
32
33
@report&.update(queue: { length: @item_queue.length })
34
end
35
-
rescue StandardError => e
36
-
puts "Error in LikesImporter: #{record['uri']}: #{e}"
37
end
38
end
39
40
params[:cursor] = cursor
41
-
@import.update!(cursor: cursor)
42
43
break if !cursor
44
-
break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
45
end
46
end
47
end
···
1
require_relative 'base_importer'
2
3
class PostsImporter < BaseImporter
···
10
11
records = response['records']
12
cursor = response['cursor']
13
+
oldest_date = nil
14
15
records.each do |record|
16
begin
17
+
quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import)
18
+
pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import)
19
+
20
+
record_date = quote&.time || pin&.time || created_at(record['value'])
21
+
oldest_date = [oldest_date, record_date].compact.min
22
23
if @item_queue
24
if quote && quote.pending?
···
31
32
@report&.update(queue: { length: @item_queue.length })
33
end
34
+
rescue InvalidRecordError => e
35
+
@logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
36
end
37
end
38
39
+
@imported_count += records.length
40
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
41
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
42
+
43
params[:cursor] = cursor
44
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
45
46
break if !cursor
47
+
break if @time_limit && oldest_date && oldest_date < @time_limit
48
end
49
end
50
end
+13
-10
app/importers/reposts_importer.rb
+13
-10
app/importers/reposts_importer.rb
···
1
-
require 'time'
2
require_relative 'base_importer'
3
4
class RepostsImporter < BaseImporter
···
11
12
records = response['records']
13
cursor = response['cursor']
14
-
15
-
@imported_count += records.length
16
-
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
17
-
@report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty?
18
19
records.each do |record|
20
begin
21
-
repost = @user.reposts.import_from_record(record['uri'], record['value'])
22
23
if repost && repost.pending? && @item_queue
24
@item_queue.push(repost)
25
@report&.update(queue: { length: @item_queue.length })
26
end
27
-
rescue StandardError => e
28
-
puts "Error in RepostsImporter: #{record['uri']}: #{e}"
29
end
30
end
31
32
params[:cursor] = cursor
33
-
@import.update!(cursor: cursor)
34
35
break if !cursor
36
-
break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit }
37
end
38
end
39
end
···
1
require_relative 'base_importer'
2
3
class RepostsImporter < BaseImporter
···
10
11
records = response['records']
12
cursor = response['cursor']
13
+
oldest_date = nil
14
15
records.each do |record|
16
begin
17
+
repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import)
18
+
19
+
record_date = repost&.time || created_at(record['value'])
20
+
oldest_date = [oldest_date, record_date].compact.min
21
22
if repost && repost.pending? && @item_queue
23
@item_queue.push(repost)
24
@report&.update(queue: { length: @item_queue.length })
25
end
26
+
rescue InvalidRecordError => e
27
+
@logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
28
end
29
end
30
31
+
@imported_count += records.length
32
+
@report&.update(importers: { importer_name => { :imported_items => @imported_count }})
33
+
@report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date
34
+
35
params[:cursor] = cursor
36
+
@import.update!(cursor: cursor, fetched_until: oldest_date)
37
38
break if !cursor
39
+
break if @time_limit && oldest_date && oldest_date < @time_limit
40
end
41
end
42
end
+1
app/init.rb
+1
app/init.rb
+28
app/models/import.rb
+28
app/models/import.rb
···
7
8
validates_inclusion_of :collection, in: %w(likes reposts posts)
9
validates_uniqueness_of :collection, scope: :user_id
10
+
11
+
scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }
12
+
13
+
IMPORT_END = Time.at(0)
14
+
15
+
def imported_until
16
+
return nil if cursor.nil? && last_completed.nil?
17
+
18
+
groups = case collection
19
+
when 'likes'
20
+
[:likes]
21
+
when 'reposts'
22
+
[:reposts]
23
+
when 'posts'
24
+
[:pins, :quotes]
25
+
end
26
+
27
+
newest_queued_items = groups.map { |g| user.send(g).where(queue: :import).order(:time).last }
28
+
newest_queued = newest_queued_items.compact.sort_by(&:time).last
29
+
30
+
if newest_queued
31
+
newest_queued.time
32
+
elsif fetched_until
33
+
fetched_until
34
+
else
35
+
IMPORT_END
36
+
end
37
+
end
38
end
+13
app/models/import_job.rb
+13
app/models/import_job.rb
+12
-6
app/models/importable.rb
+12
-6
app/models/importable.rb
···
8
9
included do
10
scope :pending, -> { where(post: nil) }
11
12
def pending?
13
post_uri != nil
14
end
15
16
-
def import_item!
17
post_uri = AT_URI(self.post_uri)
18
return nil if !post_uri.is_post?
19
20
-
if post = Post.find_by_at_uri(post_uri)
21
-
self.post = post
22
-
self.post_uri = nil
23
-
end
24
-
25
self.save!
26
self
27
end
···
8
9
included do
10
scope :pending, -> { where(post: nil) }
11
+
scope :in_queue, ->(q) { where(queue: q) }
12
+
13
+
enum :queue, { firehose: 0, import: 1 }
14
+
15
+
validates_presence_of :post_uri, if: -> { post_id.nil? }
16
+
validate :check_queue
17
18
def pending?
19
post_uri != nil
20
end
21
22
+
def check_queue
23
+
errors.add(:queue, 'must be nil if already processed') if queue && post
24
+
end
25
+
26
+
def import_item!(args = {})
27
post_uri = AT_URI(self.post_uri)
28
return nil if !post_uri.is_post?
29
30
+
self.assign_attributes(args)
31
self.save!
32
self
33
end
-2
app/models/like.rb
-2
app/models/like.rb
-2
app/models/pin.rb
-2
app/models/pin.rb
+7
app/models/post.rb
+7
app/models/post.rb
···
10
validates_length_of :text, maximum: 1000
11
validates_length_of :data, maximum: 10000
12
13
+
validate :check_null_bytes
14
+
15
belongs_to :user
16
17
has_many :likes, dependent: :delete_all
···
28
29
def at_uri
30
"at://#{user.did}/app.bsky.feed.post/#{rkey}"
31
+
end
32
+
33
+
def check_null_bytes
34
+
# Postgres doesn't allow null bytes in strings
35
+
errors.add(:text, 'must not contain a null byte') if text.include?("\u0000")
36
end
37
end
-2
app/models/quote.rb
-2
app/models/quote.rb
-2
app/models/repost.rb
-2
app/models/repost.rb
+38
-28
app/models/user.rb
+38
-28
app/models/user.rb
···
1
require 'active_record'
2
3
require_relative 'import'
4
require_relative 'like'
5
require_relative 'quote'
6
require_relative 'pin'
7
require_relative 'post'
8
require_relative 'repost'
9
10
class User < ActiveRecord::Base
11
validates_presence_of :did
12
validates_length_of :did, maximum: 260
13
14
has_many :posts
15
has_many :imports, dependent: :delete_all
16
17
before_destroy :delete_posts_cascading
18
19
-
has_many :likes, foreign_key: 'actor_id', dependent: :delete_all do
20
-
def import_from_record(like_uri, record)
21
-
like = self.new_from_record(like_uri, record)
22
-
return nil if like.nil? || self.where(rkey: like.rkey).exists?
23
24
-
like.import_item!
25
-
end
26
end
27
28
-
has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all do
29
-
def import_from_record(repost_uri, record)
30
-
repost = self.new_from_record(repost_uri, record)
31
-
return nil if repost.nil? || self.where(rkey: repost.rkey).exists?
32
33
-
repost.import_item!
34
-
end
35
end
36
37
-
has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all do
38
-
def import_from_record(post_uri, record)
39
-
quote = self.new_from_record(post_uri, record)
40
-
return nil if quote.nil? || self.where(rkey: quote.rkey).exists?
41
42
-
quote.import_item!
43
-
end
44
end
45
46
-
has_many :pins, foreign_key: 'actor_id', dependent: :delete_all do
47
-
def import_from_record(post_uri, record)
48
-
pin = self.new_from_record(post_uri, record)
49
-
return nil if pin.nil? || self.where(rkey: pin.rkey).exists?
50
51
-
pin.import_item!
52
end
53
end
54
55
-
def self.active
56
-
self.joins(:imports).distinct
57
-
end
58
59
-
def all_pending_items
60
-
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
61
end
62
63
def delete_posts_cascading
···
69
Quote.where(post_id: posts_subquery).delete_all
70
71
Post.where(user: self).delete_all
72
end
73
end
···
1
require 'active_record'
2
3
require_relative 'import'
4
+
require_relative 'import_job'
5
require_relative 'like'
6
require_relative 'quote'
7
require_relative 'pin'
8
require_relative 'post'
9
require_relative 'repost'
10
+
require_relative 'user_importable'
11
12
class User < ActiveRecord::Base
13
validates_presence_of :did
14
validates_length_of :did, maximum: 260
15
+
validates_format_of :did, with: /\Adid:(plc:[0-9a-z]{24}|web:[0-9a-z\-]+(\.[0-9a-z\-]+)+)\Z/
16
17
has_many :posts
18
has_many :imports, dependent: :delete_all
19
+
has_one :import_job, dependent: :delete
20
21
before_destroy :delete_posts_cascading
22
23
+
has_many :likes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
24
+
has_many :reposts, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
25
+
has_many :quotes, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
26
+
has_many :pins, foreign_key: 'actor_id', dependent: :delete_all, extend: UserImportable
27
28
+
def self.active
29
+
self.joins(:imports).distinct
30
end
31
32
+
def self.with_unfinished_import
33
+
self.where(id: Import.unfinished.select('user_id').distinct)
34
+
.or(self.where(id: Like.in_queue(:import).select('actor_id').distinct))
35
+
.or(self.where(id: Repost.in_queue(:import).select('actor_id').distinct))
36
+
.or(self.where(id: Quote.in_queue(:import).select('actor_id').distinct))
37
+
.or(self.where(id: Pin.in_queue(:import).select('actor_id').distinct))
38
+
end
39
40
+
def active?
41
+
imports.exists?
42
end
43
44
+
def all_pending_items
45
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).pending.to_a }.reduce(&:+)
46
+
end
47
48
+
def all_items_in_queue(queue)
49
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).in_queue(queue).to_a }.reduce(&:+)
50
end
51
52
+
def imported_until
53
+
import_positions = self.imports.map(&:imported_until)
54
55
+
if import_positions.empty? || import_positions.any? { |x| x.nil? }
56
+
nil
57
+
else
58
+
import_positions.sort.last
59
end
60
end
61
62
+
def erase_imports!
63
+
[:likes, :reposts, :quotes, :pins].map { |x| self.send(x).delete_all }
64
65
+
self.import_job&.destroy
66
+
self.imports.delete_all
67
end
68
69
def delete_posts_cascading
···
75
Quote.where(post_id: posts_subquery).delete_all
76
77
Post.where(user: self).delete_all
78
+
end
79
+
80
+
def to_s
81
+
%(<User id: #{id}, did: "#{did}">)
82
end
83
end
+20
app/models/user_importable.rb
+20
app/models/user_importable.rb
···
···
1
+
require_relative '../errors'
2
+
3
+
module UserImportable
4
+
def import_from_record(item_uri, record, **args)
5
+
item = try_build_from_record(item_uri, record)
6
+
return nil if item.nil? || already_imported?(item)
7
+
8
+
item.import_item!(args)
9
+
end
10
+
11
+
def try_build_from_record(item_uri, record)
12
+
self.new_from_record(item_uri, record)
13
+
rescue StandardError
14
+
raise InvalidRecordError
15
+
end
16
+
17
+
def already_imported?(item)
18
+
self.where(rkey: item.rkey).exists?
19
+
end
20
+
end
+62
-35
app/post_downloader.rb
+62
-35
app/post_downloader.rb
···
6
require_relative 'models/user'
7
8
class PostDownloader
9
-
attr_accessor :report, :stop_when_empty
10
11
def initialize
12
-
@sky = Minisky.new(ENV['APPVIEW'] || 'public.api.bsky.app', nil)
13
14
@total_count = 0
15
@oldest_imported = Time.now
···
31
32
@report&.update(queue: { length: queue.length })
33
34
-
existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a
35
36
-
items.dup.each do |item|
37
-
if post = existing_posts.detect { |post| post.at_uri == item.post_uri }
38
-
update_item(item, post)
39
-
items.delete(item)
40
-
end
41
end
42
43
-
next if items.empty?
44
45
-
begin
46
-
response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq })
47
48
-
response['posts'].each do |data|
49
-
begin
50
-
item = items.detect { |x| x.post_uri == data['uri'] }
51
-
items.delete(item)
52
53
-
post = save_post(data['uri'], data['record'])
54
55
-
if post.valid?
56
-
update_item(item, post)
57
-
else
58
-
puts "Invalid post #{item.post_uri}: #{post.errors.full_messages.join("; ")}"
59
-
invalidate_item(item)
60
-
end
61
-
rescue StandardError => e
62
-
puts "Error in PostDownloader: #{item.post_uri}: #{e.class}: #{e}"
63
end
64
end
65
-
66
-
check_missing_items(items)
67
-
rescue StandardError => e
68
-
puts "Error in PostDownloader: #{e.class}: #{e}"
69
end
70
end
71
end
72
73
def save_post(post_uri, record)
74
did, _, rkey = AT_URI(post_uri)
75
76
text = record.delete('text')
77
created = record.delete('createdAt')
78
79
-
author = User.find_or_create_by!(did: did)
80
81
-
Post.create(
82
user: author,
83
rkey: rkey,
84
time: Time.parse(created),
85
text: text,
86
data: JSON.generate(record)
87
)
88
end
89
90
def update_item(item, post)
91
-
item.update!(post: post, post_uri: nil)
92
93
@total_count += 1
94
@oldest_imported = [@oldest_imported, item.time].min
···
130
case status
131
when :active
132
# account is active but wasn't returned in getProfiles, probably was suspended on the AppView
133
-
puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
134
item.destroy
135
when nil
136
# account was deleted, so all posts were deleted too
137
-
puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
138
item.destroy
139
else
140
# account is inactive/suspended, but could come back, so leave it for now
141
-
puts "#{item.post_uri}: account #{did} is inactive: #{status}"
142
end
143
rescue StandardError => e
144
hostname = did_obj.document.pds_host rescue "???"
145
-
puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
146
147
# delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
148
item.destroy if hostname == 'bsky.social'
149
end
150
end
151
end
152
end
···
6
require_relative 'models/user'
7
8
class PostDownloader
9
+
attr_accessor :report, :logger, :stop_when_empty
10
11
def initialize
12
+
@sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
13
14
@total_count = 0
15
@oldest_imported = Time.now
···
31
32
@report&.update(queue: { length: queue.length })
33
34
+
process_items(items)
35
+
end
36
+
end
37
+
38
+
def process_items(items)
39
+
existing_posts = Post.where(rkey: items.map { |x| AT_URI(x.post_uri).rkey }).to_a
40
41
+
items.dup.each do |item|
42
+
if post = existing_posts.detect { |post| post.at_uri == item.post_uri }
43
+
update_item(item, post)
44
+
items.delete(item)
45
end
46
+
end
47
48
+
return if items.empty?
49
50
+
begin
51
+
response = @sky.get_request('app.bsky.feed.getPosts', { uris: items.map(&:post_uri).uniq })
52
53
+
response['posts'].each do |data|
54
+
current_items = items.select { |x| x.post_uri == data['uri'] }
55
+
items -= current_items
56
57
+
begin
58
+
post = save_post(data['uri'], data['record'])
59
60
+
if post.valid?
61
+
current_items.each { |i| update_item(i, post) }
62
+
else
63
+
@logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
64
+
current_items.each { |i| invalidate_item(i) }
65
end
66
+
rescue InvalidRecordError => e
67
+
@logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
68
+
current_items.each { |i| i.update!(queue: nil) }
69
end
70
end
71
+
72
+
check_missing_items(items)
73
+
rescue StandardError => e
74
+
@logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
75
end
76
end
77
78
def save_post(post_uri, record)
79
did, _, rkey = AT_URI(post_uri)
80
81
+
begin
82
+
author = User.find_or_create_by!(did: did)
83
+
rescue ActiveRecord::RecordInvalid => e
84
+
raise InvalidRecordError
85
+
end
86
+
87
+
if post = Post.find_by(user: author, rkey: rkey)
88
+
return post
89
+
else
90
+
post = build_post(author, rkey, record)
91
+
post.save
92
+
post
93
+
end
94
+
end
95
+
96
+
def build_post(author, rkey, record)
97
text = record.delete('text')
98
created = record.delete('createdAt')
99
100
+
record.delete('$type')
101
102
+
Post.new(
103
user: author,
104
rkey: rkey,
105
time: Time.parse(created),
106
text: text,
107
data: JSON.generate(record)
108
)
109
+
rescue StandardError
110
+
raise InvalidRecordError
111
end
112
113
def update_item(item, post)
114
+
item.update!(post: post, post_uri: nil, queue: nil)
115
116
@total_count += 1
117
@oldest_imported = [@oldest_imported, item.time].min
···
153
case status
154
when :active
155
# account is active but wasn't returned in getProfiles, probably was suspended on the AppView
156
+
# puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
157
item.destroy
158
when nil
159
# account was deleted, so all posts were deleted too
160
+
# puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
161
item.destroy
162
else
163
# account is inactive/suspended, but could come back, so leave it for now
164
+
# puts "#{item.post_uri}: account #{did} is inactive: #{status}"
165
end
166
rescue StandardError => e
167
hostname = did_obj.document.pds_host rescue "???"
168
+
@logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
169
170
# delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
171
item.destroy if hostname == 'bsky.social'
172
end
173
+
end
174
+
175
+
if !item.destroyed?
176
+
item.update!(queue: nil)
177
end
178
end
179
end
+17
app/reports/basic_report.rb
+17
app/reports/basic_report.rb
···
···
1
+
class BasicReport
2
+
def initialize(logger)
3
+
@logger = logger
4
+
end
5
+
6
+
def update(data)
7
+
data.each do |k, v|
8
+
if k == :downloader
9
+
@logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
10
+
elsif k == :queue
11
+
next
12
+
else
13
+
@logger.info({ k => v}.inspect)
14
+
end
15
+
end
16
+
end
17
+
end
+43
app/reports/console_report.rb
+43
app/reports/console_report.rb
···
···
1
+
class ConsoleReport
2
+
def initialize
3
+
@data = {}
4
+
@start = Time.now
5
+
@mutex = Mutex.new
6
+
end
7
+
8
+
def update(data)
9
+
@mutex.synchronize { deep_merge(@data, data) }
10
+
render
11
+
end
12
+
13
+
def deep_merge(target, updates)
14
+
updates.each do |k, v|
15
+
if v.is_a?(Hash)
16
+
target[k] ||= {}
17
+
deep_merge(target[k], v)
18
+
else
19
+
target[k] = v
20
+
end
21
+
end
22
+
end
23
+
24
+
def render
25
+
print " " * 80 + "\r"
26
+
puts "Elapsed time: #{(Time.now - @start).to_i} s"
27
+
28
+
importers = @data[:importers] || {}
29
+
30
+
importers.each do |name, data|
31
+
print " " * 80 + "\r"
32
+
puts "#{name}: imported items = #{data[:imported_items] || 0} (until: #{data[:oldest_date]}) #{" (DONE)" if data[:finished]}"
33
+
end
34
+
35
+
print " " * 80 + "\r"
36
+
puts "Downloader: imported posts = #{@data.dig(:downloader, :downloaded_posts) || 0} (until: #{@data.dig(:downloader, :oldest_date)})"
37
+
38
+
print " " * 80 + "\r"
39
+
puts "Queue size: #{@data.dig(:queue, :length) || 0}"
40
+
41
+
print "\e[#{3 + importers.length}A"
42
+
end
43
+
end
+9
app/reports/simple_logger.rb
+9
app/reports/simple_logger.rb
+105
-23
app/server.rb
+105
-23
app/server.rb
···
8
9
class Server < Sinatra::Application
10
register Sinatra::ActiveRecordExtension
11
-
set :port, 3000
12
13
PAGE_LIMIT = 25
14
-
HOSTNAME = 'lycan.feeds.blue'
15
16
helpers do
17
def json_response(data)
···
23
content_type :json
24
[status, JSON.generate({ error: name, message: message })]
25
end
26
end
27
28
before do
29
@authenticator = Authenticator.new(hostname: HOSTNAME)
30
-
end
31
-
32
-
get '/xrpc/blue.feeds.lycan.searchPosts' do
33
-
headers['access-control-allow-origin'] = '*'
34
35
if settings.development?
36
-
user = User.find_by(did: params[:user])
37
-
return json_error('UserNotFound', 'Missing "user" parameter') if user.nil?
38
-
else
39
-
begin
40
-
auth_header = env['HTTP_AUTHORIZATION']
41
-
did = @authenticator.decode_user_from_jwt(auth_header, 'blue.feeds.lycan.searchPosts')
42
-
rescue StandardError => e
43
-
p e
44
-
end
45
-
46
-
user = did && User.find_by(did: did)
47
-
return json_error('UserNotFound', 'Missing authentication header') if user.nil?
48
end
49
50
if params[:query].to_s.strip.empty?
51
return json_error('MissingParameter', 'Missing "query" parameter')
···
58
query = QueryParser.new(params[:query])
59
60
collection = case params[:collection]
61
-
when 'likes' then user.likes
62
-
when 'pins' then user.pins
63
-
when 'quotes' then user.quotes
64
-
when 'reposts' then user.reposts
65
else return json_error('InvalidParameter', 'Invalid search collection')
66
end
67
···
80
81
post_uris = case params[:collection]
82
when 'quotes'
83
-
items.map { |x| "at://#{user.did}/app.bsky.feed.post/#{x.rkey}" }
84
else
85
items.map(&:post).map(&:at_uri)
86
end
87
88
json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor)
89
end
90
91
get '/.well-known/did.json' do
···
8
9
class Server < Sinatra::Application
10
register Sinatra::ActiveRecordExtension
11
+
set :port, ENV['PORT'] || 3000
12
13
PAGE_LIMIT = 25
14
+
HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue'
15
16
helpers do
17
def json_response(data)
···
23
content_type :json
24
[status, JSON.generate({ error: name, message: message })]
25
end
26
+
27
+
def get_user_did
28
+
if settings.development?
29
+
did = if request.post?
30
+
json = JSON.parse(request.body.read)
31
+
request.body.rewind
32
+
json['user']
33
+
else
34
+
params[:user]
35
+
end
36
+
37
+
if did
38
+
return did
39
+
else
40
+
halt json_error('AuthMissing', 'Missing "user" parameter', status: 401)
41
+
end
42
+
else
43
+
auth_header = env['HTTP_AUTHORIZATION']
44
+
if auth_header.to_s.strip.empty?
45
+
halt json_error('AuthMissing', 'Missing authentication header', status: 401)
46
+
end
47
+
48
+
begin
49
+
method = request.path.split('/')[2]
50
+
did = @authenticator.decode_user_from_jwt(auth_header, method)
51
+
rescue StandardError => e
52
+
halt json_error('InvalidToken', e.message, status: 401)
53
+
end
54
+
55
+
if did
56
+
return did
57
+
else
58
+
halt json_error('InvalidToken', "Invalid JWT token", status: 401)
59
+
end
60
+
end
61
+
end
62
+
63
+
def load_user
64
+
did = get_user_did
65
+
66
+
if user = User.find_by(did: did)
67
+
return user
68
+
else
69
+
halt json_error('AccountNotFound', 'Account not found', status: 401)
70
+
end
71
+
end
72
end
73
74
before do
75
@authenticator = Authenticator.new(hostname: HOSTNAME)
76
77
if settings.development?
78
+
headers['Access-Control-Allow-Origin'] = '*'
79
end
80
+
end
81
+
82
+
get '/' do
83
+
"Awoo 🐺"
84
+
end
85
+
86
+
get '/xrpc/blue.feeds.lycan.searchPosts' do
87
+
@user = load_user
88
89
if params[:query].to_s.strip.empty?
90
return json_error('MissingParameter', 'Missing "query" parameter')
···
97
query = QueryParser.new(params[:query])
98
99
collection = case params[:collection]
100
+
when 'likes' then @user.likes
101
+
when 'pins' then @user.pins
102
+
when 'quotes' then @user.quotes
103
+
when 'reposts' then @user.reposts
104
else return json_error('InvalidParameter', 'Invalid search collection')
105
end
106
···
119
120
post_uris = case params[:collection]
121
when 'quotes'
122
+
items.map { |x| "at://#{@user.did}/app.bsky.feed.post/#{x.rkey}" }
123
else
124
items.map(&:post).map(&:at_uri)
125
end
126
127
json_response(terms: query.terms, posts: post_uris, cursor: items.last&.cursor)
128
+
end
129
+
130
+
if settings.development?
131
+
options '/xrpc/blue.feeds.lycan.startImport' do
132
+
headers['Access-Control-Allow-Origin'] = '*'
133
+
headers['Access-Control-Allow-Headers'] = 'content-type'
134
+
end
135
+
end
136
+
137
+
post '/xrpc/blue.feeds.lycan.startImport' do
138
+
did = get_user_did
139
+
user = User.find_or_create_by(did: did)
140
+
141
+
if !user.valid?
142
+
json_error('InvalidRequest', 'Invalid DID')
143
+
elsif user.import_job || user.active?
144
+
json_response(message: "Import has already started")
145
+
else
146
+
user.create_import_job!
147
+
148
+
status 202
149
+
json_response(message: "Import has been scheduled")
150
+
end
151
+
end
152
+
153
+
get '/xrpc/blue.feeds.lycan.getImportStatus' do
154
+
did = get_user_did
155
+
user = User.find_by(did: did)
156
+
until_date = user&.imported_until
157
+
158
+
case until_date
159
+
when nil
160
+
if user.import_job || user.imports.exists?
161
+
json_response(status: 'scheduled')
162
+
else
163
+
json_response(status: 'not_started')
164
+
end
165
+
when Import::IMPORT_END
166
+
json_response(status: 'finished')
167
+
else
168
+
progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at)
169
+
json_response(status: 'in_progress', position: until_date.iso8601, progress: progress)
170
+
end
171
end
172
173
get '/.well-known/did.json' do
+1
bin/server
+1
bin/server
+53
bin/worker
+53
bin/worker
···
···
1
+
#!/usr/bin/env ruby
2
+
3
+
require 'bundler/setup'
4
+
require_relative '../app/import_worker'
5
+
require_relative '../app/reports/simple_logger'
6
+
7
+
$stdout.sync = true
8
+
9
+
if ENV['ARLOG'] == '1'
10
+
ActiveRecord::Base.logger = Logger.new(STDOUT)
11
+
else
12
+
ActiveRecord::Base.logger = nil
13
+
end
14
+
15
+
def print_help
16
+
puts "Usage: #{$0} [options...]"
17
+
puts "Options:"
18
+
puts " -v = verbose"
19
+
end
20
+
21
+
logger = SimpleLogger.new
22
+
worker = ImportWorker.new
23
+
worker.logger = logger
24
+
25
+
args = ARGV.dup
26
+
27
+
while arg = args.shift
28
+
case arg
29
+
when '-v', '--verbose'
30
+
worker.verbose = true
31
+
when '-h', '--help'
32
+
print_help
33
+
exit 0
34
+
else
35
+
puts "Unrecognized option: #{arg}"
36
+
print_help
37
+
exit 1
38
+
end
39
+
end
40
+
41
+
trap("SIGINT") {
42
+
puts
43
+
puts "[#{Time.now}] Stopping..."
44
+
exit
45
+
}
46
+
47
+
trap("SIGTERM") {
48
+
puts "[#{Time.now}] Shutting down the service..."
49
+
exit
50
+
}
51
+
52
+
puts "[#{Time.now}] Starting background worker..."
53
+
worker.run
+5
config/deploy.rb
+5
config/deploy.rb
···
22
23
after 'deploy', 'deploy:cleanup'
24
after 'deploy:migrations', 'deploy:cleanup'
25
26
namespace :deploy do
27
task :restart, :roles => :web do
···
32
run "cd #{release_path} && bundle config set --local deployment 'true'"
33
run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
34
run "cd #{release_path} && bundle config set --local without 'development test'"
35
end
36
end
···
22
23
after 'deploy', 'deploy:cleanup'
24
after 'deploy:migrations', 'deploy:cleanup'
25
+
after 'deploy:update_code', 'deploy:link_shared'
26
27
namespace :deploy do
28
task :restart, :roles => :web do
···
33
run "cd #{release_path} && bundle config set --local deployment 'true'"
34
run "cd #{release_path} && bundle config set --local path '#{shared_path}/bundle'"
35
run "cd #{release_path} && bundle config set --local without 'development test'"
36
+
end
37
+
38
+
task :link_shared do
39
+
run "ln -s #{shared_path}/env #{release_path}/.env"
40
end
41
end
+9
db/migrate/20250920182018_add_import_jobs.rb
+9
db/migrate/20250920182018_add_import_jobs.rb
+7
db/migrate/20250923014702_add_queued_field.rb
+7
db/migrate/20250923014702_add_queued_field.rb
+5
db/migrate/20250923180153_add_user_created_at.rb
+5
db/migrate/20250923180153_add_user_created_at.rb
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+5
db/migrate/20251027134657_add_fetched_until_to_imports.rb
+12
-1
db/schema.rb
+12
-1
db/schema.rb
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
-
ActiveRecord::Schema[7.2].define(version: 2025_09_18_024627) do
14
# These are extensions that must be enabled in order to support this database
15
enable_extension "plpgsql"
16
17
create_table "imports", force: :cascade do |t|
18
t.integer "user_id", null: false
···
20
t.datetime "started_from"
21
t.datetime "last_completed"
22
t.string "collection", limit: 20, null: false
23
t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
24
end
25
···
29
t.datetime "time", null: false
30
t.bigint "post_id"
31
t.string "post_uri"
32
t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
33
t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
34
end
···
40
t.text "pin_text", null: false
41
t.bigint "post_id"
42
t.string "post_uri"
43
t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
44
t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
45
end
···
61
t.text "quote_text", null: false
62
t.bigint "post_id"
63
t.string "post_uri"
64
t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
65
t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
66
end
···
71
t.datetime "time", null: false
72
t.bigint "post_id"
73
t.string "post_uri"
74
t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
75
t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
76
end
···
82
83
create_table "users", id: :serial, force: :cascade do |t|
84
t.string "did", limit: 260, null: false
85
t.index ["did"], name: "index_users_on_did", unique: true
86
end
87
end
···
10
#
11
# It's strongly recommended that you check this file into your version control system.
12
13
+
ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do
14
# These are extensions that must be enabled in order to support this database
15
enable_extension "plpgsql"
16
+
17
+
create_table "import_jobs", force: :cascade do |t|
18
+
t.integer "user_id", null: false
19
+
t.index ["user_id"], name: "index_import_jobs_on_user_id", unique: true
20
+
end
21
22
create_table "imports", force: :cascade do |t|
23
t.integer "user_id", null: false
···
25
t.datetime "started_from"
26
t.datetime "last_completed"
27
t.string "collection", limit: 20, null: false
28
+
t.datetime "fetched_until"
29
t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
30
end
31
···
35
t.datetime "time", null: false
36
t.bigint "post_id"
37
t.string "post_uri"
38
+
t.integer "queue", limit: 2
39
t.index ["actor_id", "rkey"], name: "index_likes_on_actor_id_and_rkey", unique: true
40
t.index ["actor_id", "time", "id"], name: "index_likes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
41
end
···
47
t.text "pin_text", null: false
48
t.bigint "post_id"
49
t.string "post_uri"
50
+
t.integer "queue", limit: 2
51
t.index ["actor_id", "rkey"], name: "index_pins_on_actor_id_and_rkey", unique: true
52
t.index ["actor_id", "time", "id"], name: "index_pins_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
53
end
···
69
t.text "quote_text", null: false
70
t.bigint "post_id"
71
t.string "post_uri"
72
+
t.integer "queue", limit: 2
73
t.index ["actor_id", "rkey"], name: "index_quotes_on_actor_id_and_rkey", unique: true
74
t.index ["actor_id", "time", "id"], name: "index_quotes_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
75
end
···
80
t.datetime "time", null: false
81
t.bigint "post_id"
82
t.string "post_uri"
83
+
t.integer "queue", limit: 2
84
t.index ["actor_id", "rkey"], name: "index_reposts_on_actor_id_and_rkey", unique: true
85
t.index ["actor_id", "time", "id"], name: "index_reposts_on_actor_id_and_time_and_id", order: { time: :desc, id: :desc }
86
end
···
92
93
create_table "users", id: :serial, force: :cascade do |t|
94
t.string "did", limit: 260, null: false
95
+
t.datetime "registered_at"
96
t.index ["did"], name: "index_users_on_did", unique: true
97
end
98
end
+24
-6
lib/tasks/import.rake
+24
-6
lib/tasks/import.rake
···
1
-
require_relative '../../app/console_report'
2
require_relative '../../app/import_manager'
3
require_relative '../../app/item_queue'
4
require_relative '../../app/models/user'
5
require_relative '../../app/post_downloader'
6
7
task :import_user do
8
-
unless ENV['USER']
9
-
raise "Required USER parameter missing"
10
end
11
12
-
user = User.find_or_create_by!(did: ENV['USER'])
13
-
pending = !ENV['SKIP_PENDING']
14
15
unless ENV['COLLECTION']
16
raise "Required COLLECTION parameter missing"
···
18
19
import = ImportManager.new(user)
20
import.report = ConsoleReport.new
21
import.time_limit = ENV['UNTIL']
22
23
trap("SIGINT") {
···
25
exit
26
}
27
28
-
import.start(ENV['COLLECTION'], pending)
29
30
puts "\n\n\n\n\n"
31
end
···
1
require_relative '../../app/import_manager'
2
require_relative '../../app/item_queue'
3
require_relative '../../app/models/user'
4
require_relative '../../app/post_downloader'
5
+
require_relative '../../app/reports/console_report'
6
+
require_relative '../../app/reports/simple_logger'
7
+
8
+
task :enqueue_user do
9
+
unless ENV['DID']
10
+
raise "Required DID parameter missing"
11
+
end
12
+
13
+
user = User.find_or_create_by!(did: ENV['DID'])
14
+
15
+
if user.import_job
16
+
puts "Import for #{user.did} is already scheduled."
17
+
elsif user.active?
18
+
puts "Import for #{user.did} has already started."
19
+
else
20
+
user.create_import_job!
21
+
puts "Import for #{user.did} scheduled ✓"
22
+
end
23
+
end
24
25
task :import_user do
26
+
unless ENV['DID']
27
+
raise "Required DID parameter missing"
28
end
29
30
+
user = User.find_or_create_by!(did: ENV['DID'])
31
32
unless ENV['COLLECTION']
33
raise "Required COLLECTION parameter missing"
···
35
36
import = ImportManager.new(user)
37
import.report = ConsoleReport.new
38
+
import.logger = SimpleLogger.new
39
import.time_limit = ENV['UNTIL']
40
41
trap("SIGINT") {
···
43
exit
44
}
45
46
+
import.start(ENV['COLLECTION'])
47
48
puts "\n\n\n\n\n"
49
end