forked from GoogleCloudPlatform/pubsub2inbox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.py
113 lines (101 loc) · 4.82 KB
/
pubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import Output, NotConfiguredException
import json
from google.cloud import pubsub_v1
from concurrent import futures
import hashlib
class PubsubOutput(Output):
def callback(self, future):
pass
def output(self):
if 'topic' not in self.output_config:
raise NotConfiguredException(
'No Pub/Sub topic defined in configuration.')
topic_template = self.jinja_environment.from_string(
self.output_config['topic'])
topic_template.name = 'topic'
topic_output = topic_template.render()
if 'content' not in self.output_config:
raise NotConfiguredException(
'No Pub/Sub message content defined in configuration.')
messages = {'single': 'message'}
if 'messages' in self.output_config:
messages_template = self.jinja_environment.from_string(
self.output_config['messages'])
messages_template.name = 'messages'
messages_output = messages_template.render()
messages = json.loads(messages_output)
publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True if 'ordering_key' in
self.output_config else False)
client_options = {}
if 'api_endpoint' in self.output_config:
client_options = {
"api_endpoint": self.output_config['api_endpoint']
}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options)
publish_futures = []
if isinstance(messages, list):
new_messages = {}
for message in messages:
new_messages[hashlib.md5(
json.dumps(message).encode()).hexdigest()] = message
messages = new_messages
for message_key, message_value in messages.items():
attributes = {}
if 'attributes' in self.output_config:
attributes_template = self.jinja_environment.from_string(
self.output_config['attributes'])
attributes_template.name = 'attributes'
attributes_output = attributes_template.render(
key=message_key, value=message_value)
attributes = json.loads(attributes_output)
ordering_key = None
if 'ordering_key' in self.output_config:
ordering_key_template = self.jinja_environment.from_string(
self.output_config['ordering_key'])
ordering_key_template.name = 'ordering_key'
ordering_key = ordering_key_template.render(key=message_key,
value=message_value)
content_template = self.jinja_environment.from_string(
self.output_config['content'])
content_template.name = 'content'
content = content_template.render(key=message_key,
value=message_value)
if ordering_key:
future = publisher.publish(topic_output,
data=content.encode('utf-8'),
ordering_key=ordering_key,
**attributes)
else:
future = publisher.publish(topic_output,
data=content.encode('utf-8'),
**attributes)
self.logger.info('Message published.',
extra={
'key': message_key,
'topic': topic_output,
'attributes': attributes,
'ordering_key': ordering_key
})
future.add_done_callback(self.callback)
publish_futures.append(future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
self.logger.info('Message sending finished!',
extra={
'count': len(publish_futures),
'topic': topic_output,
})