Solucionado (ver solução)
Solucionado
(ver solução)
2
respostas

[Bug] Erro ao testa a conexão com a API do twitter

Fiz todos os passos conforme o video, porém ao testa minha conexão ela da erro 404.! imagem error 404 ao conectar com  a api do airflow

2 respostas
import datetime
import json
import requests
from airflow.providers.http.hooks.http import HttpHook

class TwitterHook(HttpHook):

    def __init__(self,end_time,start_time ,query,conn_id=None):

        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id = self.conn_id)

    def create_url(self):
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.end_time
        query = self.query
        # campos de pesquisa
        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"


        # campos de busca da api
        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"

        return url_raw

    def connect_to_endpoint(self,url,session):
        request = requests.Request("GET", url)
        prep = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})

    def paginate(self, url_raw, session):
        lista_json_response = []

        #imprimir json
        response = self.connect_to_endpoint(url_raw, session)
        json_response = response.json()
        lista_json_response.append(json_response)

        # paginate
        cont=1
        while "next_token" in json_response.get("meta",{}) and cont < 100:
            next_token = json_response['meta']['next_token']
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            lista_json_response.append(json_response)
            cont+=1

        return lista_json_response

    def run(self):
        session = self.get_conn()
        url_raw = self.create_url()

        return self.paginate(url_raw,session)


if "__name__"=="__main__":
    #montando url
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)   
    query = "datascience"

    for pg in TwitterHook(end_time, start_time, query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))
solução!

Olá Carlos, tudo bem ? Espero que sim.

Sinto muito pela demora da resposta.

Quanto a mensagem de 404 Not Found, ela não é um problema, porque a maneira como o Airflow está testando a conexão é incorreta, então não é um teste valido para nós. Vamos colocar uma atividade no curso deixando esse ponto mais claro, obrigado pelo aviso.

Agora, analisando o código encontrei alguns pontos que podem estar gerando o erro.

O primeiro é o if __name__=="__main__", a parte do __name__ deve ficar sem aspas mesmo, porque não é uma string e sim uma palavra reservada do Python que vai informar se o script está sendo executado ou se ele foi importado por outro código.

Então nesse código:

if "__name__"=="__main__":

Podemos remover as aspas duplas da palavra __name__

if __name__=="__main__":

Outro ponto é a utilização do now. Como foi importado todo o pacote datetime, a função now fica dentro do módulo datetime, então devemos repetir o nome datetime duas vezes.

Onde temos datetime.now() :

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)  

devemos incluir mais um datetime

    end_time = datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)   

Uma outra solução, que eu acredito ser mais indicada é importar apenas os módulos de interesse.

Então invés de importar todo o datetime:

import datetime

Importamos apenas datetime e timedelta:

from datetime import datetime, timedelta

Então no momento de criar as datas, retiramos o datetime do timedelta:

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)  

O ultimo ponto que encontrei foi um detalhe, onde o end_time aparece atribuído em duas variáveis. Em uma das variáveis devemos usar o self.start_time.

Então na função create_url vamos mudar a atribuição do valor da variavel start_time

    def create_url(self):
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.end_time

Para self.star_time e não self.end_time.

    def create_url(self):
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.start_time

Acredito que com essa modificações nós teremos o resultado esperado.

Novamente, peço desculpas pela demora e agradeço por ter compartilhado suas duvidas aqui no fórum.

Espero ter ajudado, mas qualquer duvida não hesite em perguntar.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!