0
respostas

Erro ao rodar ti.run()

Rodando a nova versão do Airflow (2.2.4) estava obtendo um erro ao executar o código igual ao da aula.

Traceback (most recent call last):
  File "/Users/rafa/Documents/projects/Pipeline2/airflow/plugins/operators/twitter_operator.py", line 62, in <module>
    ti.run()
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1616, in run
    res = self.check_and_change_state_before_execution(
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1199, in check_and_change_state_before_execution
    if not self.are_dependencies_met(
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1024, in are_dependencies_met
    for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1045, in get_failed_dep_statuses
    for dep_status in dep.get_dep_statuses(self, session, dep_context):
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 101, in get_dep_statuses
    yield from self._get_dep_statuses(ti, session, dep_context)
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/ti_deps/deps/runnable_exec_date_dep.py", line 36, in _get_dep_statuses
    logical_date = ti.get_dagrun(session).execution_date
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
    dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
  File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 3500, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

Apenas alertando aos alunos que também trancarem nessa parte, como eu tranquei, que é só substituir o trecho do código onde se instancia novamente a TaskInstance() para carregar o Jinja adequadamente, trocando:

    ti = TaskInstance(task=to, execution_date=datetime.now())
    ti.run()

por apenas:

    to.run()

O jinja irá funcionar no Airflow 2