Skip to content

Commit

Permalink
Remove recursion, separate out into separate workers/services, add li…
Browse files Browse the repository at this point in the history
…mit to global maximum statuses fetched (untested, this might not work yet)
  • Loading branch information
sneakers-the-rat committed Sep 30, 2024
1 parent 0c9abe1 commit 33b5964
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 58 deletions.
16 changes: 13 additions & 3 deletions app/controllers/api/v1/statuses_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,25 @@ def context
end
end

ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account)
ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account)
descendants_results = @status.descendants(descendants_limit, current_account, descendants_depth_limit)
loaded_ancestors = preload_collection(ancestors_results, Status)
loaded_descendants = preload_collection(descendants_results, Status)
loaded_ancestors = preload_collection(ancestors_results, Status)
loaded_descendants = preload_collection(descendants_results, Status)

@context = Context.new(ancestors: loaded_ancestors, descendants: loaded_descendants)
statuses = [@status] + @context.ancestors + @context.descendants

render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)

if @status.should_fetch_replies?
ActivityPub::FetchAllRepliesWorker.perform_async(
@status.id,
{
allow_synchronous_requests: true,
current_account_id: current_account.id,
}
)
end
end

def create
Expand Down
27 changes: 27 additions & 0 deletions app/models/concerns/status/fetch_replies_concern.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

# Scopes and a predicate that decide whether the replies collection of a
# status is due to be (re-)fetched.
#
# A status is eligible when it is not local, is older than
# CREATED_RECENTLY_DEBOUNCE, and has not had its replies fetched within
# FETCH_REPLIES_DEBOUNCE (or has never had them fetched).
module Status::FetchRepliesConcern
  extend ActiveSupport::Concern

  # debounce fetching all replies to minimize DoS
  FETCH_REPLIES_DEBOUNCE = 30.minutes

  # statuses younger than this are considered too new to have their replies fetched
  CREATED_RECENTLY_DEBOUNCE = 10.minutes

  included do
    scope :created_recently, -> { where(created_at: CREATED_RECENTLY_DEBOUNCE.ago..) }
    scope :not_created_recently, -> { where(created_at: ..CREATED_RECENTLY_DEBOUNCE.ago) }
    scope :fetched_recently, -> { where(fetched_replies_at: FETCH_REPLIES_DEBOUNCE.ago..) }
    scope :not_fetched_recently, -> { where(fetched_replies_at: ..FETCH_REPLIES_DEBOUNCE.ago).or(where(fetched_replies_at: nil)) }

    # Logical complement of `should_fetch_replies`: a status is skipped when ANY
    # of the exclusion criteria holds, so the conditions are OR-ed, not AND-ed.
    scope :shouldnt_fetch_replies, -> { local.or(created_recently).or(fetched_recently) }

    # `invert_where` is deliberately avoided here: it negates every condition
    # accumulated on the relation so far, so `where(uri: uris).should_fetch_replies`
    # would also negate the `uri` filter.
    # NOTE(review): relies on Status exposing a `remote` scope that is the
    # complement of `local` - confirm against the model.
    scope :should_fetch_replies, -> { remote.merge(not_created_recently).merge(not_fetched_recently) }
  end

  # Instance-level counterpart of the `should_fetch_replies` scope.
  #
  # @return [Boolean] true when the status is not local, is older than
  #   CREATED_RECENTLY_DEBOUNCE, and its replies were never fetched or were last
  #   fetched at least FETCH_REPLIES_DEBOUNCE ago
  def should_fetch_replies?
    # we aren't brand new, and we haven't fetched replies since the debounce window
    !local? && created_at <= CREATED_RECENTLY_DEBOUNCE.ago && (
      fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago
    )
  end
end
10 changes: 1 addition & 9 deletions app/models/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Status < ApplicationRecord
include Discard::Model
include Paginable
include RateLimitable
include Status::FetchRepliesConcern
include Status::SafeReblogInsert
include Status::SearchConcern
include Status::SnapshotConcern
Expand Down Expand Up @@ -184,8 +185,6 @@ class Status < ApplicationRecord
delegate :domain, to: :account, prefix: true

REAL_TIME_WINDOW = 6.hours
# debounce fetching all replies to minimize DoS
FETCH_REPLIES_DEBOUNCE = 30.minutes

def cache_key
"v3:#{super}"
Expand Down Expand Up @@ -443,13 +442,6 @@ def unlink_from_conversations!
end
end

