Skip to content

Commit

Permalink
restart kafka fetcher after exception
Browse files Browse the repository at this point in the history
  • Loading branch information
trim21 committed Sep 29, 2024
1 parent f60a0a0 commit f2f8039
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import html
import http
import logging
import os
import secrets
import sys
import time
Expand Down Expand Up @@ -215,25 +214,26 @@ async def read_from_db(self) -> None:

@sslog.logger.catch
def __watch_kafka_messages(self, loop: asyncio.AbstractEventLoop) -> None:
try:
logger.info("start watching kafka message")
consumer = KafkaConsumer(
"debezium.chii.bangumi.chii_members",
"debezium.chii.bangumi.chii_notify",
)
for msg in consumer:
match msg.topic:
case "debezium.chii.bangumi.chii_members":
asyncio.run_coroutine_threadsafe(
self.__pm_queue.put(msg), loop
).result()
case "debezium.chii.bangumi.chii_notify":
asyncio.run_coroutine_threadsafe(
self.__notify_queue.put(msg), loop
).result()
except Exception:
logger.exception("failed to watch kafka message, exit process")
os._exit(1)
logger.info("start watching kafka message")
consumer = KafkaConsumer(
"debezium.chii.bangumi.chii_members",
"debezium.chii.bangumi.chii_notify",
)

while True:
try:
for msg in consumer:
match msg.topic:
case "debezium.chii.bangumi.chii_members":
asyncio.run_coroutine_threadsafe(
self.__pm_queue.put(msg), loop
).result()
case "debezium.chii.bangumi.chii_notify":
asyncio.run_coroutine_threadsafe(
self.__notify_queue.put(msg), loop
).result()
except Exception:
logger.exception("failed to fetch kafka message")

async def __handle_new_notify(self) -> None:
while True:
Expand Down

0 comments on commit f2f8039

Please sign in to comment.