Solved

The conn_id `twitter_default` isn't defined

Good morning! I have a problem I can't solve. It appears to be in my Airflow DAG:

This error is being raised even though I have already checked the Airflow connection!

The solution suggested here on the forum is to run the script in the same terminal where I ran the Airflow export command, but that did not solve my problem. Maybe I am doing something wrong when running Airflow; I don't know. I have gone back through the previous lessons and redone the steps, and the Bearer token works in the recent_search code.
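One way to test the same-terminal suggestion is to print, from inside the Python process, the value it actually sees. This is only a minimal sketch: it assumes the export command in question set AIRFLOW_HOME (the thread does not say which variable it was), and the helper name effective_airflow_home is hypothetical. When AIRFLOW_HOME is not exported, Airflow falls back to ~/airflow, so a script started from a different terminal may read a different configuration and metadata database.

```python
import os


def effective_airflow_home(environ=None):
    """Return the AIRFLOW_HOME this process would use.

    Hypothetical helper for illustration. Airflow falls back to
    ~/airflow when the variable has not been exported in this shell.
    """
    environ = os.environ if environ is None else environ
    return environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))


if __name__ == "__main__":
    print("AIRFLOW_HOME seen by this process:", effective_airflow_home())
```

If the printed path differs from the one used when the connection was created, the hook is probably looking at a different Airflow installation.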

I searched Stack Overflow and did not find a solution there either.

Thanks in advance.

from airflow.hooks.http_hook import HttpHook
import requests
import json


class TwitterHook(HttpHook):

    def __init__(self, query, conn_id=None, start_time=None, end_time=None):
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        self.start_time = start_time
        self.end_time = end_time
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):
        query = self.query
        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,public_metrics,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
        start_time = (
            f"&start_time={self.start_time}"
            if self.start_time
            else ""
        )
        end_time = (
            f"&end_time={self.end_time}"
            if self.end_time
            else ""
        )
        url = "{}/2/tweets/search/recent?query={}&{}&{}{}{}".format(
            self.base_url, query, tweet_fields, user_fields, start_time, end_time
        )
        return url

    def connect_to_endpoint(self, url, session):
        response = requests.Request("GET", url)
        prep = session.prepare_request(response)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {}).json()

    def paginate(self, url, session, next_token=""):
        if next_token:
            full_url = f"{url}&next_token={next_token}"
        else:
            full_url = url
        data = self.connect_to_endpoint(full_url, session)
        yield data
        if "next_token" in data.get("meta", {}):
            yield from self.paginate(url, session, data['meta']['next_token'])

    def run(self, **kwargs):
        session = self.get_conn()

        url = self.create_url()

        yield from self.paginate(url, session)


if __name__ == "__main__":
    for pg in TwitterHook("AluraOnline").run():
        print(json.dumps(pg, ident=4, sort_keys=True))

(.env) lucas@ubuntu:~/Documents/datapipeline/airflow/plugins/hooks$ python3 twitter_hook.py
Traceback (most recent call last):
  File "twitter_hook.py", line 65, in <module>
    for pg in TwitterHook("AluraOnline").run():
  File "twitter_hook.py", line 57, in run
    session = self.get_conn()
  File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/http_hook.py", line 62, in get_conn
    conn = self.get_connection(self.http_conn_id)
  File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/base_hook.py", line 87, in get_connection
    conn = random.choice(list(cls.get_connections(conn_id)))
  File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/base_hook.py", line 83, in get_connections
    return secrets.get_connections(conn_id)
  File "/home/lucas/.env/lib/python3.8/site-packages/airflow/secrets/__init__.py", line 59, in get_connections
    raise AirflowException("The conn_id {0} isn't defined".format(conn_id))
airflow.exceptions.AirflowException: The conn_id twitter_default isn't defined

2 replies

I redid everything, short of formatting the computer. My mistake was in starting the webserver: I found the process holding the port with lsof -i tcp:8080, ran kill -9 on it, and that resolved it.

Now, after redoing everything, I run into this error:

(.env) lucas@ubuntu:~/Documents/datapipeline/airflow/plugins/hooks$ python3 twitter_hook.py
[2022-04-21 12:36:00,170] {base_hook.py:89} INFO - Using connection to: id: twitter_default. Host: https://api.twitter.com, Port: None, Schema: None, Login: None, Password: None, extra: XXXXXXXX
[2022-04-21 12:36:00,171] {twitter_hook.py:37} INFO - URL: https://api.twitter.com/2/tweets/search/recent?query=AluraOnline&tweet.fields=author_id,conversation_id,created_at,id,public_metrics,text&expansions=author_id&user.fields=id,name,username,created_at
/home/lucas/Documents/datapipeline/.env/lib/python3.8/site-packages/urllib3/connectionpool.py:981: InsecureRequestWarning: Unverified HTTPS request is being made to host 'api.twitter.com'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  warnings.warn(
Traceback (most recent call last):
  File "twitter_hook.py", line 60, in <module>
    print(json.dumps(pg, ident=4, sort_keys=True))
  File "/usr/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
TypeError: __init__() got an unexpected keyword argument 'ident'

Solution!

I solved it. The first error was that the webserver was not starting because port 8080 was already in use. I fixed that with:

lsof -i tcp:8080

kill -9 PID_NUMBER
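For anyone who prefers to confirm from Python whether the port is still taken before starting the webserver again, something like the sketch below should work. The function name port_in_use is hypothetical, and it only tells you that some process is accepting connections on the port, not which one, so lsof is still needed to find the PID.

```python
import socket


def port_in_use(port, host="127.0.0.1"):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 on success instead of raising an exception
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    print("port 8080 in use:", port_in_use(8080))
```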

I silenced the InsecureRequestWarning with:

import urllib3

urllib3.disable_warnings()

And the JSON error was a missing 'n' in indent: I had written ident.
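To make the last point concrete, here is a small sketch that reproduces the TypeError outside of Airflow. The page dictionary is made-up sample data, not a real API response. json.dumps forwards keyword arguments it does not recognize to json.JSONEncoder, which is why the misspelling surfaces as an error from __init__() rather than from dumps itself.

```python
import json

# Made-up sample data standing in for one page of results
page = {"meta": {"result_count": 1}, "data": [{"id": "1", "text": "hello"}]}

# Correct spelling: indent controls pretty-printing
pretty = json.dumps(page, indent=4, sort_keys=True)
print(pretty)

# Misspelled keyword: the unknown argument reaches JSONEncoder.__init__,
# which rejects it with a TypeError
try:
    json.dumps(page, ident=4, sort_keys=True)
except TypeError as exc:
    print("TypeError:", exc)
```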