Na aula é mostrado esse código que funciona bem:
TICKERS = [
"AAPL",
"MSFT",
"GOOG",
"TSLA"
]
@task()
def get_history(ticker, ds=None, ds_nodash=None):
file_path = f"/home/millenagena/Documents/airflowalura/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
yfinance.Ticker(ticker).history(
period="1d",
interval="1h",
start=ds_add(ds, -1),
end=ds,
prepost=True,
).to_csv(file_path)
@dag(
schedule_interval = "0 0 * * 2-6",
start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup = True
)
def get_stocks_dag():
for ticker in TICKERS:
get_history.override(task_id=ticker)(ticker)
dag = get_stocks_dag()
Porém, supomos que TICKERS = [ "AAPL", "MSFT", "GOOG","TSLA"] fosse uma lista com 5 milhões de posições,(Ou dentro de def get_stocks_dag(): Alguma ação demorada) resulta no erro:
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/g/testedag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.7.1/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.7.1/best-practices.html#reducing-dag-complexity, PID: 87
Qual a forma correta de se lidar nessa situação? Pelo visto o airflow faz uma pré importação.
Tentei algo como, um @task chamar outro @task(Resolve o erro de DAG import time) mas não funciona o paralelismo do celery executor. Estou preso nisso. Preciso que o código continue da mesma forma com paralelismo.
Código que tentei mas não funcionou o paralelismo:
TICKERS = [
"AAPL",
"MSFT",
"GOOG",
"TSLA"
] * 5000000 # imaginar aqui como uma lista de 5 milhões de posições
@task()
def get_history(ticker, ds=None, ds_nodash=None):
file_path = f"/home/millenagena/Documents/airflowalura/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
yfinance.Ticker(ticker).history(
period="1d",
interval="1h",
start=ds_add(ds, -1),
end=ds,
prepost=True,
).to_csv(file_path)
@task()
def funcao_intermediaria():
for ticker in TICKERS:
get_history.override(task_id=ticker)(ticker)
@dag(
schedule_interval = "0 0 * * 2-6",
start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
catchup = True
)
def get_stocks_dag():
funcao_intermediaria()
dag = get_stocks_dag()
Não sei se fui claro, mas o meu problema é que tenho uma chamada de API está sendo executada durante a importação da DAG. E isso pode levar muito tempo, dependendo do tamanho do conjunto de dados. Gostaria de saber qual o jeito correto de se fazer isso e manter o paralelismo.
Agradeço qualquer ajuda.