Li em no tópico [https://cursos.alura.com.br/forum/topico-problema-na-execucao-do-twitter_operator-py-224309](Problema na execução do twitter_operator.py) que na versão 2 do airflow a forma de criar o operador é diferente, porém não consegui criar um codigo que funcione, será que alguém ou o professor poderia mostrar como funcionaria na nova versão? ou ao menos uma documentação mais detalhada para eu poder tentar fazer funcionar
Obs: o código imprime no terminal as consultas, e retorna as informações como deveria fazer, porém este erro interfere na hora de seguir os próximos passos do curso. já procurei em vários lugares, porém não achei a solução.
Erro do terminal:
Traceback (most recent call last):
File "/home/otikdeb/data/airflow/plugins/operators/twitter_operator.py", line 68, in <module>
ti.run()
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1387, in run
self._run_raw_task(
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1176, in _run_raw_task
self._run_mini_scheduler_on_child_tasks(session)
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1186, in _run_mini_scheduler_on_child_tasks
dag_run = with_row_locks(
File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 3500, in one
raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()
Meu código:
import sys
from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
#uma DAG você pode ter parâmetros padrão que você vai
# mandar para todos operadores e esse decorator nos ajuda a aplicar eles
sys.path.append("/home/otikdeb/data/airflow/plugins")
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
from pathlib import Path
from os.path import join
template_fields = [
"query",
"file_path",
"start_time",
"end_time"
]
class TwitterOperator(BaseOperator):
@apply_defaults
def __init__(
self,
query,
file_path,
conn_id = None,
start_time = None,
end_time = None,
*args, **kargs
):
super().__init__(*args, **kargs)
self.query = query
self.file_path = file_path
self.conn_id = conn_id
self.start_time = start_time
self.end_time = end_time
#Vai olhar todo o caminho, verificar as pastas faltantes e criá-la
def create_parent_folder(self):
Path(Path(self.file_path).parent).mkdir(parents=True, exist_ok=True)
def execute(self, context):
hook = TwitterHook(
query = self.query,
conn_id = self.conn_id,
start_time = self.start_time,
end_time = self.end_time
)
self.create_parent_folder()
with open(self.file_path, "w") as output_file:
for pg in hook.run():
json.dump(pg, output_file, ensure_ascii=False)
output_file.write("\n")
if __name__ == "__main__":
#Vamos criar um DAG teste, instanciar o DAG, criar uma instância de tarefas fictícias.
with DAG(dag_id="TwitterTest",start_date=datetime.now()) as dag:
to = TwitterOperator(
query="AluraOnline",
file_path=join(
"/home/otikdeb/data/datalake",
"twitter_aluraonline",
"extract_date = {{ds}}",
"AluraOnline_{{ds_nodash}}.json"),
task_id="test_run"
)
ti = TaskInstance(task=to, execution_date=datetime.now() - timedelta(days=1))
ti.run()