1
resposta

Problema na execução do twitter_operator.py

Fiz o seguinte código baseado na aula "Criando operadores conectados a ganchos":

from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime

class TwitterOperator(BaseOperator):


    def __init__(self, query, conn_id = None, start_time = None, end_time = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query = query
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time


    def execute(self, context):
        hook = TwitterHook(query=self.query, conn_id=self.conn_id, start_time=self.start_time, end_time=self.end_time)
        for pg in hook.run():
            print(json.dumps(pg, indent=4, sort_keys=True))


if __name__ == "__main__":
    with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
        to = TwitterOperator(query="TedLasso", task_id="test_run")
        ti = TaskInstance(task=to, execution_date=datetime.now())
        to.execute(ti.get_template_context())

E estou tendo o seguinte erro:

twitter_operator.py:27 DeprecationWarning: Passing an execution_date to TaskInstance() is deprecated in favour of passing a run_id Traceback (most recent call last): File "twitter_operator.py", line 27, in ti = TaskInstance(task=to, execution_date=datetime.now()) File "", line 4, in init File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 437, in initializeinstance manager.dispatch.init_failure(self, args, kwargs) File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in exit with_traceback=exc_tb, File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 434, in initializeinstance return manager.original_init(mixed[1:], *kwargs) File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 585, in init ) from None airflow.exceptions.DagRunNotFound: DagRun for 'TwitterTest' with date 2022-05-31T16:35:35.948550+00:00 not found

Já procurei em diversos lugares como resolver, mas ainda não encontrei. Se puderem ajudar, agradeço muito!

1 resposta

Boa noite, tive o mesmo problema, consegui contornar da seguinte forma.

Ajuste o código da main:

if __name__ == "__main__":

    # criando dag para teste, para que assim seja possível executar o operator
    with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:

        to = TwitterOperator(query="AluraOline", task_id="test_run")

        # todos funcionaram, aparentemente tudo redundante, preciso entender melhor esse etapa.
        # ti = TaskInstance(task=to, run_id=to.task_id)
        # ti = TaskInstance(task=to, run_id=dag.dag_id)
        ti = TaskInstance(task=to)

        # também funcionou passando o próprio id da task to.execute(to.task_id)
        to.execute(ti.task_id)

Deixei alguns comentários no código. Não compreendi bem o ajutes, porém, entendi que o contexto era obtido atráves das id de identificação das tarefas, fui por esse raciocínio e deu certo. Espero que te ajude.