Para o desafio da aula 3 de Pipeline de dados: integrando Python com MongoDB e MySQL, criei o código abaixo que utiliza o código do desafio anterior Desafio Aula 2
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import pandas as pd
import time
from extract_and_save import mongodb
class transform_data:
def __init__(self,dbcol):
self.col = dbcol
self.data = None
self.df = None
def visualize_collection(self):
data = [doc for doc in self.col.find({},{'_id':0})]
self.data = data
return data
def see_columns(self):
columns = list({column for doc in self.col.find({},{'_id':0}) for column in doc.keys()})
columns.sort()
return columns
def rename_column(self,old_name,new_name):
self.col.update_many({},{'$rename':{old_name:new_name}})
print(f'Coluna "{old_name}" alterado para "{new_name}"')
def select_category(self,key,filter):
if key == '':
query = {}
else:
query = {key:filter}
data = [doc for doc in self.col.find(query,{'_id':0})]
self.data = data
return data
def make_regex(self,key,regex):
data = [doc for doc in self.col.find({key:{'$regex':regex}},{'_id':0})]
self.data = data
return data
def create_dataframe(self):
df = pd.DataFrame(self.data)
self.df = df
return df
def format_date(self,campo):
try:
self.df[campo] = pd.to_datetime(self.df[campo],format='%d/%m/%Y')
except Exception as e:
print(e)
self.df[campo] = self.df[campo].dt.strftime('%Y-%m-%d')
def save_csv(self,nome_arquivo):
self.df.to_csv(f'../data/{nome_arquivo}.csv',index=False)
self.data = None
print(f'Arquivo salvo com sucesso (../data/{nome_arquivo}.csv)\n')
if __name__ == '__main__':
print('Transformação de Dados')
uri = input('Informe sua URI:\n')
db_connector = mongodb(uri)
print('\nEscolha o db:')
print(db_connector.list_databases())
database = input('\ndb: ')
db_connector.create_connect_db(database)
if len(db_connector.list_collections()) == 0:
print('Não há collections\nPrograma encerrado!')
exit()
else:
print('\nEscolha o collection:')
print(db_connector.list_collections())
collection = input()
db_connector.create_connect_collection(collection)
print(f'\nConectado ao collection {collection} no banco {database}',end='. ')
transf = transform_data(db_connector.col)
while True:
print('Escolha uma opção:')
print('1 - Visualizar documentos do collection')
print('2 - Visualizar nome das chaves do collection')
print('3 - Renomear uma chave do collection')
print('4 - Filtrar por um valor de chave especifico')
print('5 - Filtrar por um valor de chave com expressão regular')
if transf.data is not None:
print('6 - Criar dataframe de arquivo armazenado na memória')
if transf.df is not None:
print('7 - Alterar formato de data de um campo')
print('8 - Salvar arquivo armazenado na memória')
print('0 - SAIR')
option = input('\nOpção: ')
if option == '1':
data = transf.visualize_collection()
data_qty = len(data)
print(data)
print(f'\n{data_qty} documentos salvos na memória\n')
elif option == '2':
col_keys = transf.see_columns()
print(f'\nChaves do collection {collection}:')
print(col_keys,'\n')
elif option == '3':
old_key = input('\nQual o nome da chave que deseja alterar?\n')
new_key = input('Qual será o novo nome:\n')
transf.rename_column(old_key,new_key)
print('\nChaves alteradas com sucesso\n')
elif option == '4':
key = input('\nQual chave deseja filtrar:\n')
filter = input('Qual o filtro:\n')
transf.select_category(key,filter)
print(f'\n{len(transf.data)} documentos salvos na memória\n')
elif option == '5':
key = input('\nQual chave deseja filtrar:\n')
filter = input('Digite a expressão regular:\n')
transf.make_regex(key,filter)
print(f'\n{len(transf.data)} documentos salvos na memória\n')
elif option == '6':
if transf.data is None:
print('Opção inválida!\n')
time.sleep(2)
else:
transf.create_dataframe()
print(transf.df.head())
print('\nDataframe criado com sucesso com dados da memória\n')
elif option == '7':
if transf.df is None:
print('Opção inválida!\n')
time.sleep(2)
else:
campo = input('\nInforme o campo que esteja com data no formato dia/mês/ano:\n')
transf.format_date(campo)
print(f'\nFormato de data do campo {campo} alterado com sucesso\n')
elif option == '8':
if transf.df is None:
print('Opção inválida!\n')
time.sleep(2)
else:
nome_arquivo = input('\nDigite o nome do arquivo para salvar o dataframe (sem extensão):\n')
transf.save_csv(nome_arquivo)
elif option == '0':
break
else:
print('Opção inválida!\n')
print('Programa encerrado!')