Tudo certo até o momento do Airflow encontrar minha nova DAG, é como se não "enxergasse" a pasta DAGS ou o get_stocks.py
Tudo certo até o momento do Airflow encontrar minha nova DAG, é como se não "enxergasse" a pasta DAGS ou o get_stocks.py
Oii Marcelo, tudo bem contigo?
Vou te pedir que compartilhe comigo algumas informações para tentarmos descobrir o motivo do seu erro:
Além disso, vou te pedir que execute o comando airflow dags list
no seu terminal. Esse comando serve para listar todos os DAGs que estão no Airflow:
Depois de executar esse comando, confira no log de execução se o seu DAG aparece na lista ou não.
Aguardo seu retorno :)
Olá, segue o print do dag list e abaixo o cód do get_stocks.py !
import yfinance
from airflow.decorators import dag, task
from airflow.macros import ds_add
from pathlib import Path
import pendulum
TICKERS = [
"DIS",
"MCD",
"AAPL",
"AMZN",
"TSLA",
"BAC",
"MSFT",
"WFC",
"KO",
"NFLX",
"TWTR",
"NKE",
"SNAP"
]
dag(
scheduler_interval = "0 0 * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup = True
)
def get_stocks_dag():
for ticker in TICKERS:
get_history.override(task_id=ticker)(ticker)
@task()
def get_history(ticker,ds,ds_nodash=None):
file_path = f"/home/lmarceloc/airflow/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
yfinance.Ticker(ticker).history(
period = "1d",
interval = "1h",
start = ds_add(ds, -1),
end = ds,
prepost = "True"
).to_csv(file_path)
dag = get_stocks_dag()
Oii Marcelo!
Analisando o código do seu DAG aqui, eu notei alguns erros:
@dag
você esqueceu de colocar o @
antes do termo dag
scheduler_interval
quando o correto seria schedule_interval
, sem esse "r".get_history
, você esqueceu de colocar ds=None
Acredito que corrigindo esses erros seu DAG deve aparecer normalmente na lista de DAGs do Airflow.
De todo modo, sugiro que dê uma uma olhadinha na atividade Faça como eu fiz e confira cada parte do seu código pra ver se está tudo certinho :)
Qualquer dúvida estou à disposição.
Fiz as correções indicadas e mesmo assim não aparece o dag.
import yfinance
from airflow.decorators import dag, task
from airflow.macros import ds_add
from pathlib import Path
import pendulum
TICKERS = [
"DIS",
"MCD",
"AAPL",
"AMZN",
"TSLA",
"BAC",
"MSFT",
"WFC",
"KO",
"NFLX",
"TWTR",
"NKE",
"SNAP"
]
@task()
def get_history(ticker,ds=None, ds_nodash=None):
file_path = f"/home/lmarceloc/airflowmarcelo/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
yfinance.Ticker(ticker).history(
period = "1d",
interval = "1h",
start = ds_add(ds, -1),
end = ds,
prepost = "True"
).to_csv(file_path)
@dag(
schedule_interval = "0 0 * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
start_data = pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup = True
)
def get_stocks_dag():
for ticker in TICKERS:
get_history.override(task_id=ticker)(ticker)
dag = get_stocks_dag()
Oii Marcelo!
Executando seu DAG aqui, percebi que nos parâmetros dele você passou start_data
, no entanto o correto é start_date
:
@dag(
schedule_interval = "0 0 * * 2-6", #00 horas / **=> todos os dias do mês / 2-6 => Terça a Sexta
start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup = True
)
Acredito que corrigindo esse erro seu DAG deve funcionar corretamente :)
Bons estudos!
Vou marcar como concluído pq deve ser algo simples que está passando despercebido aqui localmente e nada referente a erro no cód. obrigado pelo tempo para correção.