4
respostas

Agrupar Lista

Boa tarde professor,

estou tentando realizar o agrupamento de uma fonde dados(Lista) - mais não estou conseguindo

def texto_para_lista(elemento, delimitador=','):
    return elemento.split(delimitador)

def seleciona_elemetos(elemento):
    ano_campeonato = elemento[1]
    time_man = elemento[9]
    gols_man = elemento[19]
    gols_vis = elemento[20]

    return (ano_campeonato, time_man, gols_man, gols_vis)

def elemento_para_lista(elemento):
    return list(elemento)

futebol = (
    pipeline
    | "Leitura do Dataeset" >> ReadFromText('../data/futebol.csv')
    | "De texto para lista" >> beam.Map(texto_para_lista, delimitador =',')
    | "Delata alguns elementos" >> beam.Map(seleciona_elemetos)
    | "Elemento para lista" >> beam.Map(elemento_para_lista)
    | "" >> beam.GroupByKey()
    | "Mostrar dataset" >> beam.Map(print)

IOTypeHints[inputs=None, outputs=((Tuple[TypeVariable[K], Iterable[TypeVariable[V]]],), {})] File "", line 671, in loadunlocked File "", line 848, in exec_module File "", line 219, in callwith_frames_removed File "C:\Users\Lucas\anaconda3\envs\futebol\lib\site-packages\apache_beam\transforms\core.py", line 2315, in class GroupByKey(PTransform): File "C:\Users\Lucas\anaconda3\envs\futebol\lib\site-packages\apache_beam\typehints\decorators.py", line 984, in annotate_output_types f.typehints = th.with_output_types(return_type_hint) # pylint: disable=protected-access

4 respostas

Olá, tudo bem?

Você pode mandar todo o código e todo o log para eu verificar?

import apache_beam as beam
import re
from apache_beam.io.textio import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import CombinePerKey, GroupBy, GroupByKey, Map

pipeline_options = PipelineOptions(argv=None)
pipeline = beam.Pipeline(options=pipeline_options)

### Time Mandante ###

def texto_para_lista_m(elemento, delimitador=','):
    return elemento.split(delimitador)

def seleciona_elemetos_m(elemento):
    indice = elemento[0]
    ano_campeonato = elemento[1]
    time_man = elemento[9]
    gols_man = (elemento[19])
    gols_vis = (elemento[20])

    gols_man = gols_man.replace(',','.')
    gols_vis = gols_vis.replace(',','.')

    return (indice, ano_campeonato, time_man, gols_man, gols_vis)

def elemento_para_lista_m(elemento):
    return list(elemento)

colunas_fut_m = [
    'indice',
    'ano_campeonato',
    'time_man',
    'gols_man',
    'gols_vis'
]

def elemento_para_dicionario_m(elemento, colunas):
    return dict(zip(colunas, elemento))  

def chave_elemento_m(elemento):
    chave = elemento['time_man']
    return (chave, elemento)

def partida_m(elemento):
    """ Recebe uma tupla ('RS',[{}, {}]) 
        Retorna uma tupla ('RS-201-02', 8.0 ) """
    time , registros = elemento
    for registro in registros:
        if bool(re.search(r'\d', registro['gols_man'])):
            yield (f"{time}_{registro['ano_campeonato']}", float(registro['gols_man']))
        else:
            yield (f"{time}_{registro['ano_campeonato']}", 0.0)

def arredonda_numero_m(elemento):
    chave = elemento[0]
    num = elemento[1]
    return (chave, round(num, 2))

### Time Visitante ###

def texto_para_lista_v(elemento, delimitador=','):
    return elemento.split(delimitador)

def seleciona_elemetos_v(elemento):
    indice = elemento[0]
    ano_campeonato = elemento[1]
    time_vis = elemento[10]
    gols_man = (elemento[19])
    gols_vis = (elemento[20])

    gols_man = gols_man.replace(',','.')
    gols_vis = gols_vis.replace(',','.')

    return (indice, ano_campeonato, time_vis, gols_man, gols_vis)

colunas_fut_v = [
    'indice',
    'ano_campeonato',
    'time_vis',
    'gols_man',
    'gols_vis'
]

def elemento_para_dicionario_v(elemento, colunas):
    return dict(zip(colunas, elemento)) 

def chave_elemento_v(elemento):
    chave = elemento['time_vis']
    return (chave, elemento)

def partida_v(elemento):
    """ Recebe uma tupla ('RS',[{}, {}]) 
        Retorna uma tupla ('RS-201-02', 8.0 ) """
    time , registros = elemento
    for registro in registros:
        if bool(re.search(r'\d', registro['gols_vis'])):
            yield (f"{time}_{registro['ano_campeonato']}", float(registro['gols_vis']))
        else:
            yield (f"{time}_{registro['ano_campeonato']}", 0.0) 

def arredonda_numero_v(elemento):
    chave = elemento[0]
    num = elemento[1]
    return (chave, round(num, 2))

### Unir ###

def passando_para_lista(elemento):
    a = elemento[0]
    b = elemento[1][-1]
    c = elemento[1][0]
    d = b + c 
    return a, c, b, d

def descompactar_elemetos(elemento):
    """ Recebe uma tupla
        Retorna uma tupla """
    chave = elemento[0]
    golmc = elemento[1]
    golmv = elemento[2]
    golt = elemento[3]
    time = chave.split('_')
    time1 = time[0]
    ano = time[1]
    return str(time1), ano, str(golmc), str(golmv), str(golt)

def preparar_csv(elemento, delimitador=';'):
    """ Recebe um tupla
        Retorna uma string delimitada """   
    return f"{delimitador}".join(elemento)
futebol_mandante = (
    pipeline
    | "Leitura do Dataeset - Madante" >> ReadFromText('../data/futebol.csv', skip_header_lines=1)
    | "De texto para lista - Madante"  >> beam.Map(texto_para_lista_m, delimitador =',')
    | "Seleciona alguns elementos - Madante" >> beam.Map(seleciona_elemetos_m)
    | "Transforma Lista para dicionário - Madante" >> beam.Map(elemento_para_dicionario_m, colunas_fut_m)
    | "Cria a chave - Madante" >> beam.Map(chave_elemento_m)
    | "Agrupa os elementos - Madante" >> beam.GroupByKey()
    | "Cria elemento time-ano - Madante" >> beam.FlatMap(partida_m)
    | "Soma o número de gols - Madante" >> beam.CombinePerKey(sum)
    | "Arredonda a soma do número de gols - Madante" >> beam.Map(arredonda_numero_m)
    #| "Mostrar dataset" >> beam.Map(print)
)

futebol_visitante = (
    pipeline
    | "Leitura do Dataeset - Visitante" >> ReadFromText('../data/futebol.csv', skip_header_lines=1)
    | "De texto para lista - Visitante"  >> beam.Map(texto_para_lista_v, delimitador =',')
    | "Seleciona alguns elementos - Visitante" >> beam.Map(seleciona_elemetos_v)
    | "Transforma Lista para dicionário - Visitante" >> beam.Map(elemento_para_dicionario_v, colunas_fut_v)
    | "Cria a chave - Visitante" >> beam.Map(chave_elemento_v)
    | "Agrupa os elementos - Visitante" >> beam.GroupByKey()
    | "Cria elemento time-ano - Visitante" >> beam.FlatMap(partida_v)
    | "Soma o número de gols - Visitante" >> beam.CombinePerKey(sum)
    | "Arredonda a soma do número de gols - Visitante" >> beam.Map(arredonda_numero_v)
    #| "Mostrar dataset" >> beam.Map(print)
)

unir = (
    (futebol_mandante, futebol_visitante)
    | "Empilha Pipelines" >> beam.Flatten()
    | "Agrupa as pipilines" >> beam.GroupByKey()
    | "Passando para lista" >> beam.Map(passando_para_lista)
    | "Descompactar" >> beam.Map(descompactar_elemetos)
    | "Prepara CSV" >> beam.Map(preparar_csv)
    | "Mostra resultado" >> beam.Map(print)
)

header = 'TIME,ANO,GOLS_MARCADOS_CASA,GOLS_MARCADOS_FORA,TOTAL_GOLS_MARCADOS'

unir | "Criar arquivo CSV" >> WriteToText('../data/resultado', file_name_suffix='.csv', header=header)

pipeline.run()

Agora consegui até aguparar , mais não estou conseguindo salvar o arquivo em .csv - na verdade ele até salva mais os dados aparecem como none.