# Whether the replies of this status are due to be fetched again.
#
# Returns false for local statuses and for statuses created less than ten
# minutes ago; otherwise true when replies were never fetched or were last
# fetched at least FETCH_REPLIES_DEBOUNCE ago.
def should_fetch_replies?
  return false if local?
  # brand-new statuses are skipped
  return false unless created_at <= 10.minutes.ago

  # never fetched, or the debounce window has elapsed
  fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago
end

private

def update_status_stat!(attrs)
Expand Down
37 changes: 37 additions & 0 deletions app/services/activitypub/fetch_all_replies_service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

# Fetches the items of a replies collection without restricting them to the
# host of the parent account, and enqueues a FetchReplyWorker job for every
# reply that is not excluded by `Status.shouldnt_fetch_replies`.
#
# NOTE(review): the inherited `collection_items` refers to `MAX_REPLIES` by
# bare name; Ruby resolves constants lexically, so that reference likely binds
# to the parent's limit rather than the one defined here - confirm.
class ActivityPub::FetchAllRepliesService < ActivityPub::FetchRepliesService
  include JsonLdHelper

  # Limit of replies to fetch per status
  MAX_REPLIES = 500

  # @param collection_or_uri [Hash, String] replies collection, or the URI of one
  # @param allow_synchronous_requests [Boolean] whether the collection may be
  #   fetched over HTTP when only a URI is given
  # @param request_id [String, nil] forwarded to each FetchReplyWorker job
  # @return [Array<String>, nil] the reply URIs that were enqueued, or nil when
  #   the collection could not be read
  def call(collection_or_uri, allow_synchronous_requests: true, request_id: nil)
    @allow_synchronous_requests = allow_synchronous_requests
    @filter_by_host = false

    @items = collection_items(collection_or_uri)
    @items = filtered_replies
    return if @items.nil?

    FetchReplyWorker.push_bulk(@items) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }

    @items
  end

  private

  # Reduces the collection items to the URIs that should be fetched.
  #
  # @return [Array<String>, nil] at most MAX_REPLIES URIs, or nil when there
  #   are no items to filter
  def filtered_replies
    return if @items.nil?

    # items without a usable id are dropped, and each URI is considered once
    uris = @items.filter_map { |item| value_or_id(item) }.uniq

    # find all statuses that we *shouldn't* update the replies for, and use that as a filter
    dont_update = Status.where(uri: uris).shouldnt_fetch_replies.pluck(:uri)
    to_fetch = (uris - dont_update).take(MAX_REPLIES)

    # touch only the existing statuses we are actually about to update, so that
    # anything dropped by the MAX_REPLIES cap is not debounced without being fetched
    Status.where(uri: to_fetch).should_fetch_replies.touch_all(:fetched_replies_at)

    to_fetch
  end
end
34 changes: 10 additions & 24 deletions app/services/activitypub/fetch_replies_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,13 @@
class ActivityPub::FetchRepliesService < BaseService
include JsonLdHelper

# Limit of fetched replies used when not fetching all replies
MAX_REPLIES_LOW = 5
# limit of fetched replies used when fetching all replies
MAX_REPLIES_HIGH = 500

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false)
# Whether we are getting replies from more than the originating server,
# and don't limit ourselves to getting at most `MAX_REPLIES_LOW`
@all_replies = all_replies
# store the status and whether we should fetch replies for it to avoid
# race conditions if something else updates us in the meantime
@status = parent_status
@should_fetch_replies = parent_status.should_fetch_replies?
# Limit of fetched replies
MAX_REPLIES = 5

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, filter_by_host: true)
@account = parent_status.account
@allow_synchronous_requests = allow_synchronous_requests
@filter_by_host = filter_by_host

@items = collection_items(collection_or_uri)
return if @items.nil?
Expand Down Expand Up @@ -51,7 +42,7 @@ def collection_items(collection_or_uri)
all_items.concat(as_array(items))

# Quit early once enough items have been collected
break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies?
break if all_items.size >= MAX_REPLIES

collection = collection['next'].present? ? fetch_collection(collection['next']) : nil
end
Expand All @@ -62,7 +53,7 @@ def collection_items(collection_or_uri)
def fetch_collection(collection_or_uri)
return collection_or_uri if collection_or_uri.is_a?(Hash)
return unless @allow_synchronous_requests
return if !@all_replies && non_matching_uri_hosts?(@account.uri, collection_or_uri)
return if @filter_by_host && non_matching_uri_hosts?(@account.uri, collection_or_uri)

# NOTE: For backward compatibility reasons, Mastodon signs outgoing
# queries incorrectly by default.
Expand All @@ -81,19 +72,14 @@ def fetch_collection(collection_or_uri)
end

