-
Notifications
You must be signed in to change notification settings - Fork 0
/
3.9+-+CoGroupByKey.py
31 lines (26 loc) · 1.08 KB
/
3.9+-+CoGroupByKey.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import csv

import apache_beam as beam
def parse_record(line):
    """Split one line of the flights CSV into its list of fields.

    Uses the stdlib ``csv`` parser rather than ``line.split(',')`` so that a
    quoted field which itself contains a comma stays a single field. For
    non-empty lines with no quote characters the result is the same as
    ``line.split(',')``.
    """
    return next(csv.reader([line]))


def delay_pair(fields):
    """Return the ``(key, delay)`` pair for one parsed flight record.

    The key is column 4 and the delay is column 8 converted with ``int``
    (a non-integer value raises ``ValueError``, as in the original script).
    NOTE(review): column 4 is presumably the airport/route identifier and
    column 8 the delay amount — confirm against the CSV header.
    """
    return fields[4], int(fields[8])


def has_delay(pair):
    """Return True when the delay in a ``(key, delay)`` pair is positive."""
    return pair[1] > 0


def main(input_path="voos_sample.csv"):
    """Build and run the flight-delay report pipeline.

    The input is read and parsed once. The resulting PCollection of delayed
    ``(key, delay)`` pairs is branched into two per-key aggregates:

    * ``Tempo_Atrasos``: sum of the delay values per key.
    * ``Qtd_Atrasos``: number of delayed records per key.

    The two aggregates are joined with ``CoGroupByKey`` and each joined row,
    ``(key, {'Qtd_Atrasos': [...], 'Tempo_Atrasos': [...]})``, is printed.

    Args:
        input_path: CSV file to read. Its first line is treated as a header
            and skipped.
    """
    # Leaving the ``with`` block runs the pipeline and waits for it to
    # finish. The previous bare ``p1.run()`` never waited explicitly.
    with beam.Pipeline() as p1:
        # Shared branch: one read and one parse, reused by both aggregates.
        atrasos = (
            p1
            | "Importar Dados Atraso" >> beam.io.ReadFromText(input_path, skip_header_lines=1)
            | "Separar por Vírgulas Atraso" >> beam.Map(parse_record)
            # Convert the delay once, then filter on the converted value.
            | "Criar par atraso" >> beam.Map(delay_pair)
            | "Pegar voos com atraso" >> beam.Filter(has_delay)
        )

        Tempo_Atrasos = atrasos | "Somar por key" >> beam.CombinePerKey(sum)
        Qtd_Atrasos = atrasos | "Contar por key" >> beam.combiners.Count.PerKey()

        (
            {'Qtd_Atrasos': Qtd_Atrasos, 'Tempo_Atrasos': Tempo_Atrasos}
            | "Group By" >> beam.CoGroupByKey()
            | "Mostrar Resultados" >> beam.Map(print)
        )


if __name__ == "__main__":
    main()