1
resposta

[Dúvida] Problema para execução da Tarefa 2

Olá, estou tendo problemas na execução da Tarefa 2, as pastas são criadas porem a extração de dados não ocorre. Alguem conseguiria me dar uma luz?

from airflow import DAG 
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.macros import ds_add
from os.path import join
import pandas as pd

with DAG(
    "dados_climaticos",
    start_date=pendulum.datetime(2024,2,22, tz="UTC"),
    schedule_interval='0 0 * * 1',
) as dag:
    
    tarefa_1=BashOperator(
        task_id='cria_pasta',
        bash_command='mkdir -p "/home/vinicius/Documentos/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d")}}"'
    )


    def extraiDados(data_interval_end):
        city = 'Boston'
        key = 'G6SHGYFVKN5KY25GFE68S2BYA'

        URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
                    f'{city}/{data_interval_end}/{ds_add(data_interval_end,7)}?unitGroup=metric&include=days&key={key}&contentType=csv')

        dados = pd.read_csv(URL)

        file_path = f'/home/vinicius/Documentos/airflow/semana={data_interval_end}/'

        dados.to_csv(file_path + 'dados_brutos.csv')
        dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
        dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')

    tarefa_2=PythonOperator(
        task_id='Extrai_dados',
        python_callable=extraiDados,
        op_kwargs={'data_interval_end':'{{data_interval_end.strftime("%Y-%m-%d")}}'}
    )


tarefa_1>>tarefa_2

Desde já agradeço

1 resposta

Olá Vinicius, tudo bem?

Analisando o seu código, ele parece estar correto! Mas, para identificar o que pode estar causando esse comportamento, sugiro algumas verificações. Primeiro, certifique de que o caminho da pasta onde os dados devem ser salvos está correto.

Além disso, a URL pode não estar sendo gerada corretamente. Para verificar isso, adicione um print(URL) dentro da função extraiDados, antes de pd.read_csv(URL), para confirmar que a URL está correta.

def extraiDados(data_interval_end):
        #códigos anteriores omitidos
        URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
                    f'{city}/{data_interval_end}/{ds_add(data_interval_end,7)}?unitGroup=metric&include=days&key={key}&contentType=csv')

        print(f"URL gerada: {URL}")  # Verificar a URL gerada        
        dados = pd.read_csv(URL)
        

Também é importante garantir que a chave da API (key) está correta e ainda válida. A chave da API pode expirar ou ter restrições que impeçam o acesso aos dados.

Espero ter ajudado e fico à disposição.

Abraços e bons estudos!

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!