diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index fb86bd28c8f4d8..c7569f4285dfea 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,6 +48,22 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT + else + unless @status.local? && !@status.should_fetch_replies? + json_status = fetch_resource(@status.uri, true, current_account) + + # rescue this whole block on failure, don't want to fail the whole context request if we can't do this + collection = json_status['replies'] + + unless collection.nil? + ActivityPub::FetchRepliesService.new.call( + @status, + collection, + allow_synchronous_requests: true, + all_replies: true + ) + end + end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -59,18 +75,6 @@ def context statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) - - unless @status.local? && !@status.should_fetch_replies? - ActivityPub::FetchRepliesWorker.perform_async( - @status.id, - nil, - { - allow_synchronous_requests: true, - all_replies: true, - current_account_id: current_account.id, - } - ) - end end def create diff --git a/app/javascript/flavours/glitch/actions/statuses.js b/app/javascript/flavours/glitch/actions/statuses.js index b6a7bb20d253f0..2d83ae9926895a 100644 --- a/app/javascript/flavours/glitch/actions/statuses.js +++ b/app/javascript/flavours/glitch/actions/statuses.js @@ -8,33 +8,33 @@ import { deleteFromTimelines } from './timelines'; export const STATUS_FETCH_REQUEST = 'STATUS_FETCH_REQUEST'; export const STATUS_FETCH_SUCCESS = 'STATUS_FETCH_SUCCESS'; -export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; +export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; export const STATUS_DELETE_REQUEST = 'STATUS_DELETE_REQUEST'; export const STATUS_DELETE_SUCCESS = 'STATUS_DELETE_SUCCESS'; -export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; +export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; export const CONTEXT_FETCH_REQUEST = 'CONTEXT_FETCH_REQUEST'; export const CONTEXT_FETCH_SUCCESS = 'CONTEXT_FETCH_SUCCESS'; -export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; +export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; export const STATUS_MUTE_REQUEST = 'STATUS_MUTE_REQUEST'; export const STATUS_MUTE_SUCCESS = 'STATUS_MUTE_SUCCESS'; -export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; +export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; export const STATUS_UNMUTE_REQUEST = 'STATUS_UNMUTE_REQUEST'; export const STATUS_UNMUTE_SUCCESS = 'STATUS_UNMUTE_SUCCESS'; -export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; +export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; -export const STATUS_REVEAL = 'STATUS_REVEAL'; -export const STATUS_HIDE = 'STATUS_HIDE'; +export const STATUS_REVEAL = 'STATUS_REVEAL'; +export const STATUS_HIDE = 'STATUS_HIDE'; export const STATUS_COLLAPSE = 'STATUS_COLLAPSE'; export const REDRAFT = 'REDRAFT'; export const STATUS_FETCH_SOURCE_REQUEST = 'STATUS_FETCH_SOURCE_REQUEST'; export const STATUS_FETCH_SOURCE_SUCCESS = 'STATUS_FETCH_SOURCE_SUCCESS'; -export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; +export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; export const STATUS_TRANSLATE_REQUEST = 'STATUS_TRANSLATE_REQUEST'; export const STATUS_TRANSLATE_SUCCESS = 'STATUS_TRANSLATE_SUCCESS'; diff --git a/app/javascript/flavours/glitch/actions/streaming.js b/app/javascript/flavours/glitch/actions/streaming.js index 40df5b3804ac59..2d69542fc70333 100644 --- a/app/javascript/flavours/glitch/actions/streaming.js +++ b/app/javascript/flavours/glitch/actions/streaming.js @@ -188,11 +188,4 @@ export const connectDirectStream = () => * @returns {function(): void} */ export const connectListStream = listId => - connectTimelineStream(`list:${listId}`, 'list', {list: listId}, {fillGaps: () => fillListTimelineGaps(listId)}); - -/** - * @param {string} rootURI - * @returns {function(): void} - */ -export const connectThreadStream = rootURI => - connectTimelineStream(`thread:${rootURI}`, 'thread', {rootURI: rootURI}); + connectTimelineStream(`list:${listId}`, 'list', { list: listId }, { fillGaps: () => fillListTimelineGaps(listId) }); diff --git a/app/javascript/flavours/glitch/actions/thread.ts b/app/javascript/flavours/glitch/actions/thread.ts deleted file mode 100644 index c3e26b18e8073e..00000000000000 --- a/app/javascript/flavours/glitch/actions/thread.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { createAction } from '@reduxjs/toolkit'; - -import type { ApiStatusJSON } from 'flavours/glitch/api_types/statuses'; - -export const threadMount = createAction('THREAD_MOUNT'); -export const threadUnmount = createAction('THREAD_UNMOUNT'); - -interface ThreadUpdatePayload { - status: ApiStatusJSON; -} - -export const threadUpdate = createAction('THREAD_UPDATE'); diff --git a/app/javascript/flavours/glitch/reducers/thread.ts b/app/javascript/flavours/glitch/reducers/thread.ts deleted file mode 100644 index e69de29bb2d1d6..00000000000000 diff --git a/app/models/concerns/status/threading_concern.rb b/app/models/concerns/status/threading_concern.rb index 59bd7fc7a45dd4..478a139d633be2 100644 --- a/app/models/concerns/status/threading_concern.rb +++ b/app/models/concerns/status/threading_concern.rb @@ -32,11 +32,6 @@ def self_replies(limit) account.statuses.distributable_visibility.where(in_reply_to_id: id).reorder(id: :asc).limit(limit) end - def thread_root - maybe_ancestors = ancestor_ids(Api::V1::StatusesController::CONTEXT_LIMIT) - maybe_ancestors.none? ? id : maybe_ancestors[0] - end - private def ancestor_ids(limit) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 383c6acee812d2..04b035a718a3fc 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -8,11 +8,10 @@ class ActivityPub::FetchRepliesService < BaseService # limit of fetched replies used when fetching all replies MAX_REPLIES_HIGH = 500 - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false, current_account_id: nil) + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies - @current_account_id = current_account_id # store the status and whether we should fetch replies for it to avoid # race conditions if something else updates us in the meantime @status = parent_status @@ -24,16 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) do |reply_uri| - [ - reply_uri, - { - request_id: request_id, - all_replies: @all_replies, - current_account_id: @current_account_id, - }, - ] - end + FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } # Store last fetched all to debounce @status.touch(:fetched_replies_at) diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index ec934026899950..71ab1ac494d53f 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -38,7 +38,6 @@ def check_race_condition! def fan_out_to_local_recipients! deliver_to_self! - deliver_to_thread_stream! unless @options[:skip_notifications] notify_mentioned_accounts! @@ -72,12 +71,6 @@ def deliver_to_self! FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility? end - def deliver_to_thread_stream - return unless subscribed_to_thread_stream? - - redis.publish("timeline:thread:#{@status.thread_root}", anonymous_payload) - end - def notify_mentioned_accounts! @status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions| LocalNotificationWorker.push_bulk(mentions) do |mention| @@ -190,8 +183,4 @@ def broadcastable? def subscribed_to_streaming_api?(account_id) redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications") end - - def subscribed_to_thread_stream? - redis.exists?("subscribed:timeline:thread:#{@status.thread_root}") - end end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index a55df06bfa3e7f..d72bad745261d3 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,23 +6,9 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri = nil, options = {}) - @current_account_id = options.fetch(:current_account_id, nil) - replies_uri = get_replies_uri(status) if replies_uri.nil? - return if replies_uri.nil? - + def perform(parent_status_id, replies_uri, options = {}) ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end - - private - - def get_replies_uri(status) - current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) - json_status = fetch_resource(status.uri, true, current_account) - replies_uri = json_status['replies'] - Rails.logger.debug { "Could not find replies uri for status: #{status}" } if replies_uri.nil? - replies_uri - end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index ca05d0ae6cad92..280bc8406b4d84 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,30 +8,27 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - @all_replies = options.fetch('all_replies', nil) - @current_account_id = options.delete('current_account_id') - - @status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - recurse - - @status - end - - private - - def recurse - return unless @all_replies && @status - - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - @status.id, - nil, - { - allow_synchronous_requests: true, - all_replies: true, - current_account_id: @current_account_id, - } - ) + all_replies = options.delete('all_replies') + + status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + # asked to fetch replies recursively - do the second-level calls async + if all_replies && status + json_status = fetch_resource(status.uri, true) + + collection = json_status['replies'] + unless collection.nil? + # if expanding replies recursively, spread out the recursive calls + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + status.id, + collection, + { + allow_synchronous_requests: true, + all_replies: true, + } + ) + end + end end end