1
resposta

Erro "The conn_id `{conn_id}` isn't defined"

import json

import requests
from datetime import datetime, timedelta
from airflow.providers.http.hooks.http import  HttpHook



class TwitterHook(HttpHook):
    def _init_(self, end_time, start_time,query,conn_id=None):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id=self.conn_id)


    def create_url(self):
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"
        return url_raw

    def connect_to_endpoint(self,url, session):
        request = requests.Request("GET",url)
        prep = session.prepare_request(request)
        self.log.info(f"URL:{url}")
        return self.run_and_check(session,prep,{})

    def paginate(self, url_raw, session):
        list_json_response = []
        response = self.connect_to_endpoint(url_raw,session)
        json_response = response.json()
        list_json_response.append( json_response)
        contador=1
        while "next_token" in json_response.get("meta", {}) and contador <100:
            next_token = json_response['meta']['next_token']
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url,session)
            json_response = response.json()
            list_json_response.append( json_response)
            contador+=1

        return list_json_response

    def run(self):
        session = self.get_conn()
        url_raw =self.create_url()
        return self.paginate(url_raw,session)

if __name__ == "__main__":
    # os pra pegar variavel de ambiente
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(days=-7)).date().strftime(TIMESTAMP_FORMAT)
    query = "data science"

    for pg in TwitterHook(end_time,start_time,query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

--- O export eu fiz assim

export AIRFLOW_HOME=$(pwd)/airflow_pipeline

Insira aqui a descrição dessa imagem para ajudar na acessibilidadeERRO

python airflow_pipeline/hook/twitter_hook.py
Traceback (most recent call last):
  File "/home/lia/Documents/dataextract/airflow_pipeline/hook/twitter_hook.py", line 64, in <module>
    for pg in TwitterHook(end_time,start_time,query).run():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lia/Documents/dataextract/airflow_pipeline/hook/twitter_hook.py", line 53, in run
    session = self.get_conn()
              ^^^^^^^^^^^^^^^
  File "/home/lia/Documents/dataextract/venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 98, in get_conn
    conn = self.get_connection(self.http_conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lia/Documents/dataextract/venv/lib/python3.11/site-packages/airflow/hooks/base.py", line 72, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lia/Documents/dataextract/venv/lib/python3.11/site-packages/airflow/models/connection.py", line 434, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `2023-06-21T00:00:00.00Z` isn't defined
1 resposta

Olá Elis, tudo bem ? Espero que sim.

Isso está acontecendo por um detalhe, no método construtor __init__ está faltando um underline no inicio e no final.

Então só precisa mudar essa parte:

class TwitterHook(HttpHook):
    def _init_(self, end_time, start_time,query,conn_id=None):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id=self.conn_id)

Para isso:

class TwitterHook(HttpHook):
    def __init__(self, end_time, start_time,query,conn_id=None):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id=self.conn_id)

Acredito que isso vai resolver o erro, mas qualquer duvida não hesite em perguntar.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!