def filtered_replies
if @all_replies
# Reject all statuses that we already have in the db
@items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) }
else
if @filter_by_host
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.

# Also limit to 5 fetched replies to limit potential for DoS.
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW)
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES)
else
@items.map { |item| value_or_id(item) }.take(MAX_REPLIES)
end
end

def fetch_all_replies?
@all_replies && @should_fetch_replies
end
end
45 changes: 45 additions & 0 deletions app/workers/activitypub/fetch_all_replies_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# frozen_string_literal: true

# Fetch all replies to a status, querying iteratively through
# ActivityPub replies collections, fetching any statuses that
# we either don't already have or we haven't checked for new replies
# in the Status::FETCH_REPLIES_DEBOUNCE interval
class ActivityPub::FetchAllRepliesWorker
  include Sidekiq::Worker
  include ExponentialBackoff
  # provides `fetch_resource`
  include JsonLdHelper

  sidekiq_options queue: 'pull', retry: 3

  # Global max replies to fetch per request
  MAX_REPLIES = 1000

  # Options that ActivityPub::FetchAllRepliesService#call accepts as keywords;
  # everything else (e.g. current_account_id) is consumed by the worker itself.
  SERVICE_OPTIONS = %i(allow_synchronous_requests request_id).freeze

  # @param parent_status_id [Integer] id of the status whose reply tree is expanded
  # @param options [Hash] `allow_synchronous_requests`, `request_id` and
  #   `current_account_id`; keys may be strings (as delivered after job
  #   serialization) or symbols
  def perform(parent_status_id, options = {})
    @options = options.deep_symbolize_keys
    @parent_status = Status.find(parent_status_id)
    @current_account_id = @options[:current_account_id]
    @current_account = @current_account_id.nil? ? nil : Account.find_by(id: @current_account_id)

    # work list of reply URIs whose own replies collections still need to be visited
    all_replies = get_replies(@parent_status.uri)
    got_replies = all_replies.length
    until all_replies.empty? || got_replies >= MAX_REPLIES
      new_replies = get_replies(all_replies.pop)
      got_replies += new_replies.length
      # concat keeps the work list flat; `<<` would push the array as a single element
      all_replies.concat(new_replies)
    end
  rescue ActiveRecord::RecordNotFound
    # the parent status no longer exists, so there is nothing to retry
    true
  end

  private

  # Enqueues fetches for the replies of the status at `status_uri`.
  #
  # @return [Array<String>] URIs of the replies that were enqueued; empty when
  #   the status exposes no replies collection or it could not be read
  def get_replies(status_uri)
    replies_uri = get_replies_uri(status_uri)
    return [] if replies_uri.nil?

    ActivityPub::FetchAllRepliesService.new.call(replies_uri, **@options.slice(*SERVICE_OPTIONS)) || []
  end

  # Looks up the `replies` property of the remote status document.
  #
  # @return [Hash, String, nil] the replies collection or its URI, or nil when
  #   the document could not be fetched or has no `replies` property
  def get_replies_uri(parent_status_uri)
    json_status = fetch_resource(parent_status_uri, true, @current_account)
    replies_uri = json_status&.dig('replies')
    Rails.logger.debug { "Could not find replies uri for status URI: #{parent_status_uri}" } if replies_uri.nil?
    replies_uri
  end
end
23 changes: 1 addition & 22 deletions app/workers/fetch_reply_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,6 @@ class FetchReplyWorker
sidekiq_options queue: 'pull', retry: 3

# Fetches the remote status at `child_url` via FetchRemoteStatusService,
# forwarding `options` (symbolized) as keyword arguments.
#
# NOTE(review): this span comes from a diff view reporting "1 addition & 22
# deletions"; it appears to show the removed recursive reply expansion together
# with the single added line at the bottom - confirm which lines are live
# before relying on it.
def perform(child_url, options = {})
# 'all_replies' is removed from options before they are forwarded as keywords
all_replies = options.delete('all_replies')

status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)

# asked to fetch replies recursively - do the second-level calls async
if all_replies && status
json_status = fetch_resource(status.uri, true)

collection = json_status['replies']
unless collection.nil?
# if expanding replies recursively, spread out the recursive calls
ActivityPub::FetchRepliesWorker.perform_in(
rand(1..30).seconds,
status.id,
collection,
{
allow_synchronous_requests: true,
all_replies: true,
}
)
end
end
# NOTE(review): presumably the only statement left after the commit
FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)
end
end

0 comments on commit 33b5964

Please sign in to comment.