Skip to content

Commit

Permalink
Revert "stashing some partial work on streaming"
Browse files Browse the repository at this point in the history
This reverts commit 98c280e.
  • Loading branch information
sneakers-the-rat committed Sep 22, 2024
1 parent 98c280e commit f25e337
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 108 deletions.
28 changes: 16 additions & 12 deletions app/controllers/api/v1/statuses_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ def context
ancestors_limit = ANCESTORS_LIMIT
descendants_limit = DESCENDANTS_LIMIT
descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT
else
unless @status.local? && !@status.should_fetch_replies?
json_status = fetch_resource(@status.uri, true, current_account)

# rescue this whole block on failure, don't want to fail the whole context request if we can't do this
collection = json_status['replies']

unless collection.nil?
ActivityPub::FetchRepliesService.new.call(
@status,
collection,
allow_synchronous_requests: true,
all_replies: true
)
end
end
end

ancestors_results = @status.in_reply_to_id.nil? ? [] : @status.ancestors(ancestors_limit, current_account)
Expand All @@ -59,18 +75,6 @@ def context
statuses = [@status] + @context.ancestors + @context.descendants

render json: @context, serializer: REST::ContextSerializer, relationships: StatusRelationshipsPresenter.new(statuses, current_user&.account_id)

unless @status.local? && !@status.should_fetch_replies?
ActivityPub::FetchRepliesWorker.perform_async(
@status.id,
nil,
{
allow_synchronous_requests: true,
all_replies: true,
current_account_id: current_account.id,
}
)
end
end

def create
Expand Down
16 changes: 8 additions & 8 deletions app/javascript/flavours/glitch/actions/statuses.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,33 @@ import { deleteFromTimelines } from './timelines';

export const STATUS_FETCH_REQUEST = 'STATUS_FETCH_REQUEST';
export const STATUS_FETCH_SUCCESS = 'STATUS_FETCH_SUCCESS';
export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL';
export const STATUS_FETCH_FAIL = 'STATUS_FETCH_FAIL';

export const STATUS_DELETE_REQUEST = 'STATUS_DELETE_REQUEST';
export const STATUS_DELETE_SUCCESS = 'STATUS_DELETE_SUCCESS';
export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL';
export const STATUS_DELETE_FAIL = 'STATUS_DELETE_FAIL';

export const CONTEXT_FETCH_REQUEST = 'CONTEXT_FETCH_REQUEST';
export const CONTEXT_FETCH_SUCCESS = 'CONTEXT_FETCH_SUCCESS';
export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL';
export const CONTEXT_FETCH_FAIL = 'CONTEXT_FETCH_FAIL';

export const STATUS_MUTE_REQUEST = 'STATUS_MUTE_REQUEST';
export const STATUS_MUTE_SUCCESS = 'STATUS_MUTE_SUCCESS';
export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL';
export const STATUS_MUTE_FAIL = 'STATUS_MUTE_FAIL';

export const STATUS_UNMUTE_REQUEST = 'STATUS_UNMUTE_REQUEST';
export const STATUS_UNMUTE_SUCCESS = 'STATUS_UNMUTE_SUCCESS';
export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL';
export const STATUS_UNMUTE_FAIL = 'STATUS_UNMUTE_FAIL';

export const STATUS_REVEAL = 'STATUS_REVEAL';
export const STATUS_HIDE = 'STATUS_HIDE';
export const STATUS_REVEAL = 'STATUS_REVEAL';
export const STATUS_HIDE = 'STATUS_HIDE';
export const STATUS_COLLAPSE = 'STATUS_COLLAPSE';

export const REDRAFT = 'REDRAFT';

export const STATUS_FETCH_SOURCE_REQUEST = 'STATUS_FETCH_SOURCE_REQUEST';
export const STATUS_FETCH_SOURCE_SUCCESS = 'STATUS_FETCH_SOURCE_SUCCESS';
export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL';
export const STATUS_FETCH_SOURCE_FAIL = 'STATUS_FETCH_SOURCE_FAIL';

export const STATUS_TRANSLATE_REQUEST = 'STATUS_TRANSLATE_REQUEST';
export const STATUS_TRANSLATE_SUCCESS = 'STATUS_TRANSLATE_SUCCESS';
Expand Down
9 changes: 1 addition & 8 deletions app/javascript/flavours/glitch/actions/streaming.js
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,4 @@ export const connectDirectStream = () =>
* @returns {function(): void}
*/
export const connectListStream = listId =>
connectTimelineStream(`list:${listId}`, 'list', {list: listId}, {fillGaps: () => fillListTimelineGaps(listId)});

/**
* @param {string} rootURI
* @returns {function(): void}
*/
export const connectThreadStream = rootURI =>
connectTimelineStream(`thread:${rootURI}`, 'thread', {rootURI: rootURI});
connectTimelineStream(`list:${listId}`, 'list', { list: listId }, { fillGaps: () => fillListTimelineGaps(listId) });
12 changes: 0 additions & 12 deletions app/javascript/flavours/glitch/actions/thread.ts

