From 98c280e6a26a2ebbbacf34be0a26a642b1f3dee7 Mon Sep 17 00:00:00 2001 From: sneakers-the-rat Date: Sun, 22 Sep 2024 14:46:38 -0700 Subject: [PATCH] stashing some partial work on streaming --- app/controllers/api/v1/statuses_controller.rb | 28 +++++------ .../flavours/glitch/actions/statuses.js | 16 +++---- .../flavours/glitch/actions/streaming.js | 9 +++- .../flavours/glitch/actions/thread.ts | 12 +++++ .../flavours/glitch/reducers/thread.ts | 0 .../concerns/status/threading_concern.rb | 5 ++ .../activitypub/fetch_replies_service.rb | 14 +++++- app/services/fan_out_on_write_service.rb | 11 +++++ .../activitypub/fetch_replies_worker.rb | 16 ++++++- app/workers/fetch_reply_worker.rb | 47 ++++++++++--------- 10 files changed, 108 insertions(+), 50 deletions(-) create mode 100644 app/javascript/flavours/glitch/actions/thread.ts create mode 100644 app/javascript/flavours/glitch/reducers/thread.ts diff --git a/app/controllers/api/v1/statuses_controller.rb b/app/controllers/api/v1/statuses_controller.rb index c7569f4285dfea..fb86bd28c8f4d8 100644 --- a/app/controllers/api/v1/statuses_controller.rb +++ b/app/controllers/api/v1/statuses_controller.rb @@ -48,22 +48,6 @@ def context ancestors_limit = ANCESTORS_LIMIT descendants_limit = DESCENDANTS_LIMIT descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT - else - unless @status.local? && !@status.should_fetch_replies? - json_status = fetch_resource(@status.uri, true, current_account) - - # rescue this whole block on failure, don't want to fail the whole context request if we can't do this - collection = json_status['replies'] - - unless collection.nil? - ActivityPub::FetchRepliesService.new.call( - @status, - collection, - allow_synchronous_requests: true, - all_replies: true - ) - end - end end ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account) @@ -75,6 +59,18 @@ def context statuses = [@status] + @context.ancestors + @context.descendants render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id) + + unless @status.local? && !@status.should_fetch_replies? + ActivityPub::FetchRepliesWorker.perform_async( + @status.id, + nil, + { + allow_synchronous_requests: true, + all_replies: true, + current_account_id: current_account.id, + } + ) + end end def create diff --git a/app/javascript/flavours/glitch/actions/statuses.js b/app/javascript/flavours/glitch/actions/statuses.js index 2d83ae9926895a..b6a7bb20d253f0 100644 --- a/app/javascript/flavours/glitch/actions/statuses.js +++ b/app/javascript/flavours/glitch/actions/statuses.js @@ -8,33 +8,33 @@ import { deleteFromTimelines } from './timelines'; export const STATUS_FETCH_REQUEST = 'STATUS_FETCH_REQUEST'; export const STATUS_FETCH_SUCCESS = 'STATUS_FETCH_SUCCESS'; -export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; +export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL'; export const STATUS_DELETE_REQUEST = 'STATUS_DELETE_REQUEST'; export const STATUS_DELETE_SUCCESS = 'STATUS_DELETE_SUCCESS'; -export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; +export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL'; export const CONTEXT_FETCH_REQUEST = 'CONTEXT_FETCH_REQUEST'; export const CONTEXT_FETCH_SUCCESS = 'CONTEXT_FETCH_SUCCESS'; -export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; +export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL'; export const STATUS_MUTE_REQUEST = 'STATUS_MUTE_REQUEST'; export const STATUS_MUTE_SUCCESS = 'STATUS_MUTE_SUCCESS'; -export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; +export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL'; export const STATUS_UNMUTE_REQUEST = 'STATUS_UNMUTE_REQUEST'; export const STATUS_UNMUTE_SUCCESS = 'STATUS_UNMUTE_SUCCESS'; -export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; +export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL'; -export const STATUS_REVEAL = 'STATUS_REVEAL'; -export const STATUS_HIDE = 'STATUS_HIDE'; +export const STATUS_REVEAL = 'STATUS_REVEAL'; +export const STATUS_HIDE = 'STATUS_HIDE'; export const STATUS_COLLAPSE = 'STATUS_COLLAPSE'; export const REDRAFT = 'REDRAFT'; export const STATUS_FETCH_SOURCE_REQUEST = 'STATUS_FETCH_SOURCE_REQUEST'; export const STATUS_FETCH_SOURCE_SUCCESS = 'STATUS_FETCH_SOURCE_SUCCESS'; -export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; +export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL'; export const STATUS_TRANSLATE_REQUEST = 'STATUS_TRANSLATE_REQUEST'; export const STATUS_TRANSLATE_SUCCESS = 'STATUS_TRANSLATE_SUCCESS'; diff --git a/app/javascript/flavours/glitch/actions/streaming.js b/app/javascript/flavours/glitch/actions/streaming.js index 2d69542fc70333..40df5b3804ac59 100644 --- a/app/javascript/flavours/glitch/actions/streaming.js +++ b/app/javascript/flavours/glitch/actions/streaming.js @@ -188,4 +188,11 @@ export const connectDirectStream = () => * @returns {function(): void} */ export const connectListStream = listId => - connectTimelineStream(`list:${listId}`, 'list', { list: listId }, { fillGaps: () => fillListTimelineGaps(listId) }); + connectTimelineStream(`list:${listId}`, 'list', {list: listId}, {fillGaps: () => fillListTimelineGaps(listId)}); + +/** + * @param {string} rootURI + * @returns {function(): void} + */ +export const connectThreadStream = rootURI => + connectTimelineStream(`thread:${rootURI}`, 'thread', {rootURI: rootURI}); diff --git a/app/javascript/flavours/glitch/actions/thread.ts b/app/javascript/flavours/glitch/actions/thread.ts new file mode 100644 index 00000000000000..c3e26b18e8073e --- /dev/null +++ b/app/javascript/flavours/glitch/actions/thread.ts @@ -0,0 +1,12 @@ +import { createAction } from '@reduxjs/toolkit'; + +import type { ApiStatusJSON } from 'flavours/glitch/api_types/statuses'; + +export const threadMount = createAction('THREAD_MOUNT'); +export const threadUnmount = createAction('THREAD_UNMOUNT'); + +interface ThreadUpdatePayload { + status: ApiStatusJSON; +} + +export const threadUpdate = createAction('THREAD_UPDATE'); diff --git a/app/javascript/flavours/glitch/reducers/thread.ts b/app/javascript/flavours/glitch/reducers/thread.ts new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/app/models/concerns/status/threading_concern.rb b/app/models/concerns/status/threading_concern.rb index 478a139d633be2..59bd7fc7a45dd4 100644 --- a/app/models/concerns/status/threading_concern.rb +++ b/app/models/concerns/status/threading_concern.rb @@ -32,6 +32,11 @@ def self_replies(limit) account.statuses.distributable_visibility.where(in_reply_to_id: id).reorder(id: :asc).limit(limit) end + def thread_root + maybe_ancestors = ancestor_ids(Api::V1::StatusesController::CONTEXT_LIMIT) + maybe_ancestors.none? ? id : maybe_ancestors[0] + end + private def ancestor_ids(limit) diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 04b035a718a3fc..383c6acee812d2 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -8,10 +8,11 @@ class ActivityPub::FetchRepliesService < BaseService # limit of fetched replies used when fetching all replies MAX_REPLIES_HIGH = 500 - def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false) + def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false, current_account_id: nil) # Whether we are getting replies from more than the originating server, # and don't limit ourselves to getting at most `MAX_REPLIES_LOW` @all_replies = all_replies + @current_account_id = current_account_id # store the status and whether we should fetch replies for it to avoid # race conditions if something else updates us in the meantime @status = parent_status @@ -23,7 +24,16 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req @items = collection_items(collection_or_uri) return if @items.nil? - FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] } + FetchReplyWorker.push_bulk(filtered_replies) do |reply_uri| + [ + reply_uri, + { + request_id: request_id, + all_replies: @all_replies, + current_account_id: @current_account_id, + }, + ] + end # Store last fetched all to debounce @status.touch(:fetched_replies_at) diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 71ab1ac494d53f..ec934026899950 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -38,6 +38,7 @@ def check_race_condition! def fan_out_to_local_recipients! deliver_to_self! + deliver_to_thread_stream! unless @options[:skip_notifications] notify_mentioned_accounts! @@ -71,6 +72,12 @@ def deliver_to_self! FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility? end + def deliver_to_thread_stream + return unless subscribed_to_thread_stream? + + redis.publish("timeline:thread:#{@status.thread_root}", anonymous_payload) + end + def notify_mentioned_accounts! @status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions| LocalNotificationWorker.push_bulk(mentions) do |mention| @@ -183,4 +190,8 @@ def broadcastable? def subscribed_to_streaming_api?(account_id) redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications") end + + def subscribed_to_thread_stream? + redis.exists?("subscribed:timeline:thread:#{@status.thread_root}") + end end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index d72bad745261d3..a55df06bfa3e7f 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,9 +6,23 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri, options = {}) + def perform(parent_status_id, replies_uri = nil, options = {}) + @current_account_id = options.fetch(:current_account_id, nil) + replies_uri = get_replies_uri(status) if replies_uri.nil? + return if replies_uri.nil? + ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end + + private + + def get_replies_uri(status) + current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id) + json_status = fetch_resource(status.uri, true, current_account) + replies_uri = json_status['replies'] + Rails.logger.debug { "Could not find replies uri for status: #{status}" } if replies_uri.nil? + replies_uri + end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 280bc8406b4d84..ca05d0ae6cad92 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -8,27 +8,30 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 def perform(child_url, options = {}) - all_replies = options.delete('all_replies') - - status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) - - # asked to fetch replies recursively - do the second-level calls async - if all_replies && status - json_status = fetch_resource(status.uri, true) - - collection = json_status['replies'] - unless collection.nil? - # if expanding replies recursively, spread out the recursive calls - ActivityPub::FetchRepliesWorker.perform_in( - rand(1..30).seconds, - status.id, - collection, - { - allow_synchronous_requests: true, - all_replies: true, - } - ) - end - end + @all_replies = options.fetch('all_replies', nil) + @current_account_id = options.delete('current_account_id') + + @status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) + + recurse + + @status + end + + private + + def recurse + return unless @all_replies && @status + + ActivityPub::FetchRepliesWorker.perform_in( + rand(1..30).seconds, + @status.id, + nil, + { + allow_synchronous_requests: true, + all_replies: true, + current_account_id: @current_account_id, + } + ) end end