Para o desafio da aula 3 de Pipeline de dados: integrando Python com MongoDB e MySQL, criei o código abaixo. Pensei em desenvolver uam função que criasse as tabelas de forma mais dinâmica, mas via prompt seria complexo e entediante, além disso estou pensando em uma forma de garantir que o arquivo para importar para o banco esteja na mesma ordem que os campos no banco, aconteceu comigo e tive que ajustar o arquivo
import mysql.connector as db
from dotenv import load_dotenv
import os
import pandas as pd
from time import sleep
class connect_mysql:
def __init__(self):
load_dotenv()
self.db_user = os.getenv('mysql_user')
self.db_pass = os.getenv('mysql_pass')
self.db_host = 'localhost'
self.cnx = self.__connect_mysql()
self.cur = self.__create_cursor()
self.df = None
def __connect_mysql(self):
cnx = db.connect(host=self.db_host,user=self.db_user,password=self.db_pass)
return cnx
def __create_cursor(self):
cur = self.cnx.cursor()
return cur
def create_database(self,db_name):
self.cur.execute(f'Create database if not exists {db_name};')
print(f'Database {db_name} criado com sucesso')
def show_databases(self):
self.cur.execute('show databases;')
dbases = [base for base in self.cur]
return dbases
def create_product_table(self,db_name,tb_name):
self.cur.execute(f'''
create table if not exists {db_name}.{tb_name}(
id VARCHAR(100),
Produto VARCHAR(100),
Categoria_Produto VARCHAR(100),
Preco FLOAT(10,2),
Frete FLOAT(10,2),
Data_Compra DATE,
Vendedor VARCHAR(100),
Local_Compra VARCHAR(100),
Avaliacao_Compra INT,
Tipo_Pagamento VARCHAR(100),
Qntd_Parcelas INT,
Latitude FLOAT(10,2),
Longitude FLOAT(10,2),
primary key (id)
);
''')
print(f'Tabela {tb_name} criado com sucesso')
def show_tables(self,db_name):
self.cur.execute(f'use {db_name}')
self.cur.execute('show tables;')
tables = [table for table in self.cur]
return tables
def read_csv(self,path):
df = pd.read_csv(path)
self.df = df
return df
def add_product_data(self,db_name,tb_name):
data = [tuple(value) for i,value in self.df.iterrows()]
query = f'Insert into {db_name}.{tb_name} () values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);'
self.cur.executemany(query,data)
self.cnx.commit()
def close_cnx(self):
self.cur.close()
self.cnx.close()
if __name__ == '__main__':
print('Conectando ao Mysql....')
cnx_mysql = connect_mysql()
sleep(1.5)
os.system('clear')
print('==== Conexão Mysql====')
while True:
print('\nEscolha uma opção:')
print('1 - Visualizar databases')
print('2 - Criar database')
print('3 - Visualizar tabelas')
print('4 - Criar tabela de Produtos')
print('5 - Importar arquivo csv para um db')
print('0 - SAIR')
print('-----------------------------')
option = input('\nOpção: ')
if option == '1':
databases = cnx_mysql.show_databases()
print('\nDatabases disponíveis: ')
for database in databases:
print(database)
sleep(2)
elif option == '2':
dbname = input('\nQual o nome para o database: \n')
cnx_mysql.create_database(dbname)
sleep(2)
elif option == '3':
dbname = input('\nVisualizar tabelas de qual database? \n')
tables = cnx_mysql.show_tables(dbname)
print(f'\nTabelas disponíveis no database {dbname}')
for table in tables:
print(table)
sleep(2)
elif option == '4':
dbname = input('\nCriar tabela em qual database?\n')
tbname = input('Nome da tabela:\n')
cnx_mysql.create_product_table(dbname,tbname)
elif option == '5':
file = input('\nDigite o caminho com o nome do arquivo:\n')
dbname = input('Nome do database:\n')
tbname = input('Nome da tabela:\n')
data = cnx_mysql.read_csv(file)
print('Criado dataframe com o arquivo informado')
cnx_mysql.add_product_data(dbname,tbname)
print(f'Dados inseridos na tabela {tbname}')
elif option == '0':
cnx_mysql.close_cnx()
break
else:
print('\nOpção inválida')
sleep(2)
print('Programa encerrado')