Rodando a nova versão do Airflow (2.2.4) estava obtendo um erro ao executar o código igual ao da aula.
Traceback (most recent call last):
File "/Users/rafa/Documents/projects/Pipeline2/airflow/plugins/operators/twitter_operator.py", line 62, in <module>
ti.run()
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1616, in run
res = self.check_and_change_state_before_execution(
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1199, in check_and_change_state_before_execution
if not self.are_dependencies_met(
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1024, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1045, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 101, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, dep_context)
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/ti_deps/deps/runnable_exec_date_dep.py", line 36, in _get_dep_statuses
logical_date = ti.get_dagrun(session).execution_date
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
File "/Users/rafa/Documents/projects/Pipeline2/venv/lib/python3.10/site-packages/sqlalchemy/orm/query.py", line 3500, in one
raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
Apenas alertando aos alunos que também trancarem nessa parte, como eu tranquei, que é só substituir o trecho do código onde se instancia novamente a TaskInstance() para carregar o Jinja adequadamente, trocando:
ti = TaskInstance(task=to, execution_date=datetime.now())
ti.run()
por apenas:
to.run()
O jinja irá funcionar no Airflow 2