forked from GoogleCloudPlatform/DataflowSDK-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
group_with_coder.py
123 lines (100 loc) · 4.6 KB
/
group_with_coder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""An example of using custom classes and coder for grouping operations.
This workflow demonstrates registration and usage of a custom coder for a user-
defined class. A deterministic custom coder is needed to use a class as a key in
a combine or group operation.
This example assumes an input file with, on each line, a comma-separated name
and score.
"""
from __future__ import absolute_import
import argparse
import logging
import sys
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.typehints import typehints
from apache_beam.typehints.decorators import with_output_types
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class Player(object):
    """A custom class used as a key in combine/group transforms.

    Two Players with the same name compare equal and hash identically, which
    keeps in-memory equality consistent with the deterministic PlayerCoder
    encoding (equal names -> equal encoded bytes).
    """

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        if not isinstance(other, Player):
            return NotImplemented
        return self.name == other.name

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so define it explicitly.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Must agree with __eq__: equal names -> equal hashes.
        return hash(self.name)

    def __repr__(self):
        return '%s(%r)' % (type(self).__name__, self.name)
class PlayerCoder(coders.Coder):
    """A custom, deterministic coder for the Player class.

    A Player is encoded as the prefix b'x:' followed by the UTF-8 bytes of its
    name; the prefix is a visible trace that this coder was used.
    """

    _PREFIX = b'x:'

    def encode(self, o):
        """Encode a Player to bytes: b'x:' + UTF-8(name)."""
        # Coders must produce bytes. '%s' formatting matches str(o.name) on
        # Python 3; the u'' literal also keeps unicode names working on
        # Python 2.
        return self._PREFIX + (u'%s' % (o.name,)).encode('utf-8')

    def decode(self, s):
        """Decode bytes produced by encode() back into a Player.

        Raises:
          ValueError: if `s` does not start with the b'x:' prefix.
        """
        # Coders receive bytes, so compare against a bytes prefix. An explicit
        # raise (not assert) keeps the check active under `python -O`.
        if not s.startswith(self._PREFIX):
            raise ValueError(
                'Expected PlayerCoder-encoded bytes starting with %r, got %r'
                % (self._PREFIX, s))
        return Player(s[len(self._PREFIX):].decode('utf-8'))

    def is_deterministic(self):
        # Since coded Player objects are used as keys below with
        # beam.CombinePerKey(sum), we require that this coder is deterministic
        # (i.e., two equivalent instances of the class are encoded into the
        # same byte string) in order to guarantee consistent results.
        return True
# Annotate the get_players function so that the typehint system knows that its
# output -- and therefore the input to the CombinePerKey operation -- is a
# key-value pair of a Player object and an integer.
@with_output_types(typehints.KV[Player, int])
def get_players(descriptor):
    """Parse one 'name,points' input line into a (Player, points) pair.

    The line is split on its LAST comma, so a name that itself contains commas
    is kept intact instead of failing the unpacking.

    Args:
      descriptor: one line of input text, e.g. 'alice,12'.

    Returns:
      A tuple (Player(name), int(points)).

    Raises:
      ValueError: if the line contains no comma, or the points field is not
        an integer.
    """
    name, points = descriptor.rsplit(',', 1)
    return Player(name), int(points)
def format_player_total(player_total):
    """Format a (Player, total_points) pair as a 'name,total' output line.

    Takes the pair as one argument instead of using Python 2-only tuple
    parameter unpacking (`lambda (k, v): ...`), which is a SyntaxError on
    Python 3 (PEP 3113).
    """
    player, total = player_total
    return '%s,%d' % (player.name, total)


def run(args=None):
    """Runs the workflow computing total points from a collection of matches.

    Reads 'name,points' lines from --input, sums the points per Player (with
    Player keys encoded by PlayerCoder), and writes 'name,total' lines to
    --output.

    Args:
      args: command-line arguments; defaults to sys.argv[1:]. Arguments other
        than --input/--output are passed through to PipelineOptions.
    """
    if args is None:
        args = sys.argv[1:]
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        required=True,
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(args)
    # We use the save_main_session option because one or more DoFns in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        # Register the custom coder for the Player class, so that it will be
        # used in the computation.
        coders.registry.register_coder(Player, PlayerCoder)

        (p  # pylint: disable=expression-not-assigned
         | ReadFromText(known_args.input)
         # The get_players function is annotated with a type hint, so the type
         # system knows the output type of the following operation is a
         # key-value pair of a Player and an int. Please see the documentation
         # for details on types that are inferred automatically as well as
         # other ways to specify type hints.
         | beam.Map(get_players)
         # The output type hint of the previous step is used to infer that the
         # key type of the following operation is the Player type. Since a
         # custom coder is registered for the Player class above, a
         # PlayerCoder will be used to encode Player objects as keys for this
         # combine operation.
         | beam.CombinePerKey(sum)
         | beam.Map(format_player_total)
         | WriteToText(known_args.output))
if __name__ == '__main__':
    # Script entry point: make INFO-level log records visible on the root
    # logger, then launch the pipeline with the process's command-line args.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()