#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A workflow demonstrating a DoFn with multiple outputs.
DoFns may produce multiple outputs. Outputs that are not the default ("main")
output are marked with a tag at output time and later the same tag will be used
to get the corresponding result (a PCollection) for that output.
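
For example, a DoFn can tag an output and the pipeline can retrieve it later
by the same tag (a minimal sketch; ``MyDoFn``, ``my_tag``, and ``pcoll`` are
illustrative names)::

  class MyDoFn(beam.DoFn):
    def process(self, element):
      yield element                                 # main output
      yield pvalue.TaggedOutput('my_tag', element)  # tagged output

  results = pcoll | beam.ParDo(MyDoFn()).with_outputs('my_tag', main='main')
  tagged_pcoll = results['my_tag']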

This is a slightly modified version of the basic wordcount example. In this
example words are divided into two buckets: short words (3 characters in
length or fewer) and words (all other words). There will be 3 output files::

  [OUTPUT]-chars       : Character count for the input.
  [OUTPUT]-short-words : Word count for short words only.
  [OUTPUT]-words       : Word count for all other words.

To execute this pipeline locally, specify a local output file or output prefix
on GCS::

  --output [YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
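
For example, a local run with the default DirectRunner might look like this
(the output path is illustrative)::

  python multiple_output_pardo.py --output /tmp/output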

To execute this pipeline using the Google Cloud Dataflow service, specify
pipeline configuration::

  --project YOUR_PROJECT_ID
  --staging_location gs://YOUR_STAGING_DIRECTORY
  --temp_location gs://YOUR_TEMP_DIRECTORY
  --job_name YOUR_JOB_NAME
  --runner DataflowRunner

and an output prefix on GCS::

  --output gs://YOUR_OUTPUT_PREFIX
"""
from __future__ import absolute_import

import argparse
import logging
import re

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class SplitLinesToWordsFn(beam.DoFn):
  """A transform to split a line of text into individual words.

  This transform will have 3 outputs:
    - main output: all words that are longer than 3 characters.
    - short words output: all other words (3 characters or fewer).
    - character count output: the number of characters in each processed line.
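
  For example, the line 'the quick fox' yields 13 to the character count
  output, 'the' and 'fox' to the short words output, and 'quick' to the
  main output.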
"""

  # These tags will be used to tag the outputs of this DoFn.
  OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
  OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

  def process(self, element):
    """Receives a single element (a line) and produces words and character
    counts.

    Important things to note here:
      - For a single element you may produce multiple main outputs:
        the words of a single line.
      - For that same input you may produce multiple outputs, potentially
        across multiple PCollections.
      - Outputs may have a different type (the character count) or share
        the same type (the short words) with the main output.

    Args:
      element: the element (a line of text) being processed.

    Yields:
      words as main output, short words as tagged output, line character count
      as tagged output.
    """
    # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
    # collection.
    yield pvalue.TaggedOutput(
        self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
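
    # Extract words: contiguous runs of letters and apostrophes
    # (e.g. "king's").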
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      if len(word) <= 3:
        # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged
        # collection.
        yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
      else:
        # yield word to add it to the main collection.
        yield word


class CountWords(beam.PTransform):
  """A transform to count the occurrences of each word.

  A PTransform that converts a PCollection containing words into a PCollection
  of "word: count" strings.
"""

  def expand(self, pcoll):
    return (pcoll
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.MapTuple(lambda word, ones: (word, sum(ones)))
            | 'format' >> beam.MapTuple(
                lambda word, c: '%s: %s' % (word, c)))


def run(argv=None):
  """Runs the workflow counting the long words and short words separately."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--input',
                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
                      help='Input file to process.')
  parser.add_argument('--output',
                      required=True,
                      help='Output prefix for files to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFns in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(known_args.input)

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (lines
                          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
                              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
                              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
                              main='words'))

    # split_lines_result is an object of type DoOutputsTuple. It supports
    # accessing the results in several ways: by unpacking (the main output
    # comes first, followed by the tagged outputs in the order they were
    # passed to with_outputs), by indexing with a tag, or as an attribute
    # named after a tag.
    words, _, _ = split_lines_result
    short_words = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
    character_count = split_lines_result.tag_character_count

    # pylint: disable=expression-not-assigned
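    # Pair every per-line character count with the same temporary key so that
    # a single GroupByKey/sum produces one global total for the input.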
    (character_count
     | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
     | beam.GroupByKey()
     | 'count chars' >> beam.MapTuple(lambda _, counts: sum(counts))
     | 'write chars' >> WriteToText(known_args.output + '-chars'))

    # pylint: disable=expression-not-assigned
    (short_words
     | 'count short words' >> CountWords()
     | 'write short words' >> WriteToText(
         known_args.output + '-short-words'))

    # pylint: disable=expression-not-assigned
    (words
     | 'count words' >> CountWords()
     | 'write words' >> WriteToText(known_args.output + '-words'))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()