diff --git a/src/config.py b/src/config.py index 987f0ed..16f3566 100644 --- a/src/config.py +++ b/src/config.py @@ -1,18 +1,32 @@ import os + class Config(object): HARENA_LOGGER_FLASK_HOST = os.environ.get('HARENA_LOGGER_FLASK_HOST', '0.0.0.0') HARENA_LOGGER_FLASK_PORT = int(os.environ.get('HARENA_LOGGER_FLASK_PORT', 10030)) - HARENA_LOGGER_FLASK_DEBUG = bool(os.environ.get('HARENA_LOGGER_FLASK_DEBUG', False)) + HARENA_LOGGER_FLASK_DEBUG = os.environ.get('HARENA_LOGGER_FLASK_DEBUG', 'True').lower() in ('true', '1') HARENA_LOGGER_MONGODB_HOST = os.environ.get('HARENA_LOGGER_MONGODB_HOST', 'localhost') HARENA_LOGGER_MONGODB_PORT = int(os.environ.get('HARENA_LOGGER_MONGODB_PORT', 10031)) + HARENA_LOGGER_MONGODB_URL ="mongodb://{0}:{1}/".format(HARENA_LOGGER_MONGODB_HOST, HARENA_LOGGER_MONGODB_PORT) HARENA_LOGGER_MONGODB_DB = os.environ.get('HARENA_LOGGER_MONGODB_DB', 'harena_logger') - HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'executions') + HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'event_logs') + + HARENA_LOGGER_KAFKA_BROKERS = os.environ.get('HARENA_LOGGER_KAFKA_BROKERS', 'kafka1:19092') + HARENA_LOGGER_KAFKA_TOPIC = os.environ.get('HARENA_LOGGER_KAFKA_TOPIC', 'harena-logs') + HARENA_LOGGER_INTERVAL_S = int(os.environ.get('HARENA_LOGGER_INTERVAL_S', 10)) - HARENA_LOGGER_BROKER_HOST = os.environ.get('HARENA_LOGGER_BROKER_HOST', 'localhost') - HARENA_LOGGER_BROKER_PORT = int(os.environ.get('HARENA_LOGGER_BROKER_PORT', 10032)) + # LOGGING SETTINGS + LOGGING_NAME = os.environ.get('LOGGING_NAME', 'harena-logger') + LOGGING_LEVEL = os.environ.get('LOGGING_LEVEL', 'DEBUG') + LOGGING_STYLES = ('info=blue;' + 'warning=green;' + 'error=red;' + 'critical=red,bold;' + 'debug=white') + LOGGING_FORMAT = ('%(asctime) -19s | %(levelname) -8s | %(threadName) -10s | ' + '%(funcName) -16s | %(message)s') \ No newline at end of file diff --git a/src/server.py b/src/server.py index 6454f80..9a7aa6f 100644 --- 
a/src/server.py +++ b/src/server.py @@ -4,43 +4,126 @@ import random import pymongo import time +import threading +import logging +import coloredlogs + from flask import Flask, request, jsonify from flask_restful import Resource, Api from config import Config from flask_cors import CORS, cross_origin +from kafka import KafkaProducer +from kafka import KafkaConsumer +from kafka.errors import KafkaError + +# To do: Logging not appearing in Docker logs +LOGGER = logging.getLogger(Config.LOGGING_NAME) + +class KafkaMongodbAppender (threading.Thread): + + def __init__(self, mongodb_server_url, mongodb_database, mongodb_collection, kafka_consumer, topic, delay): + threading.Thread.__init__(self) + self.mongodb_server_url = mongodb_server_url + self.mongodb_database = mongodb_database + self.mongodb_collection = mongodb_collection + self.kafka_consumer = kafka_consumer + self.topic = topic + self.delay = delay + LOGGER.debug(mongodb_server_url) + + def run(self): + + LOGGER.info("Starting KafkaMongodbAppender") + + while True: + + # Opening and closing the connection during streamming checking. + # Adopting this since some memory problems appeared after long term one connection management + mongodb_client = pymongo.MongoClient(self.mongodb_server_url) + mongodb_db = mongodb_client[self.mongodb_database] + mongodb_collection = mongodb_db[self.mongodb_collection] + + print("Checking for newly streamed messages...") + + for message in self.kafka_consumer: + #print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) + print("Found {} events inside the message ".format(len(message.value['harena-log-stream']))) + for event in message.value['harena-log-stream']: + mongodb_collection.insert_one(event) + + + + print("Message stream timed out. 
Waiting {} seconds until next verification...".format(self.delay)) + mongodb_client.close() + time.sleep(self.delay) + + # mongodb_client = pymongo.MongoClient(web_app.config['HARENA_LOGGER_MONGODB_URL']) + # mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']] + # mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']] + + +# while True: + + + + +# def connect_to_mongodb(threadName, counter, delay): + +# def disconnect_from_mongodb(threadName, counter, delay): + +# def consume_from_kafka_and_append_to_mongodb(kafka_brokers, delay): + +# def get_num_of_messages(kafka_brokers, delay): + + + + + class IndexResource(Resource): - def __init__(self,broker,mongodb_client): - self.broker = broker + def __init__(self, kafka_producer): + self.kafka_producer=kafka_producer + + LOGGER.debug("IndexResource initialized") + def get(self): message = {"message": "Welcome to the Harena Logger module", - "broker_status" : broker.__repr__(), - "database_status":mongodb_client.server_info()['ok'] - + "kafka_bootstrap_connected" : self.kafka_producer.bootstrap_connected() } return message class HarenaMessageResource(Resource): - def __init__(self, broker, mongodb_collection): - self.broker = broker - self.mongodb_collection = mongodb_collection + def __init__(self, kafka_producer, target_topic): + self.kafka_producer = kafka_producer + self.target_topic=target_topic + + @cross_origin(origin='*') def post(self): + + # To do: properly evaluate message body parsing message = request.get_json() - print(json.dumps(message)) - topic = message['topic'] - payload = message['payload'] + message['server_timestamp'] = "{}".format(int(round(time.time() * 1000))) + + # Asynchronous by default + future = self.kafka_producer.send(self.target_topic, json.dumps(message).encode('utf-8')) - message['timestamp'] = "{}".format(int(round(time.time() * 1000))) + # Block for 'synchronous' sends + try: + record_metadata = future.get(timeout=10) + except 
KafkaError: + # Decide what to do if produce request failed... + LOGGER.exception("Kafka produce request failed") + pass - broker_publishing_flag = self.broker.publish(topic,json.dumps(payload)) - mongodb_insertion_flag = self.mongodb_collection.insert_one(message) + # Successful result returns assigned partition and offset + # print(future) data = {"message":'Message published successfully'} @@ -49,42 +132,54 @@ def post(self): @cross_origin(origin='*') def get(self): - message = {"message": "message streaming is up" - + message = {"message": "message streaming is up", + "kafka_bootstrap_connected" : self.kafka_producer.bootstrap_connected() + } return message - @cross_origin(origin='*') - def delete(self): - self.mongodb_collection.delete_many({}) - data = {"message":'Messages in the execution stream deleted successfully'} +if __name__ == '__main__': - return jsonify(data) + kafka_producer = None + kafka_consumer = None -if __name__ == '__main__': - web_app = Flask(__name__) - web_app.config.from_object(Config) - CORS(web_app) - api = Api(web_app) + while True: + try: + kafka_producer = KafkaProducer(bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS) + + kafka_consumer = KafkaConsumer(Config.HARENA_LOGGER_KAFKA_TOPIC, group_id='harena-logger-consumer', + bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + consumer_timeout_ms=10000) + break + except Exception: + pass - mongodb_client = pymongo.MongoClient("mongodb://{0}:{1}/" - .format(web_app.config['HARENA_LOGGER_MONGODB_HOST'], \ - web_app.config['HARENA_LOGGER_MONGODB_PORT'])) + print("Could not exchange metadata with Kafka bootstrap servers for the first time. 
Retrying...") + time.sleep(1) - mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']] - mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']] - broker = paho.Client("publisher{0}".format(random.randint(0,99999999)) ) - broker.connect(web_app.config['HARENA_LOGGER_BROKER_HOST'], - web_app.config['HARENA_LOGGER_BROKER_PORT']) - broker.reconnect_delay_set(min_delay=1, max_delay=20) - api.add_resource(IndexResource, '/', resource_class_args=[broker,mongodb_client]) - api.add_resource(HarenaMessageResource, '/api/v1/message',resource_class_args=[broker,mongodb_collection]) + consumer_thread = KafkaMongodbAppender(mongodb_server_url=Config.HARENA_LOGGER_MONGODB_URL, + mongodb_database=Config.HARENA_LOGGER_MONGODB_DB, + mongodb_collection=Config.HARENA_LOGGER_MONGODB_COLLECTION, + kafka_consumer=kafka_consumer, + topic=Config.HARENA_LOGGER_KAFKA_TOPIC, + delay=Config.HARENA_LOGGER_INTERVAL_S) + consumer_thread.start() + + + # Web Service for appending + web_app = Flask(__name__) + web_app.config.from_object(Config) + CORS(web_app) + api = Api(web_app) + api.add_resource(IndexResource, '/', resource_class_args=[kafka_producer]) + api.add_resource(HarenaMessageResource, '/api/v1/message',resource_class_args=[kafka_producer, Config.HARENA_LOGGER_KAFKA_TOPIC]) web_app.run(host=web_app.config['HARENA_LOGGER_FLASK_HOST'], port=web_app.config['HARENA_LOGGER_FLASK_PORT'], debug=web_app.config['HARENA_LOGGER_FLASK_DEBUG'])