Fiz todos os passos conforme o video, porém ao testa minha conexão ela da erro 404.!
Fiz todos os passos conforme o video, porém ao testa minha conexão ela da erro 404.!
import datetime
import json
import requests
from airflow.providers.http.hooks.http import HttpHook
class TwitterHook(HttpHook):
def __init__(self,end_time,start_time ,query,conn_id=None):
self.end_time = end_time
self.start_time = start_time
self.query = query
self.conn_id = conn_id or "twitter_default"
super().__init__(http_conn_id = self.conn_id)
def create_url(self):
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
end_time = self.end_time
start_time = self.end_time
query = self.query
# campos de pesquisa
tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
# campos de busca da api
url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"
return url_raw
def connect_to_endpoint(self,url,session):
request = requests.Request("GET", url)
prep = session.prepare_request(request)
self.log.info(f"URL: {url}")
return self.run_and_check(session, prep, {})
def paginate(self, url_raw, session):
lista_json_response = []
#imprimir json
response = self.connect_to_endpoint(url_raw, session)
json_response = response.json()
lista_json_response.append(json_response)
# paginate
cont=1
while "next_token" in json_response.get("meta",{}) and cont < 100:
next_token = json_response['meta']['next_token']
url = f"{url_raw}&next_token={next_token}"
response = self.connect_to_endpoint(url, session)
json_response = response.json()
lista_json_response.append(json_response)
cont+=1
return lista_json_response
def run(self):
session = self.get_conn()
url_raw = self.create_url()
return self.paginate(url_raw,session)
if "__name__"=="__main__":
#montando url
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
start_time = (datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
query = "datascience"
for pg in TwitterHook(end_time, start_time, query).run():
print(json.dumps(pg, indent=4, sort_keys=True))
Olá Carlos, tudo bem ? Espero que sim.
Sinto muito pela demora da resposta.
Quanto a mensagem de 404 Not Found, ela não é um problema, porque a maneira como o Airflow está testando a conexão é incorreta, então não é um teste valido para nós. Vamos colocar uma atividade no curso deixando esse ponto mais claro, obrigado pelo aviso.
Agora, analisando o código encontrei alguns pontos que podem estar gerando o erro.
O primeiro é o if __name__=="__main__"
, a parte do __name__ deve ficar sem aspas mesmo, porque não é uma string e sim uma palavra reservada do Python que vai informar se o script está sendo executado ou se ele foi importado por outro código.
Então nesse código:
if "__name__"=="__main__":
Podemos remover as aspas duplas da palavra __name__
if __name__=="__main__":
Outro ponto é a utilização do now. Como foi importado todo o pacote datetime, a função now fica dentro do módulo datetime, então devemos repetir o nome datetime duas vezes.
Onde temos datetime.now() :
end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
start_time = (datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
devemos incluir mais um datetime
end_time = datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
start_time = (datetime.datetime.now() + datetime.timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
Uma outra solução, que eu acredito ser mais indicada é importar apenas os módulos de interesse.
Então invés de importar todo o datetime:
import datetime
Importamos apenas datetime e timedelta:
from datetime import datetime, timedelta
Então no momento de criar as datas, retiramos o datetime do timedelta:
end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
O ultimo ponto que encontrei foi um detalhe, onde o end_time aparece atribuído em duas variáveis. Em uma das variáveis devemos usar o self.start_time.
Então na função create_url vamos mudar a atribuição do valor da variavel start_time
def create_url(self):
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
end_time = self.end_time
start_time = self.end_time
Para self.star_time e não self.end_time.
def create_url(self):
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
end_time = self.end_time
start_time = self.start_time
Acredito que com essa modificações nós teremos o resultado esperado.
Novamente, peço desculpas pela demora e agradeço por ter compartilhado suas duvidas aqui no fórum.
Espero ter ajudado, mas qualquer duvida não hesite em perguntar.