Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support rabbitmq as stream output like kafka, google pub/sub #415

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions blockchainetl/jobs/exporters/rabbitmq_item_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import collections
import json
import logging

import pika

from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter


class RabbitMQItemExporter:

def __init__(self, output, item_type_to_queue_mapping, converters=()):
self.item_type_to_queue_mapping = item_type_to_queue_mapping
self.converter = CompositeItemConverter(converters)
self.connection_url = self.get_connection_url(output)

connection = pika.BlockingConnection(pika.URLParameters("amqp://" + self.connection_url))
print(self.connection_url)
self.channel = connection.channel()

for item_type, queue in item_type_to_queue_mapping.items():
self.channel.queue_declare(queue=queue, durable=True)


def get_connection_url(self, output):
try:
return output.split('/')[1]
except KeyError:
raise Exception('Invalid rabbitmq output param, It should be in format of "amqp/guest:guest@localhost:5672"')

def open(self):
pass

def export_items(self, items):
for item in items:
self.export_item(item)

def export_item(self, item):
item_type = item.get('type')
if item_type is not None and item_type in self.item_type_to_queue_mapping:
data = json.dumps(item).encode('utf-8')
logging.debug(data)
return self.channel.basic_publish(exchange='', routing_key=self.item_type_to_queue_mapping[item_type], body=data, properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
))
else:
logging.warning('Topic for item type "{}" is not configured.'.format(item_type))

def convert_items(self, items):
for item in items:
yield self.converter.convert_item(item)

def close(self):
pass


def group_by_item_type(items):
result = collections.defaultdict(list)
for item in items:
result[item.get('type')].append(item)

return result
24 changes: 19 additions & 5 deletions ethereumetl/streaming/item_exporter_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ def create_item_exporter(output):
'contract': 'contracts',
'token': 'tokens',
})
elif item_exporter_type == ItemExporterType.RABBITMQ:
from blockchainetl.jobs.exporters.rabbitmq_item_exporter import RabbitMQItemExporter
item_exporter = RabbitMQItemExporter(output, item_type_to_queue_mapping={
'block': 'blocks',
'transaction': 'transactions',
'log': 'logs',
'token_transfer': 'token_transfers',
'trace': 'traces',
'contract': 'contracts',
'token': 'tokens',
})

else:
raise ValueError('Unable to determine item exporter type for output ' + output)
Expand All @@ -118,14 +129,16 @@ def determine_item_exporter_type(output):
return ItemExporterType.KINESIS
if output is not None and output.startswith('kafka'):
return ItemExporterType.KAFKA
elif output is not None and output.startswith('postgresql'):
if output is not None and output.startswith('amqp'):
return ItemExporterType.RABBITMQ
if output is not None and output.startswith('postgresql'):
return ItemExporterType.POSTGRES
elif output is not None and output.startswith('gs://'):
if output is not None and output.startswith('gs://'):
return ItemExporterType.GCS
elif output is None or output == 'console':
if output is None or output == 'console':
return ItemExporterType.CONSOLE
else:
return ItemExporterType.UNKNOWN

return ItemExporterType.UNKNOWN


class ItemExporterType:
Expand All @@ -135,4 +148,5 @@ class ItemExporterType:
GCS = 'gcs'
CONSOLE = 'console'
KAFKA = 'kafka'
RABBITMQ = 'rabbitmq'
UNKNOWN = 'unknown'
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def read(fname):
'google-cloud-pubsub==2.13.0',
'google-cloud-storage==1.33.0',
'kafka-python==2.0.2',
'pika==1.3.1',
'sqlalchemy==1.4',
'pg8000==1.16.6',
# This library is a dependency for google-cloud-pubsub, starting from 0.3.22 it requires Rust,
Expand Down