import pandas as pd
from urllib.request import urlopen
from urllib.parse import urlparse
import io
import csv
import logging
import os
from datetime import datetime
# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
OUTPUT_DIR = 'output'
# Classe para carregar dados de várias extensões
class DataLoader:
@staticmethod
def detectar_separador(bytes_conteudo):
try:
amostra = bytes_conteudo[:1024].decode('utf-8')
return csv.Sniffer().sniff(amostra).delimiter
except Exception:
for sep in [',', ';', '\t']:
try:
pd.read_csv(io.BytesIO(bytes_conteudo), sep=sep, nrows=5)
return sep
except Exception:
continue
return ','
@staticmethod
def extrair_extensao(url):
return urlparse(url).path.split('.')[-1].lower()
@staticmethod
def carregar(url):
extensao = DataLoader.extrair_extensao(url)
try:
conteudo = urlopen(url).read()
except Exception as e:
raise RuntimeError(f'Erro ao acessar a URL: {e}')
buffer = io.BytesIO(conteudo)
if extensao == 'csv':
sep = DataLoader.detectar_separador(conteudo)
for enc in ['utf-8', 'latin1', 'ISO-8859-1']:
try:
return pd.read_csv(io.BytesIO(conteudo), sep=sep, encoding=enc)
except Exception:
pass
raise UnicodeDecodeError('Falha na decodificação CSV.')
elif extensao in ['xls', 'xlsx']:
return pd.read_excel(buffer)
elif extensao == 'json':
return pd.read_json(buffer)
else:
raise ValueError(f'Extensão não suportada: .{extensao}')
# Valida presença das colunas necessárias
def validar_colunas(df, cols):
ausentes = [c for c in cols if c not in df.columns]
if ausentes:
raise ValueError(f'Colunas ausentes: {ausentes}')
logging.info('Colunas obrigatórias presentes.')
# Trata valores nulos substituindo por valor definido
def tratar_nulos(df, val=0):
nulos = df.isnull().sum()
logging.info('Valores nulos:\n' + str(nulos[nulos > 0]))
df.fillna(val, inplace=True)
if df.isnull().values.any():
logging.error('Ainda há valores nulos.')
else:
logging.info('Nulos substituídos.')
return df
# Remove colunas e ordena DataFrame
def limpar_organizar(df, cols_remover, ordenar_por):
df.drop(columns=cols_remover, inplace=True)
df.sort_values(by=ordenar_por, inplace=True)
df.reset_index(drop=True, inplace=True)
logging.info(f'Dados organizados por {ordenar_por}.')
return df
# Cria pasta de saída se não existir
def criar_pasta(pasta=OUTPUT_DIR):
if not os.path.exists(pasta):
os.makedirs(pasta)
logging.info(f'Pasta criada: {pasta}')
else:
logging.info(f'Pasta já existe: {pasta}')
# Salva DataFrame em CSV com timestamp no nome
def salvar_csv(df, nome_base, pasta=OUTPUT_DIR):
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
arquivo = os.path.join(pasta, f'{nome_base}_{ts}.csv')
df.to_csv(arquivo, index=False)
logging.info(f'Arquivo salvo: {arquivo}')
# Função principal que executa todo o fluxo
def main(url, pasta_saida=OUTPUT_DIR):
logging.info('Carregando dados...')
df = DataLoader.carregar(url)
validar_colunas(df, ['Tipo', 'Valor', 'Condominio', 'Bairro'])
df = tratar_nulos(df)
df = df.query("Tipo == 'Apartamento' and Valor > 0 and Condominio > 0").copy()
validar_colunas(df, ['Quartos', 'Area'])
if df.isnull().values.any():
logging.warning('Existem nulos após filtragem.')
df = limpar_organizar(df, ['Tipo'], ['Bairro', 'Valor'])
criar_pasta(pasta_saida)
salvar_csv(df, 'apartamentos_completo', pasta_saida)
filtro_1q = df[(df['Quartos'] == 1) & (df['Valor'] < 1200)]
salvar_csv(filtro_1q, 'apartamentos_1quarto_menor_1200', pasta_saida)
filtro_2q = df[(df['Quartos'] == 2) & (df['Valor'] < 3000) & (df['Area'] > 70)]
salvar_csv(filtro_2q, 'apartamentos_2quartos_menor_3000_area_maior_70', pasta_saida)
logging.info('Primeiros registros do DataFrame final:')
logging.info(df.head())
return df
# Execução do script
if __name__ == '__main__':
url = 'https://raw.githubusercontent.com/alura-cursos/pandas-conhecendo-a-biblioteca/main/base-de-dados/aluguel.csv'
main(url)