Estou com um problema que não consigo resolver de forma alguma.
No processo de fusão de dois arquivos CSV, acontece um comportamento inesperado no número de registros resultantes. Os arquivos de entrada possuem, respectivamente, 3123 e 1323 registros, totalizando 4446 após a junção. No entanto, ao exibir a quantidade de registros processados, o sistema retorna 11061, enquanto o arquivo gerado final salvo, nomeado como "dados_combinados.csv" contém apenas 7 linhas e os registro ainda se repetem.
A função responsável pela junção dos dados utiliza o método extend para combinar os registros de ambas as fontes. Durante a execução do script, as quantidades individuais de cada conjunto são impressas corretamente antes da fusão, mas o total final diverge significativamente do esperado.
Segue meu códigos para análise:
from processamento_dados import Dados
path_json = 'data_raw/dados_empresaA.json'
path_csv = 'data_raw/dados_empresaB.csv'
#Extract
dados_empresaA = Dados(path_json, 'json')
print(dados_empresaA.nome_colunas)
print(dados_empresaA.qtd_linhas)
print("Len A =", len(dados_empresaA.dados))
dados_empresaB = Dados(path_csv, 'csv')
print(dados_empresaB.nome_colunas)
print(dados_empresaB.qtd_linhas)
print("Len B =", len(dados_empresaB.dados))
# Transform
key_mapping = {'Nome do Item': 'Nome do Produto',
'Classificação do Produto': 'Categoria do Produto',
'Valor em Reais (R$)': 'Preço do Produto (R$)',
'Quantidade em Estoque': 'Quantidade em Estoque',
'Nome da Loja': 'Filial',
'Data da Venda': 'Data da Venda'}
dados_empresaB.rename_columns(key_mapping)
print(f'renomeação de colunas = {dados_empresaB.nome_colunas}')
dados_fusao = Dados.join(dados_empresaA, dados_empresaB)
print(f'Nome das colunas = {dados_fusao.nome_colunas}')
print(f'Quantidade de linhas das duas tabelas = {dados_fusao.qtd_linhas}')
# Load
path_dados_combinados = 'data_processed/dados_combinados.csv'
dados_fusao.salvando_dados(path_dados_combinados)
print(f'Arquivo dos dados combinados = {path_dados_combinados}')
import json
import csv
class Dados:
def __init__(self, path, tipo_dados):
self.path = path
self.tipo_dados = tipo_dados
self.dados = self.leitura_dados()
self.nome_colunas = self.get_columns()
self.qtd_linhas = self.size_data()
def leitura_json(self):
dados_json = []
with open(self.path, 'r') as file:
dados_json = json.load(file)
return dados_json
def leitura_csv(self):
dados_csv = []
with open(self.path, 'r') as file:
spamreader = csv.DictReader(file, delimiter=',')
for row in spamreader:
dados_csv.append(row)
return dados_csv
def leitura_dados(self):
dados = []
if self.tipo_dados == 'csv':
dados = self.leitura_csv()
elif self.tipo_dados == 'json':
dados = self.leitura_json()
elif self.tipo_dados == 'list':
dados = self.path
self.path = 'lista em memoria'
return dados
def get_columns(self):
return list(self.dados[-1].keys())
def rename_columns(self, key_mapping):
new_dados = []
for old_dict in self.dados:
dict_temp = {}
for old_key, value in old_dict.items():
dict_temp[key_mapping[old_key]] = value
new_dados.append(dict_temp)
self.dados = new_dados
self.nome_colunas = self.get_columns()
def size_data(self):
return len(self.dados)
def join(dadosA, dadosB):
combined_list = []
combined_list.extend(dadosA.dados)
combined_list.extend(dadosB.dados)
return Dados(combined_list, 'list')
def transformando_dados_tabela(self):
dados_combinados_tabela = [self.nome_colunas]
for row in self.dados:
linha = []
for coluna in self.nome_colunas:
linha.append(row.get(coluna, 'Indisponivel'))
dados_combinados_tabela.append(linha)
return dados_combinados_tabela
def salvando_dados(self, path):
dados_combinados_tabela = self.transformando_dados_tabela()
with open(path, 'w') as file:
writer = csv.writer(file)
writer.writerows(dados_combinados_tabela)