diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 6c4df02f..fabf4ab4 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -446,7 +446,8 @@ def create_consumer(client_id, group_instance_id) props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s) if schema_registry_proxy && !schema_registry_proxy.empty? props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host) - props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port) + # Java Kafka client requires port to be a 32 bit int + props.put(serdes_config::PROXY_PORT, @schema_registry_proxy_port.to_java(Java::int)) end if schema_registry_key && !schema_registry_key.empty? props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO') diff --git a/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb b/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb index 670231f6..5561087a 100644 --- a/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb +++ b/lib/logstash/plugin_mixins/kafka/avro_schema_registry.rb @@ -22,7 +22,7 @@ def setup_schema_registry_config # Option to set the proxy of the Schema Registry. # This option permits to define a proxy to be used to reach the schema registry service instance. - config :schema_registry_proxy, :validate => :uri + config :schema_registry_proxy, :validate => :string # If schema registry client authentication is required, this setting stores the keystore path. config :schema_registry_ssl_keystore_location, :validate => :string @@ -80,8 +80,8 @@ def check_for_schema_registry_conflicts private def check_for_schema_registry_connectivity_and_subjects options = {} - if schema_registry_proxy && !schema_registry_proxy.empty? - options[:proxy] = schema_registry_proxy.to_s + if @schema_registry_proxy_host + options[:proxy] = { host: @schema_registry_proxy_host, port: @schema_registry_proxy_port } end if schema_registry_key and !schema_registry_key.empty? options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value} @@ -99,13 +99,7 @@ def check_for_schema_registry_connectivity_and_subjects options[:ssl][:keystore_type] = schema_registry_ssl_keystore_type unless schema_registry_ssl_keystore_type.nil? end - client = Manticore::Client.new(options) - begin - response = client.get(@schema_registry_url.uri.to_s + '/subjects').body - rescue Manticore::ManticoreException => e - raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}") - end - registered_subjects = JSON.parse response + registered_subjects = retrieve_subjects(options) expected_subjects = @topics.map { |t| "#{t}-value"} if (expected_subjects & registered_subjects).size != expected_subjects.size undefined_topic_subjects = expected_subjects - registered_subjects @@ -113,18 +107,23 @@ def check_for_schema_registry_connectivity_and_subjects end end + def retrieve_subjects(options) + client = Manticore::Client.new(options) + response = client.get(@schema_registry_url.uri.to_s + '/subjects').body + JSON.parse response + rescue Manticore::ManticoreException => e + raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}") + end + def split_proxy_into_host_and_port(proxy_uri) return nil unless proxy_uri && !proxy_uri.empty? - port = proxy_uri.port + proxy_uri = LogStash::Util::SafeURI.new(proxy_uri) - host_spec = "" - host_spec << proxy_uri.scheme || "http" - host_spec << "://" - host_spec << "#{proxy_uri.userinfo}@" if proxy_uri.userinfo - host_spec << proxy_uri.host + port = proxy_uri.port + host = proxy_uri.host - [host_spec, port] + [host, port] end def check_for_key_and_secret diff --git a/spec/unit/inputs/kafka_spec.rb b/spec/unit/inputs/kafka_spec.rb index 4fbb5ae6..c1b60864 100644 --- a/spec/unit/inputs/kafka_spec.rb +++ b/spec/unit/inputs/kafka_spec.rb @@ -287,6 +287,22 @@ end end end + context "when schema_registry_proxy is enabled" do + let(:schema_registry_proxy_url) { "http://myproxy:3333" } + let(:config) { base_config.merge('schema_registry_proxy' => schema_registry_proxy_url) } + + it "doesn't raise error on register" do + expect(subject).to receive(:check_for_schema_registry_connectivity_and_subjects) + expect { subject.register }.to_not raise_error + end + + it "extracts the correct proxy host and port" do + expect(subject).to receive(:check_for_schema_registry_connectivity_and_subjects) + subject.register + expect(subject.instance_variable_get('@schema_registry_proxy_host')).to eq('myproxy') + expect(subject.instance_variable_get('@schema_registry_proxy_port')).to eq(3333) + end + end end context "decorate_events" do