Commit 24bd13e: major updates
matheusmota committed Sep 7, 2020 (1 parent: 90aa819)
Showing 2 changed files with 150 additions and 41 deletions.
22 changes: 18 additions & 4 deletions src/config.py
@@ -1,18 +1,32 @@
 import os
 
 
 class Config(object):
 
     HARENA_LOGGER_FLASK_HOST = os.environ.get('HARENA_LOGGER_FLASK_HOST', '0.0.0.0')
     HARENA_LOGGER_FLASK_PORT = int(os.environ.get('HARENA_LOGGER_FLASK_PORT', 10030))
-    HARENA_LOGGER_FLASK_DEBUG = bool(os.environ.get('HARENA_LOGGER_FLASK_DEBUG', False))
+    HARENA_LOGGER_FLASK_DEBUG = bool(os.environ.get('HARENA_LOGGER_FLASK_DEBUG', True))
 
     HARENA_LOGGER_MONGODB_HOST = os.environ.get('HARENA_LOGGER_MONGODB_HOST', 'localhost')
     HARENA_LOGGER_MONGODB_PORT = int(os.environ.get('HARENA_LOGGER_MONGODB_PORT', 10031))
+    HARENA_LOGGER_MONGODB_URL = "mongodb://{0}:{1}/".format(HARENA_LOGGER_MONGODB_HOST, HARENA_LOGGER_MONGODB_PORT)
     HARENA_LOGGER_MONGODB_DB = os.environ.get('HARENA_LOGGER_MONGODB_DB', 'harena_logger')
-    HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'executions')
+    HARENA_LOGGER_MONGODB_COLLECTION = os.environ.get('HARENA_LOGGER_MONGODB_COLLECTION', 'event_logs')
 
+    HARENA_LOGGER_KAFKA_BROKERS = os.environ.get('HARENA_LOGGER_KAFKA_BROKERS', 'kafka1:19092')
+    HARENA_LOGGER_KAFKA_TOPIC = os.environ.get('HARENA_LOGGER_KAFKA_TOPIC', 'harena-logs')
+    HARENA_LOGGER_INTERVAL_S = int(os.environ.get('HARENA_LOGGER_INTERVAL_S', 10))
 
-    HARENA_LOGGER_BROKER_HOST = os.environ.get('HARENA_LOGGER_BROKER_HOST', 'localhost')
-    HARENA_LOGGER_BROKER_PORT = int(os.environ.get('HARENA_LOGGER_BROKER_PORT', 10032))
+    # LOGGING SETTINGS
+    LOGGING_NAME = os.environ.get('LOGGING_NAME', 'harena-logger')
+    LOGGING_LEVEL = os.environ.get('LOGGING_LEVEL', 'DEBUG')
+
+    LOGGING_STYLES = ('info=blue;'
+                      'warning=green;'
+                      'error=red;'
+                      'critical=red,bold;'
+                      'debug=white')
+
+    LOGGING_FORMAT = ('%(asctime) -19s | %(levelname) -8s | %(threadName) -10s | '
+                      '%(funcName) -16s | %(message)s')
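A note on the new LOGGING_* block: the encoded LOGGING_STYLES string follows the same 'level=color[,bold]' syntax that coloredlogs accepts in its COLOREDLOGS_LEVEL_STYLES environment variable. The commit imports coloredlogs in src/server.py but never calls it, which is likely what the "Logging not appearing in Docker logs" to-do there refers to. A minimal sketch (not part of this commit) of wiring these settings up at startup, assuming coloredlogs' parse_encoded_styles() helper to decode the style string:

import logging
import coloredlogs
from config import Config

LOGGER = logging.getLogger(Config.LOGGING_NAME)

# Attach a handler and formatting to LOGGER; without an install() (or
# basicConfig) call, only WARNING and above reach stderr via logging's
# last-resort handler, so debug/info records never show up.
coloredlogs.install(level=Config.LOGGING_LEVEL,
                    logger=LOGGER,
                    fmt=Config.LOGGING_FORMAT,
                    level_styles=coloredlogs.parse_encoded_styles(Config.LOGGING_STYLES))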
169 changes: 132 additions & 37 deletions src/server.py
@@ -4,43 +4,126 @@
 import random
 import pymongo
 import time
+import threading
+import logging
+import coloredlogs
 
 from flask import Flask, request, jsonify
 from flask_restful import Resource, Api
 from config import Config
 from flask_cors import CORS, cross_origin
+from kafka import KafkaProducer
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
 
+# To do: Logging not appearing in Docker logs
+LOGGER = logging.getLogger(Config.LOGGING_NAME)
+
+
+class KafkaMongodbAppender(threading.Thread):
+
+    def __init__(self, mongodb_server_url, mongodb_database, mongodb_collection, kafka_consumer, topic, delay):
+        threading.Thread.__init__(self)
+        self.mongodb_server_url = mongodb_server_url
+        self.mongodb_database = mongodb_database
+        self.mongodb_collection = mongodb_collection
+        self.kafka_consumer = kafka_consumer
+        self.topic = topic
+        self.delay = delay
+        LOGGER.debug(mongodb_server_url)
+
+    def run(self):
+
+        LOGGER.info("Starting KafkaMongodbAppender")
+
+        while True:
+
+            # The connection is opened and closed on every polling cycle:
+            # keeping a single long-lived connection caused memory problems
+            # over long runs.
+            mongodb_client = pymongo.MongoClient(self.mongodb_server_url)
+            mongodb_db = mongodb_client[self.mongodb_database]
+            mongodb_collection = mongodb_db[self.mongodb_collection]
+
+            print("Checking for newly streamed messages...")
+
+            for message in self.kafka_consumer:
+                # print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
+                print("Found {} events inside the message".format(len(message.value['harena-log-stream'])))
+                for event in message.value['harena-log-stream']:
+                    mongodb_collection.insert_one(event)
+
+            print("Message stream timed out. Waiting {} seconds until next verification...".format(self.delay))
+            mongodb_client.close()
+            time.sleep(self.delay)
+
+# mongodb_client = pymongo.MongoClient(web_app.config['HARENA_LOGGER_MONGODB_URL'])
+# mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']]
+# mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']]
+
+# while True:
+
+# def connect_to_mongodb(threadName, counter, delay):
+
+# def disconnect_from_mongodb(threadName, counter, delay):
+
+# def consume_from_kafka_and_append_to_mongodb(kafka_brokers, delay):
+
+# def get_num_of_messages(kafka_brokers, delay):
+
+
 class IndexResource(Resource):
 
-    def __init__(self, broker, mongodb_client):
-        self.broker = broker
+    def __init__(self, kafka_producer):
+        self.kafka_producer = kafka_producer
+
+        LOGGER.debug("IndexResource initialized")
 
     def get(self):
         message = {"message": "Welcome to the Harena Logger module",
-                   "broker_status": broker.__repr__(),
-                   "database_status": mongodb_client.server_info()['ok']
+                   "kafka_bootstrap_connected": self.kafka_producer.bootstrap_connected()
                   }
         return message
 
 
 class HarenaMessageResource(Resource):
 
-    def __init__(self, broker, mongodb_collection):
-        self.broker = broker
-        self.mongodb_collection = mongodb_collection
+    def __init__(self, kafka_producer, target_topic):
+        self.kafka_producer = kafka_producer
+        self.target_topic = target_topic
 
     @cross_origin(origin='*')
     def post(self):
 
         # To do: properly evaluate message body parsing
         message = request.get_json()
-        print(json.dumps(message))
-        topic = message['topic']
-        payload = message['payload']
-
-        message['timestamp'] = "{}".format(int(round(time.time() * 1000)))
-
-        broker_publishing_flag = self.broker.publish(topic, json.dumps(payload))
-        mongodb_insertion_flag = self.mongodb_collection.insert_one(message)
+        message['server_timestamp'] = "{}".format(int(round(time.time() * 1000)))
+
+        # Asynchronous by default
+        future = self.kafka_producer.send(self.target_topic, json.dumps(message).encode('utf-8'))
+
+        # Block for 'synchronous' sends
+        try:
+            record_metadata = future.get(timeout=10)
+        except KafkaError:
+            # Decide what to do if the produce request failed...
+            LOGGER.exception("Publishing to Kafka topic '%s' failed", self.target_topic)
+
+        # Successful result returns assigned partition and offset
+        # print(future)
 
         data = {"message": 'Message published successfully'}
 
@@ -49,42 +132,54 @@ def post(self):
 
     @cross_origin(origin='*')
     def get(self):
-        message = {"message": "message streaming is up"
+        message = {"message": "message streaming is up",
+                   "kafka_bootstrap_connected": self.kafka_producer.bootstrap_connected()
                   }
         return message
 
-    @cross_origin(origin='*')
-    def delete(self):
-        self.mongodb_collection.delete_many({})
-        data = {"message": 'Messages in the execution stream deleted successfully'}
-
-        return jsonify(data)
-
 
 if __name__ == '__main__':
 
-    web_app = Flask(__name__)
-    web_app.config.from_object(Config)
-    CORS(web_app)
-    api = Api(web_app)
-
-    mongodb_client = pymongo.MongoClient("mongodb://{0}:{1}/"
-                                         .format(web_app.config['HARENA_LOGGER_MONGODB_HOST'],
-                                                 web_app.config['HARENA_LOGGER_MONGODB_PORT']))
-
-    mongodb_db = mongodb_client[web_app.config['HARENA_LOGGER_MONGODB_DB']]
-    mongodb_collection = mongodb_db[web_app.config['HARENA_LOGGER_MONGODB_COLLECTION']]
-
-    broker = paho.Client("publisher{0}".format(random.randint(0, 99999999)))
-    broker.connect(web_app.config['HARENA_LOGGER_BROKER_HOST'],
-                   web_app.config['HARENA_LOGGER_BROKER_PORT'])
-    broker.reconnect_delay_set(min_delay=1, max_delay=20)
-
-    api.add_resource(IndexResource, '/', resource_class_args=[broker, mongodb_client])
-    api.add_resource(HarenaMessageResource, '/api/v1/message', resource_class_args=[broker, mongodb_collection])
-
+    kafka_producer = None
+    kafka_consumer = None
+
+    while True:
+        try:
+            kafka_producer = KafkaProducer(bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS)
+
+            kafka_consumer = KafkaConsumer(Config.HARENA_LOGGER_KAFKA_TOPIC, group_id='harena-logger-consumer',
+                                           bootstrap_servers=Config.HARENA_LOGGER_KAFKA_BROKERS,
+                                           value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+                                           consumer_timeout_ms=10000)
+            break
+        except KafkaError:
+            print("Could not exchange metadata with Kafka bootstrap servers for the first time. Retrying...")
+            time.sleep(1)
+
+    consumer_thread = KafkaMongodbAppender(mongodb_server_url=Config.HARENA_LOGGER_MONGODB_URL,
+                                           mongodb_database=Config.HARENA_LOGGER_MONGODB_DB,
+                                           mongodb_collection=Config.HARENA_LOGGER_MONGODB_COLLECTION,
+                                           kafka_consumer=kafka_consumer,
+                                           topic=Config.HARENA_LOGGER_KAFKA_TOPIC,
+                                           delay=Config.HARENA_LOGGER_INTERVAL_S)
+    consumer_thread.start()
+
+    # Web service for appending
+    web_app = Flask(__name__)
+    web_app.config.from_object(Config)
+    CORS(web_app)
+    api = Api(web_app)
+    api.add_resource(IndexResource, '/', resource_class_args=[kafka_producer])
+    api.add_resource(HarenaMessageResource, '/api/v1/message', resource_class_args=[kafka_producer, Config.HARENA_LOGGER_KAFKA_TOPIC])
     web_app.run(host=web_app.config['HARENA_LOGGER_FLASK_HOST'],
                 port=web_app.config['HARENA_LOGGER_FLASK_PORT'],
                 debug=web_app.config['HARENA_LOGGER_FLASK_DEBUG'])
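With the service and brokers up, the reworked endpoint can be exercised end to end. A minimal sketch using requests, assuming the Flask defaults from src/config.py and the 'harena-log-stream' envelope that KafkaMongodbAppender unpacks; the event fields themselves are illustrative:

import requests

# Hypothetical event batch: the consumer thread iterates over
# message['harena-log-stream'] and inserts each event into MongoDB.
message = {
    'harena-log-stream': [
        {'action': 'case_opened', 'user_id': 'u42'},
        {'action': 'case_closed', 'user_id': 'u42'},
    ]
}

response = requests.post('http://localhost:10030/api/v1/message', json=message)
print(response.status_code, response.json())  # expects 'Message published successfully'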
