Olá, tudo bem?
Estou com problemas ao executar a DAG, o código está igual o da instrutora (pelo menos acredito que esteja igual kkkk só se passou algo...) e a DAG está funcionando.
Eu não entendi muito bem como e onde identificar os erros...
Abaixo o meu código:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.macros import ds_add
import pendulum
from os.path import join
import pandas as pd
with DAG(
"dados_climaticos",
start_date = pendulum.datetime(2023, 12, 25, tz = "UTC"),
schedule_interval = '0 0 * * 1', # executar toda segunda-deira
) as dag:
tarefa_1 = BashOperator(
task_id = 'cria_pasta',
bash_command = 'mkdir -p "/home/nikaao/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d)}}"'
)
def extrai_dados(data_interval_end):
city = 'Boston'
key = 'NAO_VOU_FALAR_NE'
url = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
f'{city}/{data_interval_end}/{ds_add(data_interval_end, 7)}?unitGroup=metric&include=days&key={key}&contentType=csv')
dados = pd.read_csv(url)
file_path = f'/home/nikaao/Documents/airflowalura/semana={data_interval_end}/'
dados.to_csv(file_path + 'dados_brutos.csv')
dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')
tarefa_2 = PythonOperator(
task_id = 'extrai_dados',
python_callable = extrai_dados,
op_kwargs = {'data_interval_end': '{{data_interval_end.strftime("%Y-%m-%d)}}"'}
)
tarefa_1 >> tarefa_2
Aparentemente o erro é na tarefa_1, veja:
Obs.: A única coisa que alterei foi a data, mas, testei com a data da instrutora e também não funcionou.