3
respostas

[Dúvida] Problema Download CSV API

Quando executo o DAG no airflow, com base no código abaixo, a task 1 é até executada, porém a task 2 não é, ou seja, as pastas até são criadas, mas não baixa o CSV da API.

Até encontrei o log da task, mas não ficou claro do que se trata o erro.

O mais engraçado é que quando eu executo o arquivo que baixa o csv sem ser criando um DAG, ele funciona normalmente. Tá difícil resolver isso =C

Segue código:

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from os.path import join
import pandas as pd
from airflow.macros import ds_add

with DAG(
        "dados_climaticos",
        start_date=pendulum.datetime(2024, 1, 29, tz="UTC"),
        schedule_interval='0 0 * * 1', # executar toda segunda feira
) as dag:
    tarefa_1 = BashOperator(
            task_id = 'cria_pasta',
            bash_command = 'mkdir -p "/home/felipe/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d")}}"'
            )
    def extrai_dados(data_interval_end):
        city = 'Boston'
        key = '...'
        URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
        f'{city}/{data_interval_end}/{ds_add(data_interval_end, 7)}?unitGroup=metric&include=days&key={key}&contentType=csv')
        dados = pd.read_csv(URL)

        file_path = f'home/felipe/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d")}}/'

        dados.to_csv(file_path + 'dados_brutos.csv')
        dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
        dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')
    
    tarefa_2 = PythonOperator(
        task_id = 'extrai_dados',
        python_callable = extrai_dados,
        op_kwargs = {'data_interval_end': '{{data_interval_end.strftime("%Y-%m-%d")}}'}
    )
    tarefa_1 >> tarefa_2
3 respostas

Olá, Felipe!

Pelo que entendi, você está tendo problemas com a execução da segunda tarefa (tarefa_2) do seu DAG no Apache Airflow. Isso pode ser causado por várias razões, incluindo problemas com o código Python, problemas com a configuração do DAG ou problemas com o servidor Airflow.

Aqui estão algumas coisas que você pode verificar:

  • Código Python: Verifique se o código Python na sua função extrai_dados está correto. Certifique-se de que a URL está correta, que você está conseguindo acessar a API e que os dados estão sendo lidos corretamente para o DataFrame.
  • Configuração do DAG: Verifique se a configuração do seu DAG está correta. Certifique-se de que a função extrai_dados está sendo chamada corretamente na PythonOperator e que o argumento data_interval_end está sendo passado corretamente.
  • Servidor Airflow: Verifique se o servidor Airflow está funcionando corretamente. Você pode fazer isso tentando executar outro DAG ou verificando os logs do servidor Airflow.
  • Permissões de arquivo: Verifique se o Airflow tem permissões para escrever no diretório especificado. Se o Airflow não tiver permissões para escrever no diretório, ele não será capaz de criar o arquivo CSV.

Espero que isso ajude! Se você tiver mais perguntas, fique à vontade para perguntar.


Pois é! O Airflow até diz que não rodou a task, mas não achei onde que ele informa qual o erro específico. Onde que vejo isso? Seria interessante uma forma de debbugar o código. Não sei se tem como rsrs

Segue Log

[2024-02-22 08:34:36,589] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: dados_climaticos.extrai_dados manual__2024-02-22T11:34:29.209800+00:00 [queued]>
[2024-02-22 08:34:36,597] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: dados_climaticos.extrai_dados manual__2024-02-22T11:34:29.209800+00:00 [queued]>
[2024-02-22 08:34:36,598] {taskinstance.py:1356} INFO - 
--------------------------------------------------------------------------------
[2024-02-22 08:34:36,598] {taskinstance.py:1357} INFO - Starting attempt 1 of 1
[2024-02-22 08:34:36,598] {taskinstance.py:1358} INFO - 
--------------------------------------------------------------------------------
[2024-02-22 08:34:36,616] {taskinstance.py:1377} INFO - Executing <Task(PythonOperator): extrai_dados> on 2024-02-22 11:34:29.209800+00:00
[2024-02-22 08:34:36,624] {standard_task_runner.py:52} INFO - Started process 17770 to run task
[2024-02-22 08:34:36,633] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'dados_climaticos', 'extrai_dados', 'manual__2024-02-22T11:34:29.209800+00:00', '--job-id', '68', '--raw', '--subdir', 'DAGS_FOLDER/dados_climaticos.py', '--cfg-path', '/tmp/tmpd37ppgvi', '--error-file', '/tmp/tmp_hd0vrsb']
[2024-02-22 08:34:36,634] {standard_task_runner.py:80} INFO - Job 68: Subtask extrai_dados
[2024-02-22 08:34:36,694] {task_command.py:370} INFO - Running <TaskInstance: dados_climaticos.extrai_dados manual__2024-02-22T11:34:29.209800+00:00 [running]> on host felipe-virtual-machine
[2024-02-22 08:34:36,756] {taskinstance.py:1569} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dados_climaticos
AIRFLOW_CTX_TASK_ID=extrai_dados
AIRFLOW_CTX_EXECUTION_DATE=2024-02-22T11:34:29.209800+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2024-02-22T11:34:29.209800+00:00
[2024-02-22 08:34:36,758] {logging_mixin.py:115} INFO - 2024-02-26
[2024-02-22 08:34:36,758] {logging_mixin.py:115} INFO - 2024-02-19
[2024-02-22 08:34:37,252] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/felipe/Documents/airflowalura/dags/dados_climaticos.py", line 31, in extrai_dados
    dados.to_csv(file_path + 'dados_brutos.csv')
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/util/_decorators.py", line 333, in wrapper
    return func(*args, **kwargs)
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/core/generic.py", line 3961, in to_csv
    return DataFrameRenderer(formatter).to_csv(
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/io/formats/format.py", line 1014, in to_csv
    csv_formatter.save()
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/io/formats/csvs.py", line 251, in save
    with get_handle(
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/io/common.py", line 749, in get_handle
    check_parent_directory(str(handle))
  File "/home/felipe/Documents/airflowalura/venv/lib/python3.9/site-packages/pandas/io/common.py", line 616, in check_parent_directory
    raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")
OSError: Cannot save file into a non-existent directory: 'home/felipe/Documents/airflowalura/semana={data_interval_end.strftime("%Y-%m-%d")}'
[2024-02-22 08:34:37,273] {taskinstance.py:1395} INFO - Marking task as FAILED. dag_id=dados_climaticos, task_id=extrai_dados, execution_date=20240222T113429, start_date=20240222T113436, end_date=20240222T113437
[2024-02-22 08:34:37,286] {standard_task_runner.py:92} ERROR - Failed to execute job 68 for task extrai_dados (Cannot save file into a non-existent directory: 'home/felipe/Documents/airflowalura/semana={data_interval_end.strftime("%Y-%m-%d")}'; 17770)
[2024-02-22 08:34:37,298] {local_task_job.py:156} INFO - Task exited with return code 1
[2024-02-22 08:34:37,323] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check