Skip to content

Commit

Permalink
tests: remove explicit confluent address
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Aug 7, 2024
1 parent 6f2ebb8 commit 741a22a
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
broker = KafkaBroker()
app = FastStream(broker)


Expand All @@ -12,5 +12,5 @@ async def handle(msg: str):

@app.after_startup
async def test():
async with KafkaBroker("localhost:9092") as br:
async with KafkaBroker() as br:
await br.publish("Hi!", topic="test-confluent-topic")
15 changes: 5 additions & 10 deletions tests/opentelemetry/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class LocalTelemetryTestcase(BaseTestcaseConfig):
messaging_system: str
include_messages_counters: bool
broker_class: Type[BrokerUsecase]
resource: Resource = Resource.create(
attributes={"service.name": "faststream.test"})
resource: Resource = Resource.create(attributes={"service.name": "faststream.test"})

telemetry_middleware_class: TelemetryMiddleware

Expand Down Expand Up @@ -237,14 +236,10 @@ async def handler2(m):
parent_span_id = create.context.span_id

self.assert_span(create, Action.CREATE, first_queue, msg)
self.assert_span(pub1, Action.PUBLISH,
first_queue, msg, parent_span_id)
self.assert_span(proc1, Action.PROCESS,
first_queue, msg, parent_span_id)
self.assert_span(pub2, Action.PUBLISH, second_queue,
msg, proc1.context.span_id)
self.assert_span(proc2, Action.PROCESS,
second_queue, msg, parent_span_id)
self.assert_span(pub1, Action.PUBLISH, first_queue, msg, parent_span_id)
self.assert_span(proc1, Action.PROCESS, first_queue, msg, parent_span_id)
self.assert_span(pub2, Action.PUBLISH, second_queue, msg, proc1.context.span_id)
self.assert_span(proc2, Action.PROCESS, second_queue, msg, parent_span_id)

assert (
create.start_time
Expand Down
3 changes: 3 additions & 0 deletions tests/opentelemetry/confluent/test_confluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async def test_batch(
expected_link_attrs = {"messaging.batch.message_count": 3}

args, kwargs = self.get_subscriber_params(queue, batch=True)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
Expand Down Expand Up @@ -132,6 +133,7 @@ async def test_batch_publish_with_single_consume(
expected_pub_batch_count = 1

args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(msg):
await msgs_queue.put(msg)
Expand Down Expand Up @@ -191,6 +193,7 @@ async def test_single_publish_with_batch_consume(
expected_process_batch_count = 1

args, kwargs = self.get_subscriber_params(queue, batch=True)

@broker.subscriber(*args, **kwargs)
async def handler(m):
m.sort()
Expand Down
3 changes: 3 additions & 0 deletions tests/opentelemetry/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def test_batch(
expected_link_attrs = {"messaging.batch.message_count": 3}

args, kwargs = self.get_subscriber_params(queue, batch=True)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
Expand Down Expand Up @@ -133,6 +134,7 @@ async def test_batch_publish_with_single_consume(
expected_pub_batch_count = 1

args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(msg):
await msgs_queue.put(msg)
Expand Down Expand Up @@ -192,6 +194,7 @@ async def test_single_publish_with_batch_consume(
expected_process_batch_count = 1

args, kwargs = self.get_subscriber_params(queue, batch=True)

@broker.subscriber(*args, **kwargs)
async def handler(m):
m.sort()
Expand Down
5 changes: 3 additions & 2 deletions tests/opentelemetry/nats/test_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ async def test_batch(
expected_span_count = 4
expected_proc_batch_count = 1

@broker.subscriber(
args, kwargs = self.get_subscriber_params(
queue,
stream=stream,
pull_sub=PullSub(1, batch=True, timeout=30.0),
**self.subscriber_kwargs,
)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down
12 changes: 9 additions & 3 deletions tests/opentelemetry/redis/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ async def test_batch(
expected_link_count = 1
expected_link_attrs = {"messaging.batch.message_count": 3}

@broker.subscriber(list=ListSub(queue, batch=True), **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True))

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down Expand Up @@ -97,7 +99,9 @@ async def test_batch_publish_with_single_consume(
expected_span_count = 8
expected_pub_batch_count = 1

@broker.subscriber(list=ListSub(queue), **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(list=ListSub(queue))

@broker.subscriber(*args, **kwargs)
async def handler(msg):
await msgs_queue.put(msg)

Expand Down Expand Up @@ -155,7 +159,9 @@ async def test_single_publish_with_batch_consume(
expected_span_count = 6
expected_process_batch_count = 1

@broker.subscriber(list=ListSub(queue, batch=True), **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True))

@broker.subscriber(*args, **kwargs)
async def handler(m):
m.sort()
mock(m)
Expand Down

0 comments on commit 741a22a

Please sign in to comment.