[BUG] use celery task send data to pulsar, init pulsar client error #150

Open
dengshaochun opened this issue Aug 29, 2023 · 3 comments

@dengshaochun

python: 3.6
pulsar-client: pulsar-client[avro]==2.10.2
celery: 5.1.2

code example:

#!/usr/bin/env python

import time
import random
import string
from pulsar import Client, CompressionType
from pulsar.schema import AvroSchema, Record, String, Integer


def generate_random_string(length=6):
    charset = string.ascii_letters + string.digits
    random_chars = random.choices(charset, k=length)
    random_string = "".join(random_chars)
    return random_string.capitalize()


class User(Record):
    name = String()
    age = Integer()  # field types must be instantiated, like String() above


UserAvroSchema = AvroSchema(User)  # type: ignore


def gen_random_data():
    # The record declares `name`, not `user`, so the keyword must match.
    return User(name=generate_random_string(), age=random.randint(0, 100))


class PulsarDemo(object):
    def __init__(self) -> None:
        self.SERVICE_URL = "pulsar://***"
        self.TOPIC = "persistent://****"
        client = Client(service_url=self.SERVICE_URL)
        self.producer = client.create_producer(
            topic=self.TOPIC,
            schema=UserAvroSchema,
            batching_enabled=True,
            batching_max_messages=1000,
            batching_max_publish_delay_ms=1000,
            compression_type=CompressionType.SNAPPY,  # type: ignore
        )

    def send_callback(self, send_result, msg_id):
        print("Message published: result:{}  msg_id:{}".format(send_result, msg_id))

    def async_producer(self, cnt=1000):
        while cnt >= 0:
            data = gen_random_data()
            self.producer.send_async(
                data,
                callback=self.send_callback,
            )
            time.sleep(0.01)
            cnt -= 1
        self.producer.flush()


# celery task
from celery import shared_task
@shared_task
def mock_data2pulsar(cnt=1000):
    mock = PulsarDemo()
    mock.async_producer(cnt)  # forward the task argument instead of using the default

The following exception occurred:

[2023-08-29 10:27:46,933: ERROR/ForkPoolWorker-31] Pulsar error: TopicNotFound
@BewareMyPower
Contributor

I'm new to celery. Could you explain how to run the script you provided?

@dengshaochun
Author

I cannot fully reproduce it, but it fails consistently in our production environment. A standalone celery + pulsar test works normally, and I have no leads so far. The production environment reports the following errors:

 p._producer = self._client.create_producer(topic, conf)
_pulsar.TopicNotFound: Pulsar error: TopicNotFound
0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 WARN  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 ERROR [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [0000-00-00 00:00:00.000 INFO  [

The production environment only fails when the task is invoked through celery's .delay(); calling the function directly raises no exception, as follows:

# ok
mock_data2pulsar(cnt=1000)

# exception
mock_data2pulsar.delay(cnt=1000)

I added debug prints to pulsar/__init__.py to show topic and conf just before p._producer = self._client.create_producer(topic, conf), and both look correct.
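
Since the failure only shows up when the task runs inside a worker (the log mentions ForkPoolWorker), one thing that may be worth ruling out is a client object being created in the parent process and reused after a fork. The root cause here is not confirmed; the following is only a minimal sketch of keeping one resource per process. `PerProcess` is a hypothetical helper, not part of pulsar-client or celery, and the factory shown in the comment is an assumption about how it would be wired up.

```python
import os


class PerProcess:
    """Cache one resource per OS process and rebuild it after a fork.

    Hypothetical helper for illustration. Not thread-safe; it assumes each
    worker process calls get() from a single thread.
    """

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable that builds the resource
        self._pid = None         # PID of the process that built the cached resource
        self._resource = None

    def get(self):
        pid = os.getpid()
        if self._pid != pid:
            # First use in this process (or first use after a fork): build anew.
            self._resource = self._factory()
            self._pid = pid
        return self._resource


# Possible wiring (assumption, shown as a comment so the sketch runs without pulsar):
#   clients = PerProcess(lambda: pulsar.Client("pulsar://***"))
#   producer = clients.get().create_producer(topic, schema=UserAvroSchema)
```

Printing `os.getpid()` next to the topic in the existing debug output would also show whether the direct call and the `.delay()` call really run in different processes.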

@gromsterus

Hi @dengshaochun. I tried to reproduce the bug, but the code works. I suspect the problem is in the topic name.

Here is my example: https://github.com/gromsterus/issues-sandbox/tree/main/pulsar-client-python-150
I use persistent://public/default/test-celery.
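
To check the topic-name suspicion quickly, a small validator can be run on the exact string the worker passes to create_producer. This is a rough sketch, assuming the fully qualified `domain://tenant/namespace/topic` form used in the example above; `parse_topic` is a hypothetical helper, not a pulsar-client function, and it deliberately rejects short names and other layouts.

```python
import re

# Fully qualified form only: {persistent|non-persistent}://tenant/namespace/topic
_TOPIC_RE = re.compile(r"(persistent|non-persistent)://([^/\s]+)/([^/\s]+)/([^/\s]+)")


def parse_topic(topic):
    """Split a fully qualified topic name into its parts, or raise ValueError.

    Hypothetical helper for debugging. repr() in the error message makes
    stray whitespace or newlines visible.
    """
    match = _TOPIC_RE.fullmatch(topic)
    if match is None:
        raise ValueError("unexpected topic name: {!r}".format(topic))
    domain, tenant, namespace, name = match.groups()
    return {"domain": domain, "tenant": tenant, "namespace": namespace, "topic": name}


if __name__ == "__main__":
    print(parse_topic("persistent://public/default/test-celery"))
```

Calling it inside the task, right before the producer is created, would reveal whether the worker sees a different or malformed topic string than the direct call does, for example one loaded from a different settings file.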

If you provide the complete code with celery initialization it will be easier to help 🤝


3 participants