Estou tentando exibir as páginas do twitter, porém, está aparecendo o seguinte erro (print). O que pode ser ? refiz o código e também copiei conforme a aula, mas mesmo assim o erro acontece. Obrigado pela atenção!
Você está vendo a versão anterior da nova experiência da Alura que estamos preparando para você. Em breve, ela ganha uma identidade visual novinha totalmente pensada em potencializar seus estudos!
Estou tentando exibir as páginas do twitter, porém, está aparecendo o seguinte erro (print). O que pode ser ? refiz o código e também copiei conforme a aula, mas mesmo assim o erro acontece. Obrigado pela atenção!
Eu tambem estou tendo o mesmo problema, professor teria como nos sinalizar alguma solução?

from airflow.hooks.http_hook import HttpHook
import requests
import json
class TwitterHook(HttpHook):
def __init__(self, query, conn_id=None, start_time=None, end_time=None):
self.query = query
self.conn_id = conn_id or "twitter_default"
self.start_time = start_time
self.end_time = end_time
super().__init__(http_conn_id=self.conn_id)
def connect_to_endpoint(self, url, session):
response = requests.Request("GET", url)
prep = session.prepare_request(response)
self.log.info(f"URL: {url}")
return self.run_and_check(session, prep, {}).json()
def paginate(self, url, session, next_token=""):
if next_token:
full_url = f"{url}&next_token={next_token}"
else:
full_url = url
data = self.connect_to_endpoint(full_url, session)
yield data
if 'next_token' in data.get('meta', {}):
yield from self.paginate(url, session, data['meta']['next_token'])
def create_url(self):
query = self.query
tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,text"
user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
start_time = (
f"&start_time={self.start_time}"
if self.start_time
else ""
)
end_time = (
f"&end_time={self.end_time}"
if self.end_time
else ""
)
url = "{}/2/tweets/search/recent?query={}&{}&{}{}{}".format(
self.base_url, query, tweet_fields, user_fields, start_time, end_time
)
return url
def run(self):
session = self.get_conn()
url = self.create_url()
yield from self.paginate(url, session)
if __name__ == '__main__':
for pg in TwitterHook("AluraOnline").run():
print(json.dumps(pg, indent=4, sort_keys=True))
Oi Bruno tudo bem, cara eu consegui resolver meu problema instalando o airflow no docker, estou usando a versão 2.2.0 do airflow nele. é tranquilo pra instalar basta seguir esse passo a passo https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html quando você for rodar o twitter_hook pra testar você pode usar o container.