0
respostas

twitter_operator apresenta erro ao fim da execução.

Li em no tópico [https://cursos.alura.com.br/forum/topico-problema-na-execucao-do-twitter_operator-py-224309](Problema na execução do twitter_operator.py) que na versão 2 do airflow a forma de criar o operador é diferente, porém não consegui criar um codigo que funcione, será que alguém ou o professor poderia mostrar como funcionaria na nova versão? ou ao menos uma documentação mais detalhada para eu poder tentar fazer funcionar

Obs: o código imprime no terminal as consultas, e retorna as informações como deveria fazer, porém este erro interfere na hora de seguir os próximos passos do curso. já procurei em vários lugares, porém não achei a solução.

Erro do terminal:

Traceback (most recent call last):
  File "/home/otikdeb/data/airflow/plugins/operators/twitter_operator.py", line 68, in <module>
    ti.run()
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1387, in run
    self._run_raw_task(
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1176, in _run_raw_task
    self._run_mini_scheduler_on_child_tasks(session)
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1186, in _run_mini_scheduler_on_child_tasks
    dag_run = with_row_locks(
  File "/home/otikdeb/data/.venv/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 3500, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

Meu código:

import sys
from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults 
#uma DAG você pode ter parâmetros padrão que você vai
# mandar para todos operadores e esse decorator nos ajuda a aplicar eles
sys.path.append("/home/otikdeb/data/airflow/plugins")
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
from pathlib import Path
from os.path import join

template_fields = [
    "query",
    "file_path",
    "start_time",
    "end_time"
]

class TwitterOperator(BaseOperator):
    @apply_defaults   
    def __init__(
        self,
        query,
        file_path,
        conn_id = None,
        start_time = None,
        end_time = None,
        *args, **kargs
        ):
        super().__init__(*args, **kargs)
        self.query = query
        self.file_path = file_path
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    #Vai olhar todo o caminho, verificar as pastas faltantes e criá-la
    def create_parent_folder(self):
        Path(Path(self.file_path).parent).mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        hook = TwitterHook(
            query = self.query,
            conn_id = self.conn_id,
            start_time = self.start_time,
            end_time = self.end_time
        )
        self.create_parent_folder()
        with open(self.file_path, "w") as output_file:
            for pg in hook.run():
                json.dump(pg, output_file, ensure_ascii=False)
                output_file.write("\n")

if __name__ == "__main__":
    #Vamos criar um DAG teste, instanciar o DAG, criar uma instância de tarefas fictícias.
    with DAG(dag_id="TwitterTest",start_date=datetime.now()) as dag:
        to = TwitterOperator(
            query="AluraOnline",
            file_path=join(
                "/home/otikdeb/data/datalake",
                "twitter_aluraonline",
                "extract_date = {{ds}}",
                "AluraOnline_{{ds_nodash}}.json"),
            task_id="test_run"
        )
        ti = TaskInstance(task=to, execution_date=datetime.now() - timedelta(days=1))
        ti.run()

Quer mergulhar em tecnologia e aprendizagem?

Receba a newsletter que o nosso CEO escreve pessoalmente, com insights do mercado de trabalho, ciência e desenvolvimento de software