From 4cd35714483d3e079f50b1353e82fe50a9f33997 Mon Sep 17 00:00:00 2001 From: sudeeptarlekar Date: Sun, 20 Sep 2020 21:41:58 +0530 Subject: [PATCH 1/4] Added method to check subscribers --- lib/stomp/client.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/stomp/client.rb b/lib/stomp/client.rb index 8f1f0e9..f086426 100644 --- a/lib/stomp/client.rb +++ b/lib/stomp/client.rb @@ -281,6 +281,14 @@ def closed?() @connection.closed?() end + # subscriber? checks if subscriber with destination is passed is present or not + # returns : boolean + def subscriber?(destination, headers = {}) + headers = headers.symbolize_keys + headers = headers.merge(:id => build_subscription_id(destination, headers)) + !@listeners[headers[:id]].nil? + end + # jruby? tests if the connection has detcted a JRuby environment def jruby?() @connection.jruby From 7b341228e84a8e8c2aaf2dfd18b52d657234a27a Mon Sep 17 00:00:00 2001 From: sudeeptarlekar Date: Mon, 21 Sep 2020 10:54:25 +0530 Subject: [PATCH 2/4] Added specs --- lib/stomp/client.rb | 10 +++++-- spec/client_spec.rb | 63 +++++++++++++++++++++++++++++---------------- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/lib/stomp/client.rb b/lib/stomp/client.rb index f086426..ca1da1f 100644 --- a/lib/stomp/client.rb +++ b/lib/stomp/client.rb @@ -281,8 +281,14 @@ def closed?() @connection.closed?() end - # subscriber? checks if subscriber with destination is passed is present or not - # returns : boolean + ## Checks if subscriber with destination is passed is present or not + # @returns [Boolean] + # + # example + # Stomp::Client.new().subscriber?('/topic/topicName') => false + # Stomp::Client.new().subscriber?('/queue/queueName') => true + # Stomp::Client.new().subscriber?('/queue/Consumer.subscriber1.VirtualTopic.topicName') => true + # def subscriber?(destination, headers = {}) headers = headers.symbolize_keys headers = headers.merge(:id => build_subscription_id(destination, headers)) diff --git a/spec/client_spec.rb b/spec/client_spec.rb index b310b04..a76318d 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -3,7 +3,6 @@ require 'spec_helper' require 'client_shared_examples' - describe Stomp::Client do let(:null_logger) { double("mock Stomp::NullLogger") } @@ -192,7 +191,7 @@ it_should_behave_like "standard Client" end - + describe "(created with authenticating stomp:// URL and non-TLD host)" do before(:each) do @@ -283,7 +282,7 @@ it_should_behave_like "standard Client" end - + describe "(created with failover URL)" do before(:each) do @client = Stomp::Client.new('failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)') @@ -312,23 +311,23 @@ client = Stomp::Client.new(url) expect(client.parameters).to eq(@parameters) end - + it "should properly parse a URL with failover:" do url = "failover:(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost1:61617,stomp://login3:passcode3@remotehost2:61618)" - + @parameters[:hosts] = [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost1", :port => 61617, :ssl => false}, {:login => "login3", :passcode => "passcode3", :host => "remotehost2", :port => 61618, :ssl => false} ] - + @parameters.merge!({:logger => null_logger}) @parameters[:client_main] = @cli_thread expect(Stomp::Connection).to receive(:new).with(@parameters) client = Stomp::Client.new(url) expect(client.parameters).to eq(@parameters) end - + it "should properly parse a URL without user and password" do url = "failover:(stomp://localhost:61616,stomp://remotehost:61617)" @@ -336,39 +335,39 @@ {:login => "", :passcode => "", :host => "localhost", :port => 61616, :ssl => false}, {:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false} ] - + @parameters.merge!({:logger => null_logger}) @parameters[:client_main] = @cli_thread expect(Stomp::Connection).to receive(:new).with(@parameters) - + client = Stomp::Client.new(url) @parameters[:client_main] = client.parameters[:client_main] expect(client.parameters).to eq(@parameters) end - + it "should properly parse a URL with user and/or password blank" do url = "failover:(stomp://@localhost:61616,stomp://@remotehost:61617)" - + @parameters[:hosts] = [ {:login => "", :passcode => "", :host => "localhost", :port => 61616, :ssl => false}, {:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false} ] - + @parameters.merge!({:logger => null_logger}) @parameters[:client_main] = @cli_thread expect(Stomp::Connection).to receive(:new).with(@parameters) - + client = Stomp::Client.new(url) @parameters[:client_main] = client.parameters[:client_main] expect(client.parameters).to eq(@parameters) end - + it "should properly parse a URL with the options query" do query = "initialReconnectDelay=5000&maxReconnectDelay=60000&useExponentialBackOff=false&backOffMultiplier=3" query += "&maxReconnectAttempts=4&randomize=true&backup=true&timeout=10000" - + url = "failover:(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?#{query}" - + # @parameters = { :initial_reconnect_delay => 5.0, @@ -380,21 +379,21 @@ :connect_timeout => 0, :reliable => true } - + @parameters[:hosts] = [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ] - + @parameters.merge!({:logger => null_logger}) @parameters[:client_main] = @cli_thread expect(Stomp::Connection).to receive(:new).with(@parameters) - + client = Stomp::Client.new(url) @parameters[:client_main] = client.parameters[:client_main] expect(client.parameters).to eq(@parameters) end - + end @@ -407,7 +406,7 @@ message.command = Stomp::CMD_ERROR message end - + it 'should handle ProducerFlowControlException errors by raising' do expect do @client = Stomp::Client.new @@ -472,6 +471,27 @@ def original_headers it_behaves_like 'argument-safe method' end + describe '#subscriber?' do + subject { @client.subscriber?('/topic/topicName') } + + context 'When subscription is present' do + before { + allow(@mock_connection).to receive(:subscribe) + @client.subscribe('/topic/topicName') { |message| } + } + + it 'returns true' do + expect(subject).to be true + end + end + + context 'When subscription is not present' do + it 'returns false' do + expect(subject).to be false + end + end + end + describe '#unsubscribe' do let(:connection_headers) { original_headers.merge({:id => Digest::SHA1.hexdigest('destination')}) } before { @@ -563,6 +583,5 @@ def original_headers it_behaves_like 'argument-safe method' end end - end end From 687972145944e280b81594b36884af8f73346d82 Mon Sep 17 00:00:00 2001 From: sudeeptarlekar Date: Thu, 15 Oct 2020 21:04:27 +0530 Subject: [PATCH 3/4] Added subscriber check method for connection class and added Gemfile --- Gemfile | 6 ++++++ lib/stomp/client.rb | 2 +- lib/stomp/connection.rb | 7 +++++++ spec/client_spec.rb | 4 ++-- 4 files changed, 16 insertions(+), 3 deletions(-) create mode 100644 Gemfile diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..aaa58fa --- /dev/null +++ b/Gemfile @@ -0,0 +1,6 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' + +# Specify your gem's dependencies in stomp.gemspec +gemspec diff --git a/lib/stomp/client.rb b/lib/stomp/client.rb index ca1da1f..523abbc 100644 --- a/lib/stomp/client.rb +++ b/lib/stomp/client.rb @@ -289,7 +289,7 @@ def closed?() # Stomp::Client.new().subscriber?('/queue/queueName') => true # Stomp::Client.new().subscriber?('/queue/Consumer.subscriber1.VirtualTopic.topicName') => true # - def subscriber?(destination, headers = {}) + def subscribed?(destination, headers = {}) headers = headers.symbolize_keys headers = headers.merge(:id => build_subscription_id(destination, headers)) !@listeners[headers[:id]].nil? diff --git a/lib/stomp/connection.rb b/lib/stomp/connection.rb index dadb455..a45a97f 100644 --- a/lib/stomp/connection.rb +++ b/lib/stomp/connection.rb @@ -371,6 +371,13 @@ def subscribe(destination, headers = {}, subId = nil) transmit(Stomp::CMD_SUBSCRIBE, headers) end + def subscribed?(destination, subId = nil) + return false unless @reliable + + subId = destination if subId.nil? + @subscriptions[subId].present? + end + # Unsubscribe from a destination. A subscription name is required. # For Stomp 1.1+ a session unique subscription ID is also required. def unsubscribe(destination, headers = {}, subId = nil) diff --git a/spec/client_spec.rb b/spec/client_spec.rb index a76318d..66e966d 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -471,8 +471,8 @@ def original_headers it_behaves_like 'argument-safe method' end - describe '#subscriber?' do - subject { @client.subscriber?('/topic/topicName') } + describe '#subscribed?' do + subject { @client.subscribed?('/topic/topicName') } context 'When subscription is present' do before { From 689b14c210d165db4dcf83c8fe0debcfd8f6c6c5 Mon Sep 17 00:00:00 2001 From: sudeeptarlekar Date: Fri, 16 Oct 2020 00:06:02 +0530 Subject: [PATCH 4/4] Updated method to check subscriber --- lib/stomp/connection.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stomp/connection.rb b/lib/stomp/connection.rb index a45a97f..061b88d 100644 --- a/lib/stomp/connection.rb +++ b/lib/stomp/connection.rb @@ -375,7 +375,7 @@ def subscribed?(destination, subId = nil) return false unless @reliable subId = destination if subId.nil? - @subscriptions[subId].present? + !@subscriptions[subId].nil? end # Unsubscribe from a destination. A subscription name is required.