This file was deleted.

Empty file.
5 changes: 0 additions & 5 deletions app/models/concerns/status/threading_concern.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ def self_replies(limit)
account.statuses.distributable_visibility.where(in_reply_to_id: id).reorder(id: :asc).limit(limit)
end

def thread_root
maybe_ancestors = ancestor_ids(Api::V1::StatusesController::CONTEXT_LIMIT)
maybe_ancestors.none? ? id : maybe_ancestors[0]
end

private

def ancestor_ids(limit)
Expand Down
14 changes: 2 additions & 12 deletions app/services/activitypub/fetch_replies_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ class ActivityPub::FetchRepliesService < BaseService
# limit of fetched replies used when fetching all replies
MAX_REPLIES_HIGH = 500

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false, current_account_id: nil)
def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false)
# Whether we are getting replies from more than the originating server,
# and don't limit ourselves to getting at most `MAX_REPLIES_LOW`
@all_replies = all_replies
@current_account_id = current_account_id
# store the status and whether we should fetch replies for it to avoid
# race conditions if something else updates us in the meantime
@status = parent_status
Expand All @@ -24,16 +23,7 @@ def call(parent_status, collection_or_uri, allow_synchronous_requests: true, req
@items = collection_items(collection_or_uri)
return if @items.nil?

FetchReplyWorker.push_bulk(filtered_replies) do |reply_uri|
[
reply_uri,
{
request_id: request_id,
all_replies: @all_replies,
current_account_id: @current_account_id,
},
]
end
FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id, 'all_replies' => @all_replies }] }
# Store last fetched all to debounce
@status.touch(:fetched_replies_at)

Expand Down
11 changes: 0 additions & 11 deletions app/services/fan_out_on_write_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def check_race_condition!

def fan_out_to_local_recipients!
deliver_to_self!
deliver_to_thread_stream!

unless @options[:skip_notifications]
notify_mentioned_accounts!
Expand Down Expand Up @@ -72,12 +71,6 @@ def deliver_to_self!
FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility?
end

def deliver_to_thread_stream
return unless subscribed_to_thread_stream?

redis.publish("timeline:thread:#{@status.thread_root}", anonymous_payload)
end

def notify_mentioned_accounts!
@status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions|
LocalNotificationWorker.push_bulk(mentions) do |mention|
Expand Down Expand Up @@ -190,8 +183,4 @@ def broadcastable?
def subscribed_to_streaming_api?(account_id)
redis.exists?("subscribed:timeline:#{account_id}") || redis.exists?("subscribed:timeline:#{account_id}:notifications")
end

def subscribed_to_thread_stream?
redis.exists?("subscribed:timeline:thread:#{@status.thread_root}")
end
end
16 changes: 1 addition & 15 deletions app/workers/activitypub/fetch_replies_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,9 @@ class ActivityPub::FetchRepliesWorker

sidekiq_options queue: 'pull', retry: 3

def perform(parent_status_id, replies_uri = nil, options = {})
@current_account_id = options.fetch(:current_account_id, nil)
replies_uri = get_replies_uri(status) if replies_uri.nil?
return if replies_uri.nil?

def perform(parent_status_id, replies_uri, options = {})
ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys)
rescue ActiveRecord::RecordNotFound
true
end

private

def get_replies_uri(status)
current_account = @current_account_id.nil? ? nil : Account.find(id: @current_account_id)
json_status = fetch_resource(status.uri, true, current_account)
replies_uri = json_status['replies']
Rails.logger.debug { "Could not find replies uri for status: #{status}" } if replies_uri.nil?
replies_uri
end
end
47 changes: 22 additions & 25 deletions app/workers/fetch_reply_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,27 @@ class FetchReplyWorker
sidekiq_options queue: 'pull', retry: 3

def perform(child_url, options = {})
@all_replies = options.fetch('all_replies', nil)
@current_account_id = options.delete('current_account_id')

@status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)

recurse

@status
end

private

def recurse
return unless @all_replies && @status

ActivityPub::FetchRepliesWorker.perform_in(
rand(1..30).seconds,
@status.id,
nil,
{
allow_synchronous_requests: true,
all_replies: true,
current_account_id: @current_account_id,
}
)
all_replies = options.delete('all_replies')

status = FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)

# asked to fetch replies recursively - do the second-level calls async
if all_replies && status
json_status = fetch_resource(status.uri, true)

collection = json_status['replies']
unless collection.nil?
# if expanding replies recursively, spread out the recursive calls
ActivityPub::FetchRepliesWorker.perform_in(
rand(1..30).seconds,
status.id,
collection,
{
allow_synchronous_requests: true,
all_replies: true,
}
)
end
end
end
end

0 comments on commit f25e337

Please sign in to comment.