Skip to content

Commit

Permalink
Merge branch 'master' into 450
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Mensfeld <[email protected]>
  • Loading branch information
mensfeld authored Nov 22, 2024
2 parents 598b40f + 9a09a68 commit cc92377
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 12 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
- [Enhancement] Extract producers tracking `sync_threshold` into an internal config.
- [Enhancement] Support complex Pro license loading strategies (Pro).
- [Enhancement] Change default `retention.ms` for the metrics topic to support Redpanda Cloud defaults (#450).
- [Enhancement] Include subscription group id in the consumers error tracking metadata.
- [Enhancement] Collect metadata details of low level client errors when error tracking.
- [Enhancement] Collect metadata details of low level listener errors when error tracking.
- [Fix] Toggle menu button post-turbo refresh stops working.

## 0.10.3 (2024-09-17)
Expand Down
47 changes: 38 additions & 9 deletions lib/karafka/web/tracking/consumers/listeners/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ class Errors < Base
#
# @param event [Karafka::Core::Monitoring::Event]
def on_error_occurred(event)
track do |sampler|
# Collect extra info if it was a consumer related error.
# Those come from user code
details = if event[:caller].is_a?(Karafka::BaseConsumer)
extract_consumer_info(event[:caller])
else
{}
end
caller_ref = event[:caller]

# Collect extra info if it was a consumer related error.
# Those come from user code
details = case caller_ref
when Karafka::BaseConsumer
extract_consumer_info(caller_ref)
when Karafka::Connection::Client
extract_client_info(caller_ref)
when Karafka::Connection::Listener
extract_listener_info(caller_ref)
else
{}
end

error_class, error_message, backtrace = extract_error_info(event[:error])
error_class, error_message, backtrace = extract_error_info(event[:error])

track do |sampler|
sampler.errors << {
schema_version: SCHEMA_VERSION,
type: event[:type],
Expand Down Expand Up @@ -70,6 +77,7 @@ def extract_consumer_info(consumer)
{
topic: consumer.topic.name,
consumer_group: consumer.topic.consumer_group.id,
subscription_group: consumer.topic.subscription_group.id,
partition: consumer.partition,
first_offset: consumer.messages.metadata.first_offset,
last_offset: consumer.messages.metadata.last_offset,
Expand All @@ -80,6 +88,27 @@ def extract_consumer_info(consumer)
tags: consumer.tags
}
end

# @param client [::Karafka::Connection::Client]
# @return [Hash] hash with client specific info for details of error
def extract_client_info(client)
{
consumer_group: client.subscription_group.consumer_group.id,
subscription_group: client.subscription_group.id,
name: client.name,
id: client.id
}
end

# @param listener [::Karafka::Connection::Listener]
# @return [Hash] hash with listener specific info for details of error
def extract_listener_info(listener)
{
consumer_group: listener.subscription_group.consumer_group.id,
subscription_group: listener.subscription_group.id,
id: listener.id
}
end
end
end
end
Expand Down
137 changes: 134 additions & 3 deletions spec/lib/karafka/web/tracking/consumers/listeners/errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,161 @@
let(:event) do
{
type: 'error.occurred',
error: error
error: error,
caller: caller_ref
}
end

let(:consumer_group) do
instance_double(
Karafka::Routing::ConsumerGroup,
id: 'group1'
)
end

let(:subscription_group) do
instance_double(
Karafka::Routing::SubscriptionGroup,
id: 'sub1',
consumer_group: consumer_group
)
end

describe '#on_error_occurred' do
let(:topic) do
instance_double(
Karafka::Routing::Topic,
name: 'topic_name',
consumer_group: consumer_group,
subscription_group: subscription_group
)
end

context 'when error message string is frozen' do
let(:caller_ref) { nil }

it 'expect to process it without problems' do
expect { listener.on_error_occurred(event) }.not_to raise_error
end
end

context 'when caller is a consumer' do
let(:messages_metadata) do
instance_double(Karafka::Messages::BatchMetadata, first_offset: 5, last_offset: 10)
end

let(:messages) do
instance_double(Karafka::Messages::Messages, metadata: messages_metadata)
end

let(:topic) do
instance_double(
Karafka::Routing::Topic,
name: 'topic_name',
consumer_group: consumer_group,
subscription_group: subscription_group
)
end

let(:coordinator) { instance_double(Karafka::Processing::Coordinator, seek_offset: 100) }
let(:caller_ref) { Karafka::BaseConsumer.new }

before do
allow(caller_ref).to receive(:topic).and_return(topic)
allow(caller_ref).to receive(:partition).and_return(0)
allow(caller_ref).to receive(:messages).and_return(messages)
allow(caller_ref).to receive(:coordinator).and_return(coordinator)
allow(caller_ref).to receive(:tags).and_return(['tag1'])
end

it 'expect to include consumer specific details' do
listener.on_error_occurred(event)
error_details = sampler.errors.last[:details]

expect(error_details).to include(
topic: 'topic_name',
consumer_group: 'group1',
subscription_group: 'sub1',
partition: 0,
first_offset: 5,
last_offset: 10,
committed_offset: 99,
tags: ['tag1']
)
end

context 'when seek_offset is nil' do
let(:coordinator) { instance_double(Karafka::Processing::Coordinator, seek_offset: nil) }

it 'expect to set committed_offset to -1001' do
listener.on_error_occurred(event)
expect(sampler.errors.last[:details][:committed_offset]).to eq(-1001)
end
end
end

context 'when caller is a client' do
let(:caller_ref) do
Karafka::Connection::Client.new(
subscription_group,
nil
)
end

it 'expect to include client specific details' do
listener.on_error_occurred(event)
error_details = sampler.errors.last[:details]

expect(error_details).to include(
consumer_group: 'group1',
subscription_group: 'sub1',
name: ''
)
end
end

context 'when caller is a listener' do
before { allow(subscription_group).to receive(:topics).and_return([topic]) }

let(:caller_ref) do
Karafka::Connection::Listener.new(
subscription_group,
Karafka::Processing::JobsQueue.new,
nil
)
end

it 'expect to include listener specific details' do
listener.on_error_occurred(event)
error_details = sampler.errors.last[:details]

expect(error_details).to include(
consumer_group: 'group1',
subscription_group: 'sub1'
)
end
end

context 'when caller is unknown' do
let(:caller_ref) { Object.new }

it 'expect to include empty details' do
listener.on_error_occurred(event)
expect(sampler.errors.last[:details]).to eq({})
end
end
end

describe '#on_dead_letter_queue_dispatched' do
it 'expect to increase the dlq counter' do
listener.on_dead_letter_queue_dispatched(nil)

expect(sampler.counters[:dead]).to eq(1)
end
end

describe '#on_consumer_consuming_retry' do
it 'expect to increase the retry counter' do
listener.on_consumer_consuming_retry(nil)

expect(sampler.counters[:retries]).to eq(1)
end
end
Expand Down

0 comments on commit cc92377

Please sign in to comment.