Don't forget to lycansubscribe

improved handling of invalid records (hi retr0id 👋)

+2
app/errors.rb
···
··· 1 + class InvalidRecordError < StandardError 2 + end
+12
app/firehose_client.rb
··· 173 elsif op.action == :delete 174 @current_user.likes.where(rkey: op.rkey).delete_all 175 end 176 end 177 178 def process_repost(msg, op) ··· 184 elsif op.action == :delete 185 @current_user.reposts.where(rkey: op.rkey).delete_all 186 end 187 end 188 189 def process_post(msg, op) ··· 204 post.destroy 205 end 206 end 207 end 208 209 def log(text)
··· 173 elsif op.action == :delete 174 @current_user.likes.where(rkey: op.rkey).delete_all 175 end 176 + rescue StandardError => e 177 + log "Error in #process_like (#{msg.seq}, #{op.uri}): #{e}" 178 + log e.backtrace.reject { |x| x.include?('/ruby/') } 179 + sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed) 180 end 181 182 def process_repost(msg, op) ··· 188 elsif op.action == :delete 189 @current_user.reposts.where(rkey: op.rkey).delete_all 190 end 191 + rescue StandardError => e 192 + log "Error in #process_repost (#{msg.seq}, #{op.uri}): #{e}" 193 + log e.backtrace.reject { |x| x.include?('/ruby/') } 194 + sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed) 195 end 196 197 def process_post(msg, op) ··· 212 post.destroy 213 end 214 end 215 + rescue StandardError => e 216 + log "Error in #process_post (#{msg.seq}, #{op.uri}): #{e}" 217 + log e.backtrace.reject { |x| x.include?('/ruby/') } 218 + sleep 5 if e.is_a?(ActiveRecord::ConnectionFailed) 219 end 220 221 def log(text)
+8
app/importers/base_importer.rb
··· 1 require 'didkit' 2 require 'minisky' 3 4 require_relative '../at_uri' 5 require_relative '../models/post' 6 require_relative '../models/user' 7 ··· 42 43 def import_items 44 raise NotImplementedError 45 end 46 end
··· 1 require 'didkit' 2 require 'minisky' 3 + require 'time' 4 5 require_relative '../at_uri' 6 + require_relative '../errors' 7 require_relative '../models/post' 8 require_relative '../models/user' 9 ··· 44 45 def import_items 46 raise NotImplementedError 47 + end 48 + 49 + def created_at(record) 50 + Time.parse(record['createdAt']) 51 + rescue StandardError 52 + raise InvalidRecordError 53 end 54 end
+10 -7
app/importers/likes_importer.rb
··· 1 - require 'time' 2 require_relative 'base_importer' 3 4 class LikesImporter < BaseImporter ··· 11 12 records = response['records'] 13 cursor = response['cursor'] 14 - 15 - @imported_count += records.length 16 - @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 17 - @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty? 18 19 records.each do |record| 20 begin 21 like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import) 22 23 if like && like.pending? && @item_queue 24 @item_queue.push(like) 25 @report&.update(queue: { length: @item_queue.length }) 26 end 27 - rescue StandardError => e 28 puts "Error in LikesImporter: #{record['uri']}: #{e}" 29 end 30 end 31 32 params[:cursor] = cursor 33 @import.update!(cursor: cursor) 34 35 break if !cursor 36 - break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit } 37 end 38 end 39 end
··· 1 require_relative 'base_importer' 2 3 class LikesImporter < BaseImporter ··· 10 11 records = response['records'] 12 cursor = response['cursor'] 13 + oldest_date = nil 14 15 records.each do |record| 16 begin 17 like = @user.likes.import_from_record(record['uri'], record['value'], queue: :import) 18 + 19 + record_date = like&.time || created_at(record['value']) 20 + oldest_date = [oldest_date, record_date].compact.min 21 22 if like && like.pending? && @item_queue 23 @item_queue.push(like) 24 @report&.update(queue: { length: @item_queue.length }) 25 end 26 + rescue InvalidRecordError => e 27 puts "Error in LikesImporter: #{record['uri']}: #{e}" 28 end 29 end 30 31 + @imported_count += records.length 32 + @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 33 + @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date 34 + 35 params[:cursor] = cursor 36 @import.update!(cursor: cursor) 37 38 break if !cursor 39 + break if @time_limit && oldest_date && oldest_date < @time_limit 40 end 41 end 42 end
+10 -7
app/importers/posts_importer.rb
··· 1 - require 'time' 2 require_relative 'base_importer' 3 4 class PostsImporter < BaseImporter ··· 11 12 records = response['records'] 13 cursor = response['cursor'] 14 - 15 - @imported_count += records.length 16 - @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 17 - @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty? 18 19 records.each do |record| 20 begin 21 quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import) 22 pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import) 23 24 if @item_queue 25 if quote && quote.pending? ··· 32 33 @report&.update(queue: { length: @item_queue.length }) 34 end 35 - rescue StandardError => e 36 puts "Error in PostsImporter: #{record['uri']}: #{e}" 37 end 38 end 39 40 params[:cursor] = cursor 41 @import.update!(cursor: cursor) 42 43 break if !cursor 44 - break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit } 45 end 46 end 47 end
··· 1 require_relative 'base_importer' 2 3 class PostsImporter < BaseImporter ··· 10 11 records = response['records'] 12 cursor = response['cursor'] 13 + oldest_date = nil 14 15 records.each do |record| 16 begin 17 quote = @user.quotes.import_from_record(record['uri'], record['value'], queue: :import) 18 pin = @user.pins.import_from_record(record['uri'], record['value'], queue: :import) 19 + 20 + record_date = quote&.time || pin&.time || created_at(record['value']) 21 + oldest_date = [oldest_date, record_date].compact.min 22 23 if @item_queue 24 if quote && quote.pending? ··· 31 32 @report&.update(queue: { length: @item_queue.length }) 33 end 34 + rescue InvalidRecordError => e 35 puts "Error in PostsImporter: #{record['uri']}: #{e}" 36 end 37 end 38 39 + @imported_count += records.length 40 + @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 41 + @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date 42 + 43 params[:cursor] = cursor 44 @import.update!(cursor: cursor) 45 46 break if !cursor 47 + break if @time_limit && oldest_date && oldest_date < @time_limit 48 end 49 end 50 end
+10 -7
app/importers/reposts_importer.rb
··· 1 - require 'time' 2 require_relative 'base_importer' 3 4 class RepostsImporter < BaseImporter ··· 11 12 records = response['records'] 13 cursor = response['cursor'] 14 - 15 - @imported_count += records.length 16 - @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 17 - @report&.update(importers: { importer_name => { :oldest_date => Time.parse(records.last['value']['createdAt']) }}) unless records.empty? 18 19 records.each do |record| 20 begin 21 repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import) 22 23 if repost && repost.pending? && @item_queue 24 @item_queue.push(repost) 25 @report&.update(queue: { length: @item_queue.length }) 26 end 27 - rescue StandardError => e 28 puts "Error in RepostsImporter: #{record['uri']}: #{e}" 29 end 30 end 31 32 params[:cursor] = cursor 33 @import.update!(cursor: cursor) 34 35 break if !cursor 36 - break if @time_limit && records.any? { |x| Time.parse(x['value']['createdAt']) < @time_limit } 37 end 38 end 39 end
··· 1 require_relative 'base_importer' 2 3 class RepostsImporter < BaseImporter ··· 10 11 records = response['records'] 12 cursor = response['cursor'] 13 + oldest_date = nil 14 15 records.each do |record| 16 begin 17 repost = @user.reposts.import_from_record(record['uri'], record['value'], queue: :import) 18 + 19 + record_date = repost&.time || created_at(record['value']) 20 + oldest_date = [oldest_date, record_date].compact.min 21 22 if repost && repost.pending? && @item_queue 23 @item_queue.push(repost) 24 @report&.update(queue: { length: @item_queue.length }) 25 end 26 + rescue InvalidRecordError => e 27 puts "Error in RepostsImporter: #{record['uri']}: #{e}" 28 end 29 end 30 31 + @imported_count += records.length 32 + @report&.update(importers: { importer_name => { :imported_items => @imported_count }}) 33 + @report&.update(importers: { importer_name => { :oldest_date => oldest_date }}) if oldest_date 34 + 35 params[:cursor] = cursor 36 @import.update!(cursor: cursor) 37 38 break if !cursor 39 + break if @time_limit && oldest_date && oldest_date < @time_limit 40 end 41 end 42 end
+9 -1
app/models/user_importable.rb
··· 1 module UserImportable 2 def import_from_record(item_uri, record, **args) 3 - item = self.new_from_record(item_uri, record) 4 return nil if item.nil? || already_imported?(item) 5 6 item.import_item!(args) 7 end 8 9 def already_imported?(item)
··· 1 + require_relative '../errors' 2 + 3 module UserImportable 4 def import_from_record(item_uri, record, **args) 5 + item = try_build_from_record(item_uri, record) 6 return nil if item.nil? || already_imported?(item) 7 8 item.import_item!(args) 9 + end 10 + 11 + def try_build_from_record(item_uri, record) 12 + self.new_from_record(item_uri, record) 13 + rescue StandardError 14 + raise InvalidRecordError 15 end 16 17 def already_imported?(item)
+19 -6
app/post_downloader.rb
··· 63 puts "Invalid post #{item.post_uri}: #{post.errors.full_messages.join("; ")}" 64 invalidate_item(item) 65 end 66 - rescue StandardError => e 67 puts "Error in PostDownloader: #{item.post_uri}: #{e.class}: #{e}" 68 end 69 end 70 ··· 77 def save_post(post_uri, record) 78 did, _, rkey = AT_URI(post_uri) 79 80 - text = record.delete('text') 81 - created = record.delete('createdAt') 82 - record.delete('$type') 83 - 84 author = User.find_or_create_by!(did: did) 85 86 if post = Post.find_by(user: author, rkey: rkey) 87 return post 88 end 89 90 - Post.create( 91 user: author, 92 rkey: rkey, 93 time: Time.parse(created), 94 text: text, 95 data: JSON.generate(record) 96 ) 97 end 98 99 def update_item(item, post)
··· 63 puts "Invalid post #{item.post_uri}: #{post.errors.full_messages.join("; ")}" 64 invalidate_item(item) 65 end 66 + rescue InvalidRecordError => e 67 puts "Error in PostDownloader: #{item.post_uri}: #{e.class}: #{e}" 68 + 69 + item = items.detect { |x| x.post_uri == data['uri'] } 70 + item.update!(queue: nil) 71 + items.delete(item) 72 end 73 end 74 ··· 81 def save_post(post_uri, record) 82 did, _, rkey = AT_URI(post_uri) 83 84 author = User.find_or_create_by!(did: did) 85 86 if post = Post.find_by(user: author, rkey: rkey) 87 return post 88 + else 89 + post = build_post(author, rkey, record) 90 + post.save 91 + post 92 end 93 + end 94 95 + def build_post(author, rkey, record) 96 + text = record.delete('text') 97 + created = record.delete('createdAt') 98 + 99 + record.delete('$type') 100 + 101 + Post.new( 102 user: author, 103 rkey: rkey, 104 time: Time.parse(created), 105 text: text, 106 data: JSON.generate(record) 107 ) 108 + rescue StandardError 109 + raise InvalidRecordError 110 end 111 112 def update_item(item, post)