Fiz o seguinte código baseado na aula "Criando operadores conectados a ganchos":
from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime
class TwitterOperator(BaseOperator):
def __init__(self, query, conn_id = None, start_time = None, end_time = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query = query
self.conn_id = conn_id
self.start_time = start_time
self.end_time = end_time
def execute(self, context):
hook = TwitterHook(query=self.query, conn_id=self.conn_id, start_time=self.start_time, end_time=self.end_time)
for pg in hook.run():
print(json.dumps(pg, indent=4, sort_keys=True))
if __name__ == "__main__":
with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
to = TwitterOperator(query="TedLasso", task_id="test_run")
ti = TaskInstance(task=to, execution_date=datetime.now())
to.execute(ti.get_template_context())
E estou tendo o seguinte erro:
twitter_operator.py:27 DeprecationWarning: Passing an execution_date to TaskInstance()
is deprecated in favour of passing a run_id
Traceback (most recent call last):
File "twitter_operator.py", line 27, in
ti = TaskInstance(task=to, execution_date=datetime.now())
File "", line 4, in init
File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 437, in initializeinstance
manager.dispatch.init_failure(self, args, kwargs)
File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in exit
with_traceback=exc_tb,
File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 434, in initializeinstance
return manager.original_init(mixed[1:], *kwargs)
File "/Users/luiz.leao/Documents/Study/datapipeline/.env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 585, in init
) from None
airflow.exceptions.DagRunNotFound: DagRun for 'TwitterTest' with date 2022-05-31T16:35:35.948550+00:00 not found
Já procurei em diversos lugares como resolver, mas ainda não encontrei. Se puderem ajudar, agradeço muito!