Gemfile.lock (+16 -16)

```diff
 GEM
   remote: https://rubygems.org/
   specs:
-    activemodel (7.2.2.2)
-      activesupport (= 7.2.2.2)
-    activerecord (7.2.2.2)
-      activemodel (= 7.2.2.2)
-      activesupport (= 7.2.2.2)
+    activemodel (7.2.3)
+      activesupport (= 7.2.3)
+    activerecord (7.2.3)
+      activemodel (= 7.2.3)
+      activesupport (= 7.2.3)
       timeout (>= 0.4.0)
-    activesupport (7.2.2.2)
+    activesupport (7.2.3)
       base64
       benchmark (>= 0.3)
       bigdecimal
···
     concurrent-ruby (1.3.5)
     connection_pool (2.5.4)
     daemons (1.4.1)
-    date (3.4.1)
+    date (3.5.0)
     dotenv (3.1.8)
     drb (2.2.3)
     ed25519 (1.4.0)
-    erb (5.1.1)
+    erb (6.0.0)
     eventmachine (1.2.7)
     faye-websocket (0.12.0)
       eventmachine (>= 0.12.0)
···
     i18n (1.14.7)
       concurrent-ruby (~> 1.0)
     io-console (0.8.1)
-    irb (1.15.2)
+    irb (1.15.3)
       pp (>= 0.6.0)
       rdoc (>= 4.0.0)
       reline (>= 0.4.2)
···
     logger (1.7.0)
     minisky (0.5.0)
       base64 (~> 0.1)
-    minitest (5.26.0)
+    minitest (5.26.1)
     mustermann (3.0.4)
       ruby2_keywords (~> 0.0.1)
     net-scp (4.1.0)
···
     psych (5.2.6)
       date
       stringio
-    rack (3.2.3)
+    rack (3.2.4)
     rack-protection (4.2.1)
       base64 (>= 0.1.0)
       logger (>= 1.6.0)
···
     rackup (2.2.1)
       rack (>= 3)
     rainbow (3.1.1)
-    rake (13.3.0)
-    rdoc (6.15.0)
+    rake (13.3.1)
+    rdoc (6.15.1)
       erb
       psych (>= 4.0.0)
       tsort
-    reline (0.6.2)
+    reline (0.6.3)
       io-console (~> 0.5)
     ruby2_keywords (0.0.5)
     securerandom (0.4.1)
···
       cbor (~> 0.5, >= 0.5.9.6)
       eventmachine (~> 1.2, >= 1.2.7)
       faye-websocket (~> 0.12)
-    stringio (3.1.7)
+    stringio (3.1.8)
     thin (2.0.1)
       daemons (~> 1.0, >= 1.0.9)
       eventmachine (~> 1.0, >= 1.0.4)
       logger
       rack (>= 1, < 4)
     tilt (2.6.1)
-    timeout (0.4.3)
+    timeout (0.4.4)
     tsort (0.2.0)
     tzinfo (2.0.6)
       concurrent-ruby (~> 1.0)
```
README.md (+110, new file)

# Lycan 🐺

A service which downloads and indexes the Bluesky posts you've liked, reposted, quoted or bookmarked, and lets you search that archive.

## How it works

Lycan is kind of like a tiny specialized AppView, which only indexes some specific things from some specific people. To avoid having to keep a full-network AppView, it only indexes posts and likes on demand from people who request to use it. So the first time you want to use it, you need to ask it to run an import process, which can take anywhere from a few minutes to an hour, depending on how much data there is to download. After that, new likes are indexed live from the firehose.

At the moment, Lycan indexes four types of content:

- posts you've liked
- posts you've reposted
- posts you've quoted
- your old-style bookmarks (using the 📌 emoji method)

New bookmarks are private data, so at the moment they can't be imported until support for OAuth is added.

Lycan is written in Ruby, using Sinatra and ActiveRecord, with Postgres as the database. The official instance runs at [lycan.feeds.blue](https://lycan.feeds.blue) (this service only implements an XRPC API – the UI is implemented as part of [Skythread](https://skythread.mackuba.eu)).

The service consists of three separate components:

- a **firehose client**, which streams events from a relay/Jetstream and saves new data for the users whose data is/has been imported
- a **background worker**, which runs the import process
- an **HTTP server**, which serves the XRPC endpoints (currently there are three: `startImport`, `getImportStatus` and `searchPosts`, plus a `did.json`); all the endpoints require service authentication through PDS proxying

## Setting up on localhost

This app should run on any reasonably recent version of Ruby, though of course it's recommended to run one that's still getting maintenance updates, ideally the latest one. It's also recommended to install Ruby with [YJIT support](https://shopify.engineering/ruby-yjit-is-production-ready), and on Linux also with [jemalloc](https://scalingo.com/blog/improve-ruby-application-memory-jemalloc). You will probably need some familiarity with the Ruby ecosystem in order to set it up and run it.

A Postgres database is also required (again, any non-ancient version should work).

Download or clone the repository, then install the dependencies:

```
bundle install
```

Next, create the database – the configuration is defined in [`config/database.yml`](config/database.yml); for development it's `lycan_development`. Create it either manually, or with a rake task:

```
bundle exec rake db:create
```
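
For reference, the development section of `config/database.yml` might look roughly like this (a hypothetical sketch assuming standard Rails-style Postgres settings – check the actual file in the repo for the real configuration):

```
development:
  adapter: postgresql
  database: lycan_development
  host: localhost
```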
Then, run the migrations:

```
bundle exec rake db:migrate
```

To run an import, you will need to run three separate processes, probably in separate terminal tabs:

1) the firehose client, [`bin/firehose`](bin/firehose)
2) the background worker, [`bin/worker`](bin/worker)
3) the Sinatra HTTP server, [`bin/server`](bin/server)

The UI can be accessed through Skythread, either on the official site at [skythread.mackuba.eu](https://skythread.mackuba.eu), or via a copy you can download [from the repo](https://tangled.org/mackuba.eu/skythread). Log in and open "[Archive search](https://skythread.mackuba.eu/?page=search&mode=likes)" from the account menu – but importantly, to use the `localhost` Lycan instance, add `&lycan=local` to the URL.

You should then be able to start an import from there, and see the worker process printing some logs as it starts to download the data. (The firehose process needs to be running too, because the import job needs to pass through it first.)

## Configuration

There are a few things you can configure through ENV variables:

- `RELAY_HOST` – hostname of the relay to use for the firehose (default: `bsky.network`)
- `JETSTREAM_HOST` – alternatively, instead of `RELAY_HOST`, set this to the hostname of a [Jetstream](https://github.com/bluesky-social/jetstream) instance
- `FIREHOSE_USER_AGENT` – when running in production, it's recommended that you set this to a name that identifies who is running the service
- `APPVIEW_HOST` – hostname of the AppView used to download posts (default: `public.api.bsky.app`)
- `SERVER_HOSTNAME` – hostname of the server on which you're running the service in production
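
For example, a setup using Jetstream instead of a full relay might be configured like this (the hostname below is one of the public Jetstream instances run by Bluesky; the user agent value is purely illustrative):

```shell
# illustrative values – adjust to your own setup
export JETSTREAM_HOST=jetstream2.us-east.bsky.network
export FIREHOSE_USER_AGENT="lycan-dev (you@example.com)"
# then start the client with: bundle exec bin/firehose
```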
## Rake tasks

Some Rake tasks that might be useful:

```
bundle exec rake enqueue_user DID=did:plc:qweqwe
```

- request an import of the given account (to be handled by firehose + worker)

```
bundle exec rake import_user DID=did:plc:qweqwe COLLECTION=likes/reposts/posts/all
```

- run a complete import synchronously

```
bundle exec rake process_posts
```

- process all previously queued and unfinished or failed items

## Running in production

This will heavily depend on where and how you prefer to run it. I'm using a Capistrano deploy config in [`config/deploy.rb`](config/deploy.rb) to deploy to a VPS at [lycan.feeds.blue](https://lycan.feeds.blue). To use something like Docker or a service like Fly or Railway, you'll need to adapt the config for your specific setup.

On the server, you need to make sure that the firehose & worker processes are always running and are restarted if necessary. One option (which I'm using) is to write a `systemd` service config file and add it to `/etc/systemd/system`. To run the HTTP server, you need Nginx/Caddy/Apache and a Ruby app server – my recommendation is Nginx with either Passenger (which runs your app automatically from Nginx) or something like Puma (which needs to be started by e.g. systemd, like the firehose).
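
For illustration, a minimal `systemd` unit for the firehose process could look something like this (the unit name, user and paths are assumptions – adapt them to your own server layout):

```
# /etc/systemd/system/lycan-firehose.service (hypothetical example)
[Unit]
Description=Lycan firehose client
After=network.target postgresql.service

[Service]
User=lycan
WorkingDirectory=/var/www/lycan/current
ExecStart=/usr/bin/env bundle exec bin/firehose
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target
```

You would then enable it with `systemctl enable --now lycan-firehose`, and give the worker process an analogous unit.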
## Credits

Copyright © 2025 Kuba Suder ([@mackuba.eu](https://bsky.app/profile/did:plc:oio4hkxaop4ao4wz2pp3f4cr)).

The code is available under the terms of the [zlib license](https://choosealicense.com/licenses/zlib/) (permissive, similar to MIT).

Bug reports and pull requests are welcome!
app/import_manager.rb (+19 -3)

```diff
 require_relative 'post_downloader'

 class ImportManager
-  attr_accessor :report, :time_limit
+  attr_accessor :report, :time_limit, :logger, :log_status_updates

   def initialize(user)
     @user = user
···
     downloader = PostDownloader.new
     downloader.report = @report
+    downloader.logger = @logger

-    download_thread = Thread.new { downloader.import_from_queue(queue) }
+    download_thread = Thread.new do
+      @logger&.info "Starting downloader thread for #{@user}" if @log_status_updates
+
+      downloader.import_from_queue(queue)
+
+      @logger&.info "Ended downloader thread for #{@user}" if @log_status_updates
+    end

     import_threads = importers.map do |import|
       import.item_queue = queue
       import.report = @report
+      import.logger = @logger

-      Thread.new { import.run_import(@time_limit) }
+      Thread.new do
+        @logger&.info "Starting #{import.class} thread for #{@user}" if @log_status_updates
+
+        import.run_import(@time_limit)
+
+        @logger&.info "Ended #{import.class} thread for #{@user}" if @log_status_updates
+      end
     end

     import_threads.each { |i| i.join }
+
+    @logger&.info "Finished all importer threads for #{@user}, waiting for downloader" if @log_status_updates

     downloader.stop_when_empty = true
     download_thread.join
```
app/import_worker.rb (+16 -4)

```diff
 require_relative 'reports/basic_report'

 class ImportWorker
-  attr_accessor :verbose
+  attr_accessor :verbose, :logger

   class UserThread < Thread
-    def initialize(user, collections, verbose = false)
+    def initialize(user, collections, logger, verbose = false)
       @user = user
       @verbose = verbose
+      @logger = logger

       super { run(collections) }
     end
···
     end

     def run(collections)
+      @logger&.info "Starting import thread for #{@user}"
+
       if @user.registered_at.nil?
         registration_time = get_registration_time(@user)
         @user.update!(registered_at: registration_time)
       end

       import = ImportManager.new(@user)
-      import.report = BasicReport.new if @verbose
+
+      if @logger
+        import.report = BasicReport.new(@logger) if @verbose
+        import.logger = @logger
+        import.log_status_updates = true
+      end
+
       import.start(collections)
+
+      @logger&.info "Ended import thread for #{@user}"
     end

     def get_registration_time(user)
···
     @firehose_thread = Thread.new { process_firehose_items }
     @downloader = PostDownloader.new
+    @downloader.logger = @logger

     loop do
       @user_threads.delete_if { |t| !t.alive? }
···
     users.each do |user|
       collections = user.imports.unfinished.map(&:collection)
-      thread = UserThread.new(user, collections, @verbose)
+      thread = UserThread.new(user, collections, @logger, @verbose)
       @user_threads << thread
     end
```
app/importers/base_importer.rb (+3 -3)

```diff
 require_relative '../models/user'

 class BaseImporter
-  attr_accessor :item_queue, :report
+  attr_accessor :item_queue, :report, :logger

   def initialize(user)
     @did = DID.new(user.did)
···
     end

     @time_limit = requested_time_limit || @import.last_completed
-    puts "Fetching until: #{@time_limit}" if @time_limit
+    @logger&.info "Fetching until: #{@time_limit}" if @time_limit

     import_items

     @import.update!(last_completed: @import.started_from) unless requested_time_limit
-    @import.update!(cursor: nil, started_from: nil)
+    @import.update!(cursor: nil, started_from: nil, fetched_until: nil)
     @report&.update(importers: { importer_name => { :finished => true }})
   end
```
app/importers/likes_importer.rb (+2 -2)

```diff
         @report&.update(queue: { length: @item_queue.length })
       end
     rescue InvalidRecordError => e
-      puts "Error in LikesImporter: #{record['uri']}: #{e}"
+      @logger&.warn "Error in LikesImporter: #{record['uri']}: #{e}"
     end
   end
···
     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date

     params[:cursor] = cursor
-    @import.update!(cursor: cursor)
+    @import.update!(cursor: cursor, fetched_until: oldest_date)

     break if !cursor
     break if @time_limit && oldest_date && oldest_date < @time_limit
```
app/importers/posts_importer.rb (+2 -2)

```diff
         @report&.update(queue: { length: @item_queue.length })
       end
     rescue InvalidRecordError => e
-      puts "Error in PostsImporter: #{record['uri']}: #{e}"
+      @logger&.warn "Error in PostsImporter: #{record['uri']}: #{e}"
     end
   end
···
     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date

     params[:cursor] = cursor
-    @import.update!(cursor: cursor)
+    @import.update!(cursor: cursor, fetched_until: oldest_date)

     break if !cursor
     break if @time_limit && oldest_date && oldest_date < @time_limit
```
app/importers/reposts_importer.rb (+2 -2)

```diff
         @report&.update(queue: { length: @item_queue.length })
       end
     rescue InvalidRecordError => e
-      puts "Error in RepostsImporter: #{record['uri']}: #{e}"
+      @logger&.warn "Error in RepostsImporter: #{record['uri']}: #{e}"
     end
   end
···
     @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date

     params[:cursor] = cursor
-    @import.update!(cursor: cursor)
+    @import.update!(cursor: cursor, fetched_until: oldest_date)

     break if !cursor
     break if @time_limit && oldest_date && oldest_date < @time_limit
```
app/models/import.rb (+26)

```diff
   validates_uniqueness_of :collection, scope: :user_id

   scope :unfinished, -> { where('(started_from IS NOT NULL) OR (last_completed IS NULL)') }
+
+  IMPORT_END = Time.at(0)
+
+  def imported_until
+    return nil if cursor.nil? && last_completed.nil?
+
+    groups = case collection
+    when 'likes'
+      [:likes]
+    when 'reposts'
+      [:reposts]
+    when 'posts'
+      [:pins, :quotes]
+    end
+
+    newest_queued_items = groups.map { |g| user.send(g).where(queue: :import).order(:time).last }
+    newest_queued = newest_queued_items.compact.sort_by(&:time).last
+
+    if newest_queued
+      newest_queued.time
+    elsif fetched_until
+      fetched_until
+    else
+      IMPORT_END
+    end
+  end
 end
```
app/models/user.rb (+7 -16)

```diff
   end

   def imported_until
-    return nil unless self.imports.exists?
-
-    oldest_imported_items = []
-    started = false
-
-    [:likes, :reposts, :pins, :quotes].each do |group|
-      if self.send(group).where(queue: :import).exists?
-        oldest_imported_items << self.send(group).where(queue: nil).order(:time).first
-      end
-    end
-
-    earliest_oldest = oldest_imported_items.compact.sort_by(&:time).last
-
-    if earliest_oldest
-      earliest_oldest.time
-    elsif self.imports.merge(Import.unfinished).exists?
+    import_positions = self.imports.map(&:imported_until)
+
+    if import_positions.empty? || import_positions.any? { |x| x.nil? }
       nil
     else
-      :end
+      import_positions.sort.last
     end
   end
···
     Quote.where(post_id: posts_subquery).delete_all

     Post.where(user: self).delete_all
   end
+
+  def to_s
+    %(<User id: #{id}, did: "#{did}">)
+  end
 end
```
app/post_downloader.rb (+13 -9)

```diff
 require_relative 'models/user'

 class PostDownloader
-  attr_accessor :report, :stop_when_empty
+  attr_accessor :report, :logger, :stop_when_empty

   def initialize
     @sky = Minisky.new(ENV['APPVIEW_HOST'] || 'public.api.bsky.app', nil)
···
         if post.valid?
           current_items.each { |i| update_item(i, post) }
         else
-          puts "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
+          @logger&.warn "Invalid post #{data['uri']}: #{post.errors.full_messages.join("; ")}"
           current_items.each { |i| invalidate_item(i) }
         end
       rescue InvalidRecordError => e
-        puts "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
+        @logger&.warn "Error in PostDownloader: #{data['uri']}: #{e.class}: #{e}"
         current_items.each { |i| i.update!(queue: nil) }
       end
     end

     check_missing_items(items)
   rescue StandardError => e
-    puts "Error in PostDownloader: #{e.class}: #{e}"
+    @logger&.warn "Error in PostDownloader: #{e.class}: #{e}"
   end
 end

 def save_post(post_uri, record)
   did, _, rkey = AT_URI(post_uri)

-  author = User.find_or_create_by!(did: did)
+  begin
+    author = User.find_or_create_by!(did: did)
+  rescue ActiveRecord::RecordInvalid => e
+    raise InvalidRecordError
+  end

   if post = Post.find_by(user: author, rkey: rkey)
     return post
···
   case status
   when :active
     # account is active but wasn't returned in getProfiles, probably was suspended on the AppView
-    puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
+    # puts "#{item.post_uri}: account #{did} exists on the PDS, account must have been taken down"
     item.destroy
   when nil
     # account was deleted, so all posts were deleted too
-    puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
+    # puts "#{item.post_uri}: account #{did} doesn't exist on the PDS, post must have been deleted"
     item.destroy
   else
     # account is inactive/suspended, but could come back, so leave it for now
-    puts "#{item.post_uri}: account #{did} is inactive: #{status}"
+    # puts "#{item.post_uri}: account #{did} is inactive: #{status}"
   end
 rescue StandardError => e
   hostname = did_obj.document.pds_host rescue "???"
-  puts "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"
+  @logger&.warn "#{item.post_uri}: couldn't check account status for #{did} on #{hostname}: #{e.class}: #{e}"

   # delete reference if the account's PDS is the old bsky.social (so it must have been deleted pre Nov 2023)
   item.destroy if hostname == 'bsky.social'
```
app/reports/basic_report.rb (+6 -2)

```diff
 class BasicReport
+  def initialize(logger)
+    @logger = logger
+  end
+
   def update(data)
     data.each do |k, v|
       if k == :downloader
+        @logger.info({ k => v }.inspect) if v[:downloaded_posts] && v[:downloaded_posts] % 100 == 0
       elsif k == :queue
         next
       else
+        @logger.info({ k => v }.inspect)
       end
     end
   end
```
app/reports/simple_logger.rb (+9, new file; contents not shown)
app/server.rb (+7 -3)

```diff
 class Server < Sinatra::Application
   register Sinatra::ActiveRecordExtension
-  set :port, 3000
+  set :port, ENV['PORT'] || 3000

   PAGE_LIMIT = 25
   HOSTNAME = ENV['SERVER_HOSTNAME'] || 'lycan.feeds.blue'
···
     case until_date
     when nil
-      json_response(status: 'not_started')
-    when :end
+      if user.import_job || user.imports.exists?
+        json_response(status: 'scheduled')
+      else
+        json_response(status: 'not_started')
+      end
+    when Import::IMPORT_END
       json_response(status: 'finished')
     else
       progress = 1 - (until_date - user.registered_at) / (Time.now - user.registered_at)
```
bin/worker (+7 -2)

```diff
 require 'bundler/setup'
 require_relative '../app/import_worker'
+require_relative '../app/reports/simple_logger'

 $stdout.sync = true
···
   puts " -v = verbose"
 end

+logger = SimpleLogger.new
 worker = ImportWorker.new
+worker.logger = logger

 args = ARGV.dup
···
 end

 trap("SIGINT") {
-  puts "Stopping..."
+  puts
+  puts "[#{Time.now}] Stopping..."
   exit
 }

 trap("SIGTERM") {
-  puts "Shutting down the service..."
+  puts "[#{Time.now}] Shutting down the service..."
   exit
 }

+puts "[#{Time.now}] Starting background worker..."
 worker.run
```
db/migrate/20251027134657_add_fetched_until_to_imports.rb (+5, new file; contents not shown)
db/schema.rb (+2 -1)

```diff
 #
 # It's strongly recommended that you check this file into your version control system.

-ActiveRecord::Schema[7.2].define(version: 2025_09_23_180153) do
+ActiveRecord::Schema[7.2].define(version: 2025_10_27_134657) do
   # These are extensions that must be enabled in order to support this database
   enable_extension "plpgsql"
···
     t.datetime "started_from"
     t.datetime "last_completed"
     t.string "collection", limit: 20, null: false
+    t.datetime "fetched_until"
     t.index ["user_id", "collection"], name: "index_imports_on_user_id_and_collection", unique: true
   end
```
lib/tasks/import.rake (+2)

```diff
 require_relative '../../app/models/user'
 require_relative '../../app/post_downloader'
 require_relative '../../app/reports/console_report'
+require_relative '../../app/reports/simple_logger'

 task :enqueue_user do
   unless ENV['DID']
···
   import = ImportManager.new(user)
   import.report = ConsoleReport.new
+  import.logger = SimpleLogger.new
   import.time_limit = ENV['UNTIL']

   trap("SIGINT") {
```