3
respostas

[Dúvida] DAG não funciona

Olá, tudo bem?

Estou com problemas ao executar a DAG, o código está igual o da instrutora (pelo menos acredito que esteja igual kkkk só se passou algo...) e a DAG está funcionando.

Eu não entendi muito bem como e onde identificar os erros...

Abaixo o meu código:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.macros import ds_add
import pendulum
from os.path import join
import pandas as pd


with DAG(
    "dados_climaticos",
    start_date  = pendulum.datetime(2023, 12, 25, tz = "UTC"),
    schedule_interval = '0 0 * * 1', # executar toda segunda-deira
) as dag:
    
    tarefa_1 = BashOperator(
    task_id = 'cria_pasta',
    bash_command = 'mkdir -p "/home/nikaao/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d)}}"'
    )

    def extrai_dados(data_interval_end):
        city = 'Boston'
        key = 'NAO_VOU_FALAR_NE'

        url = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
                f'{city}/{data_interval_end}/{ds_add(data_interval_end, 7)}?unitGroup=metric&include=days&key={key}&contentType=csv')

        dados = pd.read_csv(url)

        file_path = f'/home/nikaao/Documents/airflowalura/semana={data_interval_end}/'

        dados.to_csv(file_path + 'dados_brutos.csv')
        dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
        dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')

    tarefa_2 = PythonOperator(
        task_id = 'extrai_dados',
        python_callable = extrai_dados,
        op_kwargs = {'data_interval_end': '{{data_interval_end.strftime("%Y-%m-%d)}}"'}
    )

    tarefa_1 >> tarefa_2

Aparentemente o erro é na tarefa_1, veja:

Erro na DAG

Obs.: A única coisa que alterei foi a data, mas, testei com a data da instrutora e também não funcionou.

3 respostas

Olá, Nícolas!

Analisando o seu código e a imagem do erro, parece que o problema está na forma como você está passando a data para o comando mkdir dentro do BashOperator. Tenta assim.

No seu BashOperator, você definiu o bash_command assim:

bash_command = 'mkdir -p "/home/nikaao/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d)}}"'

Parece que há um pequeno erro na interpolação da string de data. O correto seria usar aspas simples em volta do comando e aspas duplas para a interpolação da variável da data, assim:

bash_command = 'mkdir -p /home/nikaao/Documents/airflowalura/semana={{ data_interval_end.strftime("%Y-%m-%d") }}'

Observe que também removi as aspas duplas em volta do caminho do diretório, pois elas não são necessárias e podem causar problemas na execução do comando.

Além disso, na tarefa_2, dentro do op_kwargs, você adicionou uma aspa extra no final da interpolação da data. Vamos corrigir isso também:

op_kwargs = {'data_interval_end': '{{ data_interval_end.strftime("%Y-%m-%d") }}'}

Com essas correções, seu código deve funcionar corretamente. Tente fazer essas alterações e execute a DAG novamente para ver se o problema foi resolvido. Qualquer coisa manda aqui de novo.

Bons estudos!

Tabém estou com o mesmo problema... Também já testei as outras soluções postas aqui e não deu bom.

No meu caso, até cria as pastas, sabe? Mas não baixa o csv da API. O engraçado é que quando eu executo o código que baixa o csv sem ser pelo airflow, ele baixa o csv certinho e tal. Mas pelo airflow não tem dado certo. Tou pra ter um infarto com isso