Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

[Sugestão] Monitoramento de download

import asyncio
import random
import string
import time
import os

def gerar_e_salvar_arquivo(nome, tamanho_mb):
    """Gera um arquivo escrevendo blocos de 1MB para economizar memória RAM."""
    bloco_1mb = ''.join(random.choices(string.ascii_letters + string.digits, k=1024 * 1024))
    with open(nome, 'w') as f:
        for _ in range(tamanho_mb):
            f.write(bloco_1mb)

async def criar_arquivo(nome, tamanho):
    print(f'Criando {nome} ({tamanho}MB)...')
    await asyncio.to_thread(gerar_e_salvar_arquivo, nome, tamanho)

async def download_arquivo(nome):
    # Gera o tempo aleatório e guarda na variável
    tempo_download = random.randint(1, 5)
    
    print(f'Iniciando download de {nome} (Tempo estimado: {tempo_download}s)...')
    await asyncio.sleep(tempo_download)
    print(f'Download de {nome} concluido em {tempo_download} segundos!')

async def main():
    arquivos = []
    
    # Etapa 1: Criação dos arquivos
    for i in range(1, 6):
        nome_arquivo = f'arquivo{i}.txt'
        tamanho_arquivo = random.randint(10, 50)
        await criar_arquivo(nome_arquivo, tamanho_arquivo)
        arquivos.append(nome_arquivo)

    print("\nTodos os arquivos criados. Iniciando downloads simultaneos...\n")

    # Marca o tempo de início dos downloads
    tempo_inicio = time.time()

    # Etapa 2: Executa os downloads simultaneamente
    await asyncio.gather(*(download_arquivo(arquivo) for arquivo in arquivos))
    
    # Calcula o tempo total
    tempo_total = time.time() - tempo_inicio
    
    print("-" * 40)
    print(f"Processo de download finalizado! Tempo total: {tempo_total:.2f} segundos.\n")

    # Etapa 3: Limpeza dos arquivos criados
    print("Iniciando limpeza do disco...")
    for arquivo in arquivos:
        # Verifica se o arquivo realmente existe antes de tentar apagar
        if os.path.exists(arquivo):
            os.remove(arquivo)
            print(f"- {arquivo} apagado com sucesso.")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nProcesso cancelado pelo usuario.")
1 resposta
solução!

Oi, Marinaldo. Como vai?

Ótimo trabalho ao usar asyncio.gather() para executar os downloads de forma simultânea e asyncio.to_thread() para criar os arquivos sem travar o fluxo assíncrono. Também ficou muito bom o cuidado com a limpeza usando os.path.exists() antes de remover os arquivos. Uma dica interessante para o futuro é usar time.perf_counter() para medir intervalos de tempo com mais precisão em testes de desempenho:


import time

inicio = time.perf_counter()

# codigo que sera medido
time.sleep(2)

fim = time.perf_counter()
print(f'Tempo total: {fim - inicio:.2f} segundos')

Esse código mede quanto tempo uma parte do programa levou para executar, sendo uma boa opção para comparar tarefas assíncronas, downloads simulados e processos de leitura ou escrita de arquivos.

Alura Conte com o apoio da comunidade Alura na sua jornada. Abraços e bons estudos!