import apache_beam as beam
import re
from apache_beam.io.textio import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import CombinePerKey, GroupBy, GroupByKey, Map
pipeline_options = PipelineOptions(argv=None)
pipeline = beam.Pipeline(options=pipeline_options)
### Time Mandante ###
def texto_para_lista_m(elemento, delimitador=','):
return elemento.split(delimitador)
def seleciona_elemetos_m(elemento):
indice = elemento[0]
ano_campeonato = elemento[1]
time_man = elemento[9]
gols_man = (elemento[19])
gols_vis = (elemento[20])
gols_man = gols_man.replace(',','.')
gols_vis = gols_vis.replace(',','.')
return (indice, ano_campeonato, time_man, gols_man, gols_vis)
def elemento_para_lista_m(elemento):
return list(elemento)
colunas_fut_m = [
'indice',
'ano_campeonato',
'time_man',
'gols_man',
'gols_vis'
]
def elemento_para_dicionario_m(elemento, colunas):
return dict(zip(colunas, elemento))
def chave_elemento_m(elemento):
chave = elemento['time_man']
return (chave, elemento)
def partida_m(elemento):
""" Recebe uma tupla ('RS',[{}, {}])
Retorna uma tupla ('RS-201-02', 8.0 ) """
time , registros = elemento
for registro in registros:
if bool(re.search(r'\d', registro['gols_man'])):
yield (f"{time}_{registro['ano_campeonato']}", float(registro['gols_man']))
else:
yield (f"{time}_{registro['ano_campeonato']}", 0.0)
def arredonda_numero_m(elemento):
chave = elemento[0]
num = elemento[1]
return (chave, round(num, 2))
### Time Visitante ###
def texto_para_lista_v(elemento, delimitador=','):
return elemento.split(delimitador)
def seleciona_elemetos_v(elemento):
indice = elemento[0]
ano_campeonato = elemento[1]
time_vis = elemento[10]
gols_man = (elemento[19])
gols_vis = (elemento[20])
gols_man = gols_man.replace(',','.')
gols_vis = gols_vis.replace(',','.')
return (indice, ano_campeonato, time_vis, gols_man, gols_vis)
colunas_fut_v = [
'indice',
'ano_campeonato',
'time_vis',
'gols_man',
'gols_vis'
]
def elemento_para_dicionario_v(elemento, colunas):
return dict(zip(colunas, elemento))
def chave_elemento_v(elemento):
chave = elemento['time_vis']
return (chave, elemento)
def partida_v(elemento):
""" Recebe uma tupla ('RS',[{}, {}])
Retorna uma tupla ('RS-201-02', 8.0 ) """
time , registros = elemento
for registro in registros:
if bool(re.search(r'\d', registro['gols_vis'])):
yield (f"{time}_{registro['ano_campeonato']}", float(registro['gols_vis']))
else:
yield (f"{time}_{registro['ano_campeonato']}", 0.0)
def arredonda_numero_v(elemento):
chave = elemento[0]
num = elemento[1]
return (chave, round(num, 2))
### Unir ###
def passando_para_lista(elemento):
a = elemento[0]
b = elemento[1][-1]
c = elemento[1][0]
d = b + c
return a, c, b, d
def descompactar_elemetos(elemento):
""" Recebe uma tupla
Retorna uma tupla """
chave = elemento[0]
golmc = elemento[1]
golmv = elemento[2]
golt = elemento[3]
time = chave.split('_')
time1 = time[0]
ano = time[1]
return str(time1), ano, str(golmc), str(golmv), str(golt)
def preparar_csv(elemento, delimitador=';'):
""" Recebe um tupla
Retorna uma string delimitada """
return f"{delimitador}".join(elemento)