Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

[Sugestão] 09 Desafio: salvando os dados filtrados

import pandas as pd
from urllib.request import urlopen
from urllib.parse import urlparse
import io
import csv
import logging
import os
from datetime import datetime

# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
OUTPUT_DIR = 'output'

# Classe para carregar dados de várias extensões
class DataLoader:
    @staticmethod
    def detectar_separador(bytes_conteudo):
        try:
            amostra = bytes_conteudo[:1024].decode('utf-8')
            return csv.Sniffer().sniff(amostra).delimiter
        except Exception:
            for sep in [',', ';', '\t']:
                try:
                    pd.read_csv(io.BytesIO(bytes_conteudo), sep=sep, nrows=5)
                    return sep
                except Exception:
                    continue
            return ','

    @staticmethod
    def extrair_extensao(url):
        return urlparse(url).path.split('.')[-1].lower()

    @staticmethod
    def carregar(url):
        extensao = DataLoader.extrair_extensao(url)
        try:
            conteudo = urlopen(url).read()
        except Exception as e:
            raise RuntimeError(f'Erro ao acessar a URL: {e}')
        buffer = io.BytesIO(conteudo)
        if extensao == 'csv':
            sep = DataLoader.detectar_separador(conteudo)
            for enc in ['utf-8', 'latin1', 'ISO-8859-1']:
                try:
                    return pd.read_csv(io.BytesIO(conteudo), sep=sep, encoding=enc)
                except Exception:
                    pass
            raise UnicodeDecodeError('Falha na decodificação CSV.')
        elif extensao in ['xls', 'xlsx']:
            return pd.read_excel(buffer)
        elif extensao == 'json':
            return pd.read_json(buffer)
        else:
            raise ValueError(f'Extensão não suportada: .{extensao}')

# Valida presença das colunas necessárias
def validar_colunas(df, cols):
    ausentes = [c for c in cols if c not in df.columns]
    if ausentes:
        raise ValueError(f'Colunas ausentes: {ausentes}')
    logging.info('Colunas obrigatórias presentes.')

# Trata valores nulos substituindo por valor definido
def tratar_nulos(df, val=0):
    nulos = df.isnull().sum()
    logging.info('Valores nulos:\n' + str(nulos[nulos > 0]))
    df.fillna(val, inplace=True)
    if df.isnull().values.any():
        logging.error('Ainda há valores nulos.')
    else:
        logging.info('Nulos substituídos.')
    return df

# Remove colunas e ordena DataFrame
def limpar_organizar(df, cols_remover, ordenar_por):
    df.drop(columns=cols_remover, inplace=True)
    df.sort_values(by=ordenar_por, inplace=True)
    df.reset_index(drop=True, inplace=True)
    logging.info(f'Dados organizados por {ordenar_por}.')
    return df

# Cria pasta de saída se não existir
def criar_pasta(pasta=OUTPUT_DIR):
    if not os.path.exists(pasta):
        os.makedirs(pasta)
        logging.info(f'Pasta criada: {pasta}')
    else:
        logging.info(f'Pasta já existe: {pasta}')

# Salva DataFrame em CSV com timestamp no nome
def salvar_csv(df, nome_base, pasta=OUTPUT_DIR):
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    arquivo = os.path.join(pasta, f'{nome_base}_{ts}.csv')
    df.to_csv(arquivo, index=False)
    logging.info(f'Arquivo salvo: {arquivo}')

# Função principal que executa todo o fluxo
def main(url, pasta_saida=OUTPUT_DIR):
    logging.info('Carregando dados...')
    df = DataLoader.carregar(url)
    validar_colunas(df, ['Tipo', 'Valor', 'Condominio', 'Bairro'])
    df = tratar_nulos(df)
    df = df.query("Tipo == 'Apartamento' and Valor > 0 and Condominio > 0").copy()
    validar_colunas(df, ['Quartos', 'Area'])
    if df.isnull().values.any():
        logging.warning('Existem nulos após filtragem.')
    df = limpar_organizar(df, ['Tipo'], ['Bairro', 'Valor'])
    criar_pasta(pasta_saida)

    salvar_csv(df, 'apartamentos_completo', pasta_saida)

    filtro_1q = df[(df['Quartos'] == 1) & (df['Valor'] < 1200)]
    salvar_csv(filtro_1q, 'apartamentos_1quarto_menor_1200', pasta_saida)

    filtro_2q = df[(df['Quartos'] == 2) & (df['Valor'] < 3000) & (df['Area'] > 70)]
    salvar_csv(filtro_2q, 'apartamentos_2quartos_menor_3000_area_maior_70', pasta_saida)

    logging.info('Primeiros registros do DataFrame final:')
    logging.info(df.head())

    return df

# Execução do script
if __name__ == '__main__':
    url = 'https://raw.githubusercontent.com/alura-cursos/pandas-conhecendo-a-biblioteca/main/base-de-dados/aluguel.csv'
    main(url)
1 resposta
solução!

Olá, Marinaldo! Como vai?

Muito bem! Continue resolvendo os desafios e compartilhando com a comunidade Alura.

Observei que você explorou o uso de funções reutilizáveis para tornar o seu código mais modular, utilizou muito bem o tratamento de arquivos com diferentes extensões e ainda compreendeu a importância do logging para facilitar o monitoramento e a depuração.

Permaneça postando as suas soluções, com certeza isso ajudará outros estudantes e tem grande relevância para o fórum.

Uma dica interessante para o futuro é usar o parâmetro errors='ignore' na leitura de arquivos CSV para evitar falhas causadas por caracteres mal formados. Dessa forma:

pd.read_csv('arquivo.csv', encoding='utf-8', errors='ignore')

Isso faz com que seu script fique ainda mais resiliente em situações com arquivos mal formatados.

Ícone de sugestão Para saber mais:

Sugestão de conteúdo para você mergulhar ainda mais sobre o tema:

Alguns materiais estão em inglês, mas é possível compreendê-los usando o recurso de tradução de páginas do próprio navegador.

Fico à disposição! E se precisar, conte sempre com o apoio do fórum.

Abraço e bons estudos!

AluraConte com o apoio da comunidade Alura na sua jornada. Abraços e bons estudos!