diff --git a/src/config.py b/src/config.py index 16f3566..7035a85 100644 --- a/src/config.py +++ b/src/config.py @@ -15,7 +15,7 @@ class Config(object): HARENA_LOGGER_KAFKA_BROKERS = os.environ.get('HARENA_LOGGER_KAFKA_BROKERS', 'kafka1:19092') HARENA_LOGGER_KAFKA_TOPIC = os.environ.get('HARENA_LOGGER_KAFKA_TOPIC', 'harena-logs') - HARENA_LOGGER_INTERVAL_S = os.environ.get('HARENA_LOGGER_INTERVAL_S', 10) + HARENA_LOGGER_INTERVAL_S = int(os.environ.get('HARENA_LOGGER_INTERVAL_S', 10)) # LOGGING SETTINGS diff --git a/src/server.py b/src/server.py index 9a7aa6f..c705ea6 100644 --- a/src/server.py +++ b/src/server.py @@ -43,43 +43,20 @@ def run(self): mongodb_db = mongodb_client[self.mongodb_database] mongodb_collection = mongodb_db[self.mongodb_collection] - print("Checking for newly streamed messages...") + print("Checking for newly streamed messages during at least {} seconds...".format(self.delay)) for message in self.kafka_consumer: #print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) - print("Found {} events inside the message ".format(len(message.value['harena-log-stream']))) + print("Found {} events inside a message ".format(len(message.value['harena-log-stream']))) + for event in message.value['harena-log-stream']: mongodb_collection.insert_one(event) - - print("Message stream timed out. Waiting {} seconds until next verification...".format(self.delay)) + print("Waiting time ({} seconds) for new messages ended.".format(self.delay)) mongodb_client.close() time.sleep(self.delay) - # mongodb_client = pymongo.MongoClient(web_app.config['HARENA_LOGGER_MONGODB_URL']) - # mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']] - # mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']] - - -# while True: - - - - -# def connect_to_mongodb(threadName, counter, delay): - -# def disconnect_from_mongodb(threadName, counter, delay): - -# def consume_from_kafka_and_append_to_mongodb(kafka_brokers, delay): - -# def get_num_of_messages(kafka_brokers, delay): - - - - - - class IndexResource(Resource): @@ -153,7 +130,7 @@ def get(self): kafka_consumer = KafkaConsumer(Config.HARENA_LOGGER_KAFKA_TOPIC, group_id='harena-logger-consumer', bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS, value_deserializer=lambda m: json.loads(m.decode('utf-8')), - consumer_timeout_ms=10000) + consumer_timeout_ms=Config.HARENA_LOGGER_INTERVAL_S*1000) break except: pass