Solucionado (ver solução)
Solucionado
(ver solução)
6
respostas

Airflow não acha o DAG

Tudo certo até o momento do Airflow encontrar minha nova DAG, é como se não "enxergasse" a pasta DAGS ou o get_stocks.py

Insira aqui a descrição dessa imagem para ajudar na acessibilidadeInsira aqui a descrição dessa imagem para ajudar na acessibilidade

6 respostas

Oii Marcelo, tudo bem contigo?

Vou te pedir que compartilhe comigo algumas informações para tentarmos descobrir o motivo do seu erro:

  • compartilhe o comando que está utilizando para exportar a variável AIRFLOW_HOME
  • compartilhe o código completo do seu DAG para eu testar na minha máquina

Além disso, vou te pedir que execute o comando airflow dags list no seu terminal. Esse comando serve para listar todos os DAGs que estão no Airflow:

Alt text: print do terminal onde o comando airflow dags list está sendo executado

Depois de executar esse comando, confira no log de execução se o seu DAG aparece na lista ou não.

Aguardo seu retorno :)

Olá, segue o print do dag list e abaixo o cód do get_stocks.py !Insira aqui a descrição dessa imagem para ajudar na acessibilidade Insira aqui a descrição dessa imagem para ajudar na acessibilidade

import yfinance
from airflow.decorators import dag, task
from airflow.macros import ds_add
from pathlib import Path
import pendulum

TICKERS = [
  "DIS",
  "MCD",
  "AAPL",
  "AMZN",
  "TSLA",
  "BAC",
  "MSFT",
  "WFC",
  "KO",
  "NFLX",
  "TWTR",
  "NKE",
  "SNAP"
]

dag(
  scheduler_interval = "0 0  * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
  start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
  catchup = True
)

def get_stocks_dag():
  for ticker in TICKERS:
    get_history.override(task_id=ticker)(ticker)

@task()
def get_history(ticker,ds,ds_nodash=None):
  file_path = f"/home/lmarceloc/airflow/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
  Path(file_path).parent.mkdir(parents=True, exist_ok=True)
  yfinance.Ticker(ticker).history(
    period = "1d",
    interval = "1h",
    start = ds_add(ds, -1),
    end = ds,
    prepost = "True"
  ).to_csv(file_path)


dag = get_stocks_dag()

Oii Marcelo!

Analisando o código do seu DAG aqui, eu notei alguns erros:

  • ao utilizar o decorator @dag você esqueceu de colocar o @ antes do termo dag
  • nos parâmetros do DAG você escreveu scheduler_interval quando o correto seria schedule_interval, sem esse "r".
  • ao especificar os parâmetros da função get_history, você esqueceu de colocar ds=None

Acredito que corrigindo esses erros seu DAG deve aparecer normalmente na lista de DAGs do Airflow.

De todo modo, sugiro que dê uma uma olhadinha na atividade Faça como eu fiz e confira cada parte do seu código pra ver se está tudo certinho :)

Qualquer dúvida estou à disposição.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Fiz as correções indicadas e mesmo assim não aparece o dag.

import yfinance
from airflow.decorators import dag, task
from airflow.macros import ds_add
from pathlib import Path
import pendulum

TICKERS = [
  "DIS",
  "MCD",
  "AAPL",
  "AMZN",
  "TSLA",
  "BAC",
  "MSFT",
  "WFC",
  "KO",
  "NFLX",
  "TWTR",
  "NKE",
  "SNAP"
]


@task()
def get_history(ticker,ds=None, ds_nodash=None):
  file_path = f"/home/lmarceloc/airflowmarcelo/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
  Path(file_path).parent.mkdir(parents=True, exist_ok=True)
  yfinance.Ticker(ticker).history(
    period = "1d",
    interval = "1h",
    start = ds_add(ds, -1),
    end = ds,
    prepost = "True"
  ).to_csv(file_path)

@dag(
  schedule_interval = "0 0  * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
  start_data = pendulum.datetime(2022, 1, 1, tz="UTC"),
  catchup = True
)

def get_stocks_dag():
  for ticker in TICKERS:
    get_history.override(task_id=ticker)(ticker)

dag = get_stocks_dag()

Insira aqui a descrição dessa imagem para ajudar na acessibilidadeInsira aqui a descrição dessa imagem para ajudar na acessibilidade

solução!

Oii Marcelo!

Executando seu DAG aqui, percebi que nos parâmetros dele você passou start_data, no entanto o correto é start_date:

@dag(
    schedule_interval = "0 0  * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
    start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup = True
)

Acredito que corrigindo esse erro seu DAG deve funcionar corretamente :)

Bons estudos!

Vou marcar como concluído pq deve ser algo simples que está passando despercebido aqui localmente e nada referente a erro no cód. obrigado pelo tempo para correção.