Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added broadcast_list to SolidCable adapter #41

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion app/models/solid_cable/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ class Message < SolidCable::Record
where(created_at: ...::SolidCable.message_retention.ago)
}
scope :broadcastable, lambda { |channels, last_id|
where(channel_hash: channel_hashes_for(channels)).
where(broadcast_to_list: false).
where(channel_hash: channel_hashes_for(channels)).
where(id: (last_id + 1)..).order(:id)
}
scope :broadcastable_to_list, lambda { |channels, last_id|
where(broadcast_to_list: true).
where(id: (last_id + 1)..).order(:id)
}

Expand All @@ -16,6 +21,11 @@ def broadcast(channel, payload)
channel_hash: channel_hash_for(channel) })
end

def broadcast_list(channel, payload)
insert({ created_at: Time.current, channel:, payload:,
channel_hash: channel_hash_for(channel), broadcast_to_list: true })
end

def channel_hashes_for(channels)
channels.map { |channel| channel_hash_for(channel) }
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddBroadcastToList < ActiveRecord::Migration[7.2]
def change
add_column :solid_cable_messages, :broadcast_to_list, :boolean, null: false, default: false
end
end
3 changes: 2 additions & 1 deletion bench/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[8.0].define(version: 2024_09_12_235943) do
ActiveRecord::Schema[8.0].define(version: 2024_10_08_221317) do
create_table "active_error_faults", force: :cascade do |t|
t.integer "cause_id"
t.binary "backtrace", limit: 536870912
Expand Down Expand Up @@ -48,6 +48,7 @@
t.binary "payload", limit: 536870912, null: false
t.datetime "created_at", null: false
t.integer "channel_hash", limit: 8, null: false
t.boolean "broadcast_to_list", default: false, null: false
t.index ["channel"], name: "index_solid_cable_messages_on_channel"
t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash"
t.index ["created_at"], name: "index_solid_cable_messages_on_created_at"
Expand Down
42 changes: 41 additions & 1 deletion lib/action_cable/subscription_adapter/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ def broadcast(channel, payload)
::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim?
end

def broadcast_list(channel, payload)
::SolidCable::Message.broadcast_list(channel, payload)

::SolidCable::TrimJob.perform_now if ::SolidCable.autotrim?
end

def subscribe(channel, callback, success_callback = nil)
listener.add_subscriber(channel, callback, success_callback)
end
Expand Down Expand Up @@ -78,6 +84,7 @@ def invoke_callback(*)
private
attr_reader :event_loop, :thread
attr_writer :running, :last_id
attr_writer :running, :last_list_id

def running?
if defined?(@running)
Expand All @@ -88,7 +95,11 @@ def running?
end

def last_id
@last_id ||= ::SolidCable::Message.maximum(:id) || 0
@last_id ||= ::SolidCable::Message.where(broadcast_to_list: false).maximum(:id) || 0
end

def last_list_id
@last_list_id ||= ::SolidCable::Message.where(broadcast_to_list: true).maximum(:id) || 0
end

def channels
Expand All @@ -101,6 +112,14 @@ def broadcast_messages
broadcast(message.channel, message.payload)
self.last_id = message.id
end

::SolidCable::Message.broadcastable_to_list(channels, last_list_id).
each do |message|
find_matching_channels(message.channel, channels).each do |channel|
broadcast(channel, message.payload)
end
self.last_list_id = message.id
end
end

def with_polling_volume
Expand All @@ -110,6 +129,27 @@ def with_polling_volume
yield
end
end

# Returns a list of channels that match the broadcast_to_list nomenclature
# For example, if channel is "posts:1", and channels is ["posts:1-2", "posts:2-3", "posts:1-3"],
# this method will return ["posts:1-2", "post:1-3"].
# channel attr must have parts separated by ":" and must have at least 2 parts
# channels must have parts separated by ":" and must have at least 2 parts, and in the last part
# which represents the identifiers, they must be separated by "-"
def find_matching_channels(channel, channels)
parts = channel.split(":")
return [] if parts.length == 1

id = parts.pop
base_channel = parts.join(":")

channels.filter_map do |ch|
if ch.start_with?(base_channel)
ids = ch.split(":").last
ch if ids != ch && ids.split("-").include?(id)
end
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
t.binary "payload", limit: 536870912, null: false
t.datetime "created_at", null: false
t.integer "channel_hash", limit: 8, null: false
t.boolean "broadcast_to_list", default: false, null: false
t.index ["channel"], name: "index_solid_cable_messages_on_channel"
t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash"
t.index ["created_at"], name: "index_solid_cable_messages_on_created_at"
Expand Down
1 change: 1 addition & 0 deletions test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
t.binary "payload", limit: 536870912, null: false
t.datetime "created_at", null: false
t.integer "channel_hash", limit: 8, null: false
t.boolean "broadcast_to_list", default: false, null: false
t.index ["channel"], name: "index_solid_cable_messages_on_channel"
t.index ["channel_hash"], name: "index_solid_cable_messages_on_channel_hash"
t.index ["created_at"], name: "index_solid_cable_messages_on_created_at"
Expand Down
10 changes: 10 additions & 0 deletions test/lib/action_cable/subscription_adapter/solid_cable_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase
end
end

test "broadcast_list" do
subscribe_as_queue("channel:2-3") do |queue|
subscribe_as_queue("channel:3-4") do |queue2|
@tx_adapter.broadcast_list("channel:2", "hello world")
assert_empty queue2
end
assert_equal "hello world", queue.pop
end
end

private
def cable_config
{ adapter: "solid_cable", message_retention: "1.second",
Expand Down