diff --git a/Dockerfile b/Dockerfile
index f2cdfca..cc0fa27 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -9,4 +9,4 @@ RUN pip3 install --upgrade -r requirements.txt
 
 ADD ./src .
 
-CMD ["python3", "server.py"]
+CMD ["python3", "-u", "server.py"]
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
new file mode 100644
index 0000000..f8e25c0
--- /dev/null
+++ b/docker-compose-dev.yml
@@ -0,0 +1,150 @@
+version: '3'
+
+services:
+
+
+  harena-logger:
+    build: .
+    environment:
+      - HARENA_LOGGER_FLASK_HOST=0.0.0.0
+      - HARENA_LOGGER_FLASK_PORT=10030
+      - HARENA_LOGGER_FLASK_DEBUG=True
+      - FLASK_DEBUG=True
+      - FLASK_ENV=development
+      - HARENA_LOGGER_MONGODB_HOST=mongodb
+      - HARENA_LOGGER_MONGODB_PORT=27017
+      - HARENA_LOGGER_MONGODB_DB=harena_logger
+      - HARENA_LOGGER_MONGODB_COLLECTION=event_logs
+      - HARENA_LOGGER_KAFKA_BROKERS=kafka1:19092
+      - PYTHONUNBUFFERED=1
+      - PYTHONIOENCODING=UTF-8
+
+    ports:
+      - 10030:10030
+    depends_on:
+      - kafka1
+      - mongodb
+      - zoo1
+    restart: always
+    networks:
+      - harena-logger
+    volumes:
+      - ./src:/app/src
+
+
+  mongodb:
+    image: mongo:latest
+    environment:
+      - MONGO_DATA_DIR=/data/db
+      - MONGO_LOG_DIR=/dev/null
+    ports:
+      - 10031:27017
+    volumes:
+      - harena_logger_mongodb_data:/data/db
+    # command: mongod --smallfiles --logpath=/dev/null # --quiet
+    networks:
+      - harena-logger
+
+
+  zoo1:
+    image: zookeeper:3.4.9
+    hostname: zoo1
+    ports:
+      - "2181:2181"
+    environment:
+      ZOO_MY_ID: 1
+      ZOO_PORT: 2181
+      ZOO_SERVERS: server.1=zoo1:2888:3888
+    volumes:
+      - harena_logger_kafka_zoo1_data:/data
+      - harena_logger_kafka_zoo1_datalog:/datalog
+    networks:
+      - harena-logger
+
+
+  kafka1:
+    image: confluentinc/cp-kafka:5.5.1
+    hostname: kafka1
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    volumes:
+      - harena_logger_kafka1_data:/var/lib/kafka/data
+    depends_on:
+      - zoo1
+    networks:
+      - harena-logger
+
+  # kafka-connect:
+  #   image: confluentinc/cp-kafka-connect:5.1.2
+  #   build:
+  #     context: .
+  #     dockerfile: Dockerfile
+  #   hostname: kafka-connect
+  #   container_name: kafka-connect
+  #   depends_on:
+  #     - zoo1
+  #     - kafka1
+  #   ports:
+  #     - "8083:8083"
+
+  #   environment:
+  #     CONNECT_BOOTSTRAP_SERVERS: 'kafka1:29092'
+  #     CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
+  #     CONNECT_REST_PORT: 8083
+  #     CONNECT_GROUP_ID: compose-connect-group
+  #     CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
+  #     CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+  #     CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
+  #     CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
+  #     CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+  #     CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
+  #     CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+  #     CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+  #     CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+  #     CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+  #     CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+  #     CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+  #     CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
+  #     CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
+  #     CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+  #     # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
+  #     CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
+  #     CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
+  #     CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
+  #   command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
+  #   volumes:
+  #     - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
+  #   networks:
+  #     - harena-logger
+
+
+  kafdrop:
+    image: obsidiandynamics/kafdrop:latest
+    depends_on:
+      - kafka1
+    ports:
+      - 9000:9000
+    environment:
+      KAFKA_BROKERCONNECT: kafka1:19092
+    networks:
+      - harena-logger
+
+
+volumes:
+  harena_logger_mongodb_data:
+  harena_logger_kafka_zoo1_data:
+  harena_logger_kafka_zoo1_datalog:
+  harena_logger_kafka1_data:
+
+
+networks:
+  harena-logger:
+    driver: bridge
diff --git a/docker-compose.yml b/docker-compose.yml
index bed6999..1028f32 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -8,24 +8,29 @@ services:
     environment:
       - HARENA_LOGGER_FLASK_HOST=0.0.0.0
       - HARENA_LOGGER_FLASK_PORT=10030
-      - HARENA_LOGGER_FLASK_DEBUG=False
-      - HARENA_LOGGER_BROKER_HOST=harena-logger-broker
-      - HARENA_LOGGER_BROKER_PORT=1883
-      - HARENA_LOGGER_MONGODB_HOST=harena-logger-rawdata
+      - HARENA_LOGGER_FLASK_DEBUG=True
+      - FLASK_DEBUG=True
+      - FLASK_ENV=production
+      - HARENA_LOGGER_MONGODB_HOST=mongodb
       - HARENA_LOGGER_MONGODB_PORT=27017
       - HARENA_LOGGER_MONGODB_DB=harena_logger
-      - HARENA_LOGGER_MONGODB_COLLECTION=executions
+      - HARENA_LOGGER_MONGODB_COLLECTION=event_logs
+      - HARENA_LOGGER_KAFKA_BROKERS=kafka1:19092
+      - PYTHONUNBUFFERED=1
+      - PYTHONIOENCODING=UTF-8
 
     ports:
      - 10030:10030
    depends_on:
-      - harena-logger-broker
-      - harena-logger-rawdata
+      - kafka1
+      - mongodb
+      - zoo1
     restart: always
     networks:
       - harena-logger
 
-  harena-logger-rawdata:
+  mongodb:
     image: mongo:latest
     environment:
       - MONGO_DATA_DIR=/data/db
@@ -33,23 +38,109 @@
     ports:
       - 10031:27017
     volumes:
-      - harena_logger_rawdata:/data/db
+      - harena_logger_mongodb_data:/data/db
     # command: mongod --smallfiles --logpath=/dev/null # --quiet
     networks:
       - harena-logger
 
-  harena-logger-broker:
-    image: eclipse-mosquitto
+  zoo1:
+    image: zookeeper:3.4.9
+    hostname: zoo1
     ports:
-      - 10032:1883
-    restart: always
+      - "2181:2181"
+    environment:
+      ZOO_MY_ID: 1
+      ZOO_PORT: 2181
+      ZOO_SERVERS: server.1=zoo1:2888:3888
+    volumes:
+      - harena_logger_kafka_zoo1_data:/data
+      - harena_logger_kafka_zoo1_datalog:/datalog
+    networks:
+      - harena-logger
+
+
+  kafka1:
+    image: confluentinc/cp-kafka:5.5.1
+    hostname: kafka1
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    volumes:
+      - harena_logger_kafka1_data:/var/lib/kafka/data
+    depends_on:
+      - zoo1
+    networks:
+      - harena-logger
+
+  kafka-connect:
+    image: confluentinc/cp-kafka-connect:5.1.2
+    hostname: kafka-connect
+    container_name: kafka-connect
+    depends_on:
+      - zoo1
+      - kafka1
+    ports:
+      - "8083:8083"
+
+    environment:
+      # kafka1 only advertises its internal listener on port 19092
+      CONNECT_BOOTSTRAP_SERVERS: 'kafka1:19092'
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
+      CONNECT_REST_PORT: 8083
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
+      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
+      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
+      # the ZooKeeper service in this file is named zoo1, not zookeeper
+      CONNECT_ZOOKEEPER_CONNECT: 'zoo1:2181'
+      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
+      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
+      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
+      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
+    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
+    volumes:
+      - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
+    networks:
+      - harena-logger
+
+
+  kafdrop:
+    image: obsidiandynamics/kafdrop:latest
+    depends_on:
+      - kafka1
+    ports:
+      - 9000:9000
+    environment:
+      KAFKA_BROKERCONNECT: kafka1:19092
     networks:
       - harena-logger
 
 
 volumes:
-  harena_logger_rawdata:
+  harena_logger_mongodb_data:
+  harena_logger_kafka_zoo1_data:
+  harena_logger_kafka_zoo1_datalog:
+  harena_logger_kafka1_data:
 
 
 networks:
diff --git a/requirements.txt b/requirements.txt
index dc1706c..f3173d2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,4 +6,5 @@ flask-restful
 #flask-migrate
 paho-mqtt
 pymongo
-kafka
\ No newline at end of file
+kafka-python
+coloredlogs
\ No newline at end of file
diff --git a/src/config.py b/src/config.py
index 987f0ed..7035a85 100644
--- a/src/config.py
+++ b/src/config.py
@@ -1,18 +1,32 @@
 import os
+
 class Config(object):
 
     HARENA_LOGGER_FLASK_HOST = os.environ.get('HARENA_LOGGER_FLASK_HOST', '0.0.0.0')
     HARENA_LOGGER_FLASK_PORT = int(os.environ.get('HARENA_LOGGER_FLASK_PORT', 10030))
-    HARENA_LOGGER_FLASK_DEBUG = bool(os.environ.get('HARENA_LOGGER_FLASK_DEBUG', False))
+    # bool() on a non-empty string is always True, so compare the string instead
+    HARENA_LOGGER_FLASK_DEBUG = os.environ.get('HARENA_LOGGER_FLASK_DEBUG', 'True') == 'True'
 
     HARENA_LOGGER_MONGODB_HOST = os.environ.get('HARENA_LOGGER_MONGODB_HOST', 'localhost')
     HARENA_LOGGER_MONGODB_PORT = int(os.environ.get('HARENA_LOGGER_MONGODB_PORT', 10031))
+    HARENA_LOGGER_MONGODB_URL = "mongodb://{0}:{1}/".format(HARENA_LOGGER_MONGODB_HOST, HARENA_LOGGER_MONGODB_PORT)
     HARENA_LOGGER_MONGODB_DB = os.environ.get('HARENA_LOGGER_MONGODB_DB', 'harena_logger')
-    HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'executions')
+    HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'event_logs')
+
+    HARENA_LOGGER_KAFKA_BROKERS = os.environ.get('HARENA_LOGGER_KAFKA_BROKERS', 'kafka1:19092')
+    HARENA_LOGGER_KAFKA_TOPIC = os.environ.get('HARENA_LOGGER_KAFKA_TOPIC', 'harena-logs')
+    HARENA_LOGGER_INTERVAL_S = int(os.environ.get('HARENA_LOGGER_INTERVAL_S', 10))
 
-    HARENA_LOGGER_BROKER_HOST = os.environ.get('HARENA_LOGGER_BROKER_HOST', 'localhost')
-    HARENA_LOGGER_BROKER_PORT = int(os.environ.get('HARENA_LOGGER_BROKER_PORT', 10032))
+    # LOGGING SETTINGS
+    LOGGING_NAME = os.environ.get('LOGGING_NAME', 'harena-logger')
+    LOGGING_LEVEL = os.environ.get('LOGGING_LEVEL', 'DEBUG')
+    LOGGING_STYLES = ('info=blue;'
+                      'warning=green;'
+                      'error=red;'
+                      'critical=red,bold;'
+                      'debug=white')
+    LOGGING_FORMAT = ('%(asctime) -19s | %(levelname) -8s | %(threadName) -10s | '
+                      '%(funcName) -16s | %(message)s')
\ No newline at end of file
diff --git a/src/server.py b/src/server.py
index 6454f80..c705ea6 100644
--- a/src/server.py
+++ b/src/server.py
@@ -4,43 +4,103 @@
 import random
 import pymongo
 import time
+import threading
+import logging
+import coloredlogs
+
 from flask import Flask, request, jsonify
 from flask_restful import Resource, Api
 from config import Config
 from flask_cors import CORS, cross_origin
+from kafka import KafkaProducer
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+
+LOGGER = logging.getLogger(Config.LOGGING_NAME)
+# Attach a stdout handler so LOGGER output shows up in the Docker logs --
+# a minimal sketch of the wiring the coloredlogs dependency was added for;
+# parse_encoded_styles() decodes the LOGGING_STYLES string from config.py.
+coloredlogs.install(level=Config.LOGGING_LEVEL,
+                    logger=LOGGER,
+                    fmt=Config.LOGGING_FORMAT,
+                    level_styles=coloredlogs.parse_encoded_styles(Config.LOGGING_STYLES))
+
+
+class KafkaMongodbAppender(threading.Thread):
+
+    def __init__(self, mongodb_server_url, mongodb_database, mongodb_collection,
+                 kafka_consumer, topic, delay):
+        threading.Thread.__init__(self)
+        self.mongodb_server_url = mongodb_server_url
+        self.mongodb_database = mongodb_database
+        self.mongodb_collection = mongodb_collection
+        self.kafka_consumer = kafka_consumer
+        self.topic = topic
+        self.delay = delay
+        LOGGER.debug(mongodb_server_url)
+
+    def run(self):
+
+        LOGGER.info("Starting KafkaMongodbAppender")
+
+        while True:
+
+            # Open and close the connection on every streaming check;
+            # a single long-lived connection caused memory problems over time.
+            mongodb_client = pymongo.MongoClient(self.mongodb_server_url)
+            mongodb_db = mongodb_client[self.mongodb_database]
+            mongodb_collection = mongodb_db[self.mongodb_collection]
+
+            LOGGER.info("Checking for newly streamed messages for at least %s seconds...", self.delay)
+
+            for message in self.kafka_consumer:
+                # LOGGER.debug("%s:%d:%d: key=%s value=%s", message.topic, message.partition,
+                #              message.offset, message.key, message.value)
+                LOGGER.info("Found %d events inside a message", len(message.value['harena-log-stream']))
+
+                for event in message.value['harena-log-stream']:
+                    mongodb_collection.insert_one(event)
+
+            LOGGER.info("Waiting time (%s seconds) for new messages ended.", self.delay)
+            mongodb_client.close()
+            time.sleep(self.delay)
+
 
 class IndexResource(Resource):
 
-    def __init__(self,broker,mongodb_client):
-        self.broker = broker
+    def __init__(self, kafka_producer):
+        self.kafka_producer = kafka_producer
+
+        LOGGER.debug("IndexResource initialized")
 
     def get(self):
         message = {"message": "Welcome to the Harena Logger module",
-                   "broker_status" : broker.__repr__(),
-                   "database_status":mongodb_client.server_info()['ok']
-
+                   "kafka_bootstrap_connected": self.kafka_producer.bootstrap_connected()
                    }
 
         return message
 
 
 class HarenaMessageResource(Resource):
 
-    def __init__(self, broker, mongodb_collection):
-        self.broker = broker
-        self.mongodb_collection = mongodb_collection
+    def __init__(self, kafka_producer, target_topic):
+        self.kafka_producer = kafka_producer
+        self.target_topic = target_topic
+
+
     @cross_origin(origin='*')
     def post(self):
+
+        # To do: properly evaluate message body parsing
         message = request.get_json()
 
-        print(json.dumps(message))
-        topic = message['topic']
-        payload = message['payload']
+        message['server_timestamp'] = "{}".format(int(round(time.time() * 1000)))
+
+        # Asynchronous by default
+        future = self.kafka_producer.send(self.target_topic, json.dumps(message).encode('utf-8'))
 
-        message['timestamp'] = "{}".format(int(round(time.time() * 1000)))
+        # Block for 'synchronous' sends
+        try:
+            record_metadata = future.get(timeout=10)
+        except KafkaError:
+            # To do: decide how to report produce failures to the client; for now just log them
+            LOGGER.exception("Publishing to Kafka topic '%s' failed", self.target_topic)
 
-        broker_publishing_flag = self.broker.publish(topic,json.dumps(payload))
-        mongodb_insertion_flag = self.mongodb_collection.insert_one(message)
+        # Successful result: record_metadata carries the assigned partition and offset
+        # print(future)
 
         data = {"message":'Message published successfully'}
 
@@ -49,42 +109,54 @@ def post(self):
 
     @cross_origin(origin='*')
     def get(self):
-        message = {"message": "message streaming is up"
-
+        message = {"message": "message streaming is up",
+                   "kafka_bootstrap_connected": self.kafka_producer.bootstrap_connected()
+                   }
 
         return message
 
-    @cross_origin(origin='*')
-    def delete(self):
-        self.mongodb_collection.delete_many({})
 
-        data = {"message":'Messages in the execution stream deleted successfully'}
+if __name__ == '__main__':
 
-        return jsonify(data)
+    kafka_producer = None
+    kafka_consumer = None
 
-if __name__ == '__main__':
-    web_app = Flask(__name__)
-    web_app.config.from_object(Config)
-    CORS(web_app)
-    api = Api(web_app)
+    while True:
+        try:
+            kafka_producer = KafkaProducer(bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS)
+
+            kafka_consumer = KafkaConsumer(Config.HARENA_LOGGER_KAFKA_TOPIC,
+                                           group_id='harena-logger-consumer',
+                                           bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS,
+                                           value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+                                           consumer_timeout_ms=Config.HARENA_LOGGER_INTERVAL_S * 1000)
+            break
+        except KafkaError:
+            LOGGER.warning("Could not exchange metadata with the Kafka bootstrap servers yet. Retrying...")
+            time.sleep(1)
 
-    mongodb_client = pymongo.MongoClient("mongodb://{0}:{1}/"
-                                         .format(web_app.config['HARENA_LOGGER_MONGODB_HOST'], \
-                                                 web_app.config['HARENA_LOGGER_MONGODB_PORT']))
-    mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']]
-    mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']]
-    broker = paho.Client("publisher{0}".format(random.randint(0,99999999)) )
-    broker.connect(web_app.config['HARENA_LOGGER_BROKER_HOST'],
-                   web_app.config['HARENA_LOGGER_BROKER_PORT'])
-    broker.reconnect_delay_set(min_delay=1, max_delay=20)
+    consumer_thread = KafkaMongodbAppender(mongodb_server_url=Config.HARENA_LOGGER_MONGODB_URL,
+                                           mongodb_database=Config.HARENA_LOGGER_MONGODB_DB,
+                                           mongodb_collection=Config.HARENA_LOGGER_MONGODB_COLLECTION,
+                                           kafka_consumer=kafka_consumer,
+                                           topic=Config.HARENA_LOGGER_KAFKA_TOPIC,
+                                           delay=Config.HARENA_LOGGER_INTERVAL_S)
+    consumer_thread.start()
 
-    api.add_resource(IndexResource, '/', resource_class_args=[broker,mongodb_client])
-    api.add_resource(HarenaMessageResource, '/api/v1/message',resource_class_args=[broker,mongodb_collection])
+    # Web service for appending events to the Kafka topic
+    web_app = Flask(__name__)
+    web_app.config.from_object(Config)
+    CORS(web_app)
+    api = Api(web_app)
+    api.add_resource(IndexResource, '/', resource_class_args=[kafka_producer])
+    api.add_resource(HarenaMessageResource, '/api/v1/message',
+                     resource_class_args=[kafka_producer, Config.HARENA_LOGGER_KAFKA_TOPIC])
 
     web_app.run(host=web_app.config['HARENA_LOGGER_FLASK_HOST'],
                 port=web_app.config['HARENA_LOGGER_FLASK_PORT'],
                 debug=web_app.config['HARENA_LOGGER_FLASK_DEBUG'])
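
For reference, a minimal client sketch for exercising the new endpoint once the stack from docker-compose-dev.yml is up (assumptions: the service is reachable on localhost:10030 and the `requests` package is installed; the 'harena-log-stream' key is the envelope KafkaMongodbAppender unpacks into MongoDB):

    # Hypothetical smoke test for POST /api/v1/message -- not part of the patch.
    import requests

    payload = {
        "harena-log-stream": [  # the list the consumer thread iterates over
            {"action": "case_opened", "user_id": "u42"},
            {"action": "case_closed", "user_id": "u42"},
        ]
    }

    response = requests.post("http://localhost:10030/api/v1/message", json=payload)
    print(response.json())  # expected: {'message': 'Message published successfully'}

Events should then appear in the event_logs collection within roughly HARENA_LOGGER_INTERVAL_S seconds of a successful publish.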