1
resposta

Dúvidas de paralelismo com celery executor

Na aula é mostrado esse código que funciona bem:

TICKERS = [
    "AAPL",
    "MSFT",
    "GOOG",
    "TSLA"
]

@task()
def get_history(ticker, ds=None, ds_nodash=None):
    file_path = f"/home/millenagena/Documents/airflowalura/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)
    yfinance.Ticker(ticker).history(
        period="1d", 
        interval="1h",
        start=ds_add(ds, -1),
        end=ds,
        prepost=True,
    ).to_csv(file_path)

@dag(
    schedule_interval = "0 0 * * 2-6",
    start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup = True
)
def get_stocks_dag():
    for ticker in TICKERS:
        get_history.override(task_id=ticker)(ticker)

dag = get_stocks_dag() 

Porém, supomos que TICKERS = [ "AAPL", "MSFT", "GOOG","TSLA"] fosse uma lista com 5 milhões de posições,(Ou dentro de def get_stocks_dag(): Alguma ação demorada) resulta no erro:

File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
  raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/g/testedag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.7.1/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.7.1/best-practices.html#reducing-dag-complexity, PID: 87

Qual a forma correta de se lidar nessa situação? Pelo visto o airflow faz uma pré importação.

Tentei algo como, um @task chamar outro @task(Resolve o erro de DAG import time) mas não funciona o paralelismo do celery executor. Estou preso nisso. Preciso que o código continue da mesma forma com paralelismo.

Código que tentei mas não funcionou o paralelismo:

TICKERS = [
    "AAPL",
    "MSFT",
    "GOOG",
    "TSLA"
] * 5000000 # imaginar aqui como uma lista de 5 milhões de posições

@task()
def get_history(ticker, ds=None, ds_nodash=None):
    file_path = f"/home/millenagena/Documents/airflowalura/stocks/{ticker}/{ticker}_{ds_nodash}.csv"
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)
    yfinance.Ticker(ticker).history(
        period="1d", 
        interval="1h",
        start=ds_add(ds, -1),
        end=ds,
        prepost=True,
    ).to_csv(file_path)


@task()
def funcao_intermediaria():
      for ticker in TICKERS:
        get_history.override(task_id=ticker)(ticker)



@dag(
    schedule_interval = "0 0 * * 2-6",
    start_date = pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup = True
)
def get_stocks_dag():
    funcao_intermediaria()
  

dag = get_stocks_dag() 

Não sei se fui claro, mas o meu problema é que tenho uma chamada de API está sendo executada durante a importação da DAG. E isso pode levar muito tempo, dependendo do tamanho do conjunto de dados. Gostaria de saber qual o jeito correto de se fazer isso e manter o paralelismo.

Agradeço qualquer ajuda.

1 resposta

Oi

O problema que você está enfrentando está relacionado ao fato de que a execução de código na importação de uma DAG pode demorar muito, especialmente quando você tem operações demoradas, como chamadas de API, durante a definição da DAG. O Airflow, por padrão, executa todo o código de importação da DAG quando inicializa.

A melhor prática é evitar a execução de código demorado durante a importação da DAG. Aqui estão algumas abordagens para lidar com isso:

  1. Move a lógica demorada para o corpo da DAG: Em vez de executar chamadas demoradas durante a importação, você pode mover essa lógica para dentro das tarefas da DAG. Você pode usar a tarefa PythonOperator para executar funções Python arbitrárias como tarefas da DAG. Isso permitirá que você mantenha o paralelismo.

    Exemplo:

    from airflow.operators.python_operator import PythonOperator
    
    TICKERS = ["AAPL", "MSFT", "GOOG", "TSLA"] * 5000000
    
    def get_history(ticker, ds=None, ds_nodash=None):
        # Lógica para obter histórico
    
    def process_ticker(ticker, **kwargs):
        get_history(ticker, ds=kwargs['ds'], ds_nodash=kwargs['ds_nodash'])
    
    with DAG(
        'get_stocks_dag',
        schedule_interval="0 0 * * 2-6",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=True,
    ) as dag:
        for ticker in TICKERS:
            process_ticker_task = PythonOperator(
                task_id=f'process_ticker_{ticker}',
                python_callable=process_ticker,
                provide_context=True,
                op_args=[ticker],
            )
    
    dag = get_stocks_dag()
    

    Dessa forma, a chamada para obter o histórico é feita durante a execução da DAG e não durante a importação.

  2. Divida a lógica em várias DAGs menores: Se a quantidade de tarefas for muito grande, você pode considerar dividir a lógica em várias DAGs menores, cada uma lidando com um subconjunto dos tickers. Isso pode ajudar a reduzir o tempo de importação.

  3. Use a funcionalidade de schedule_interval=None: Se a DAG não precisar ser executada automaticamente em um cronograma regular, você pode definir schedule_interval=None e desencadear manualmente quando necessário. Isso evita a execução demorada durante a importação.

Lembre-se de ajustar essas abordagens conforme necessário para atender aos requisitos específicos do seu caso.