3
respostas

The conn_id `twitter_default` isn't defined

Está dando seguinte erro quando rodo o 'twitter_hook.py':

Traceback (most recent call last):
  File "/home/kauesantana/Documents/Airflow/airflow_pipeline/hook/twitter_hook.py", line 71, in <module>
    for pg in TwitterHook(end_time, start_time, query).run():
  File "/home/kauesantana/Documents/Airflow/airflow_pipeline/hook/twitter_hook.py", line 56, in run
    session = self.get_conn()
  File "/home/kauesantana/Documents/Airflow/venv/lib/python3.9/site-packages/airflow/providers/http/hooks/http.py", line 68, in get_conn
    conn = self.get_connection(self.http_conn_id)
  File "/home/kauesantana/Documents/Airflow/venv/lib/python3.9/site-packages/airflow/hooks/base.py", line 67, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
  File "/home/kauesantana/Documents/Airflow/venv/lib/python3.9/site-packages/airflow/models/connection.py", line 430, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `twitter_default` isn't defined

Meu código 'twitter.hook':

from airflow.providers.http.hooks.http import HttpHook
from datetime import datetime, timedelta
import requests
import json

class TwitterHook(HttpHook):

    def __init__(self, start_time, end_time, query, conn_id = None,):
        self.conn_id = conn_id or "twitter_default"
        self.start_time = start_time
        self.end_time = end_time
        self.query = query

        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):

        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

        start_time = self.start_time
        end_time = self.end_time
        query = self.query
 
        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"

        return url_raw
    
    def connect_to_endpoint(self, url, session):
        request = requests.Request("GET", url)
        prep = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})
    
    def paginate(self, url_raw, session):
        # Imprimit Json
        lista_json_response = []
        
        response = self.connect_to_endpoint(url_raw, session)
        json_response = response.json()

        lista_json_response.append(json_response)

        while "next_token" in json_response.get("meta", {}):
            next_token = json_response["meta"]["next_token"]
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            lista_json_response.append(json_response)

        return lista_json_response
    
    def run(self):
        session = self.get_conn()
        url_raw = self.create_url()

        return self.paginate(url_raw, session)
    

if __name__ == "__main__":
    # Construindo URL
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)

    query = "datascience"

    for pg in TwitterHook(end_time, start_time, query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

Meu código 'twitter_operators.py':

import sys
sys.path.append("airflow_pipeline")

from airflow.models import BaseOperator, DAG, TaskInstance
from hook.twitter_hook import TwitterHook
from datetime import datetime, timedelta
import json


class TwitterOperator(BaseOperator):
    def __init__(self, start_time, end_time, query, **kwargs):
        self.start_time = start_time
        self.end_time = end_time
        self.query = query
        super().__init__(**kwargs)
    
    def execute(self, context):

        start_time = self.start_time
        end_time = self.end_time
        query = self.query

        with open("extract_twitter.json", "w") as file:
            for pg in TwitterHook(end_time, start_time, query).run():
                json.dump(pg, file, ensure_ascii=False)
                file.write("\n")

if __name__ == "__main__":
    # Construindo URL
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)

    query = "datascience"

    with DAG(dag_id = "TwitterTEst", start_date=datetime.now()) as dag:
        to = TwitterOperator(query=query, start_time=start_time, end_time=end_time, task_id="teste_run")
        ti = TaskInstance(task=to)
        to.execute(ti.task_id)
3 respostas

Foto da arvore de pastas:

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Oii, Kauesantana! Tudo bem?

Esse erro pode ser devido a várias causas, mas podemos começar pela principal que é o local onde está executando o comando.

Sendo assim, recomendo que verifique alguns pontos:

  • A exportação da variável AIRFLOW_HOME antes de executar seu código. É essencial fazer isso dentro da pasta do Airflow. O código simplificado para exportação é:
export AIRFLOW_HOME=/home/seuUsuário/caminho/até/a/pasta/raiz
  • Vá à estrutura das pastas e certifique-se que abaixo dela estão as pastas hook, dag, logs, data_twitter e outras. E após definir o AIRFLOW_HOME , execute o servidor web do Airflow:
airflow webserver
  • É importante verificar se a conexão twitter_default continua configurada, caso não esteja será preciso criar a conexão novamente.
  • Para rodar o script do hook, uso o seguinte comando no terminal:
python3.9 CAMINHO_DO_SEU_ARQUIVO_PY

Após seguir esses passos, faça o teste e observe se o código funciona como esperado. Se o problema persistir, peço que nos envie um print do seu log mostrando o erro da execução para buscarmos outras alternativas.

Bons estudos, Kauesantana!

Continua o mesmo erro:

Insira aqui a descrição dessa imagem para ajudar na acessibilidade