Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

Erro com key ao executar DAG

Minha DAG apresenta erro ao abrir o Airflow. O erro e':

Broken DAG: [/home/vini/Documents/airflowalura/dags/dados_climaticos.py] Traceback (most recent call last): File "/home/vini/Documents/airflowalura/venv/lib/python3.9/site-packages/airflow/models/dag.py", line 366, in init validate_key(dag_id) File "/home/vini/Documents/airflowalura/venv/lib/python3.9/site-packages/airflow/utils/helpers.py", line 63, in validate_key raise AirflowException( airflow.exceptions.AirflowException: The key 'dados climaticos' has to be made of alphanumeric characters, dashes, dots and underscores exclusively

Meu programa e':

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.macros import ds_add
import pendulum
from os.path import join
import pandas as pd

with DAG(
    "dados climaticos",
    start_date = pendulum.datetime(2023, 3, 6, tz="UTC"),
    schedule_interval='0 0 * * 1', #executa toda segunda feira, ultimo argumento e' o dia da semana, dos dois primeiros sao a hora e minuto e o terceiro e quarto sao o dia do mes e mes 
) as dag:

    tarefa_1 = BashOperator(
        task_id = 'cria_pasta',
        bash_command = 'mkdir -p "/home/vini/Documents/airflowalura/semana={{data_interval_end.strftime("%Y-%m-%d")}}"'
    )

    def extrai_dados(data_interval_end):
        city = 'Boston'
        key = 'EX43E3F5FNUECLK6GCZ9YYJRY'

        URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
                    f'{city}/{data_interval_end}/{ds_add(data_interval_end, 7)}?unitGroup=metric&include=days&key={key}&contentType=csv')

        dados = pd.read_csv(URL)

        file_path = f'/home/vini/Documents/airflowalura/semana={data_interval_end}/'

        dados.to_csv(file_path + 'dados_brutos.csv')
        dados[['datetime', 'tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
        dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')


    tarefa_2 = PythonOperator(
        task_id = 'extrai_dados',
        python_callable = extrai_dados,
        op_kwargs = {'data_interval_end': '{{data_interval_end.strftime("%Y-%m-%d")}}'}
    )

    tarefa_1 >> tarefa_2

To estudando num Windows, utilizando maquina virtual.

1 resposta
solução!

Oii Ronaldo, tudo bem contigo?

Pelo erro apresentado, parece que o nome do seu DAG contém um espaço em branco ("dados climaticos"). O Airflow exige que o nome do DAG seja composto apenas por caracteres alfanuméricos, traços, pontos e sublinhados. Portanto, sugiro que você altere o nome do seu DAG para algo como "dados_climaticos":

with DAG(
    "dados_climaticos",
    # continuação do código

Espero ter ajudado, qualquer dúvida estou à disposição :)

Bons estudos!