Estou tentando rodar o código da do curso Apache Airflow - extração de dados mas estou tendo algum problema de autorização:
Traceback (most recent call last):
File "/home/carlos/Documentos/Curso1/airflow_pipeline/hook/twitter_hook.py", line 68, in <module>
for pg in TwitterHook(query="data science", start_time=start_time, end_time=end_time).run():
File "/home/carlos/Documentos/Curso1/airflow_pipeline/hook/twitter_hook.py", line 59, in run
return self.paginate(url_raw,session)
File "/home/carlos/Documentos/Curso1/airflow_pipeline/hook/twitter_hook.py", line 40, in paginate
response=self.connect_to_endpoint(url_raw,session)
File "/home/carlos/Documentos/Curso1/airflow_pipeline/hook/twitter_hook.py", line 34, in connect_to_endpoint
prep=session.prepare_request(request)
File "/home/carlos/Documentos/Curso1/venv/lib/python3.9/site-packages/requests/sessions.py", line 438, in prepare_request
auth = request.auth
AttributeError: 'Response' object has no attribute 'auth'
meu código é o seguinte
from airflow.providers.http.hooks.http import HttpHook
from datetime import datetime, timedelta
import requests
import json
import os
class TwitterHook(HttpHook):
def __init__(self, end_time,start_time,query,conn_id=None):
self.end_time=end_time
self.start_time=start_time
self.query=query
self.conn_id = conn_id or "twitter_default"
super().__init__(http_conn_id=self.conn_id)
def create_url(self):
time_zone = datetime.now().astimezone().tzname()
TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{time_zone}:00"
end_time=self.end_time
start_time=self.start_time
query=self.query
tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"
return url_raw
def connect_to_endpoint(self,url,session):
request=requests.get(url)
prep=session.prepare_request(request)
self.log.info(f"URL:{url}")
return self.run_and_check(session,prep,{})
def paginate(self,url_raw,session):
lista_json_response=[]
response=self.connect_to_endpoint(url_raw,session)
json_response=response.json()
lista_json_response.append(json_response)
contador=1
while "next_token" in json_response.get("meta",{}) and contador<100:
next_token=json_response['meta']['next_token']
url=f"{url_raw}&next_token={next_token}"
response=self.connect_to_endpoint(url,session)
json_response=response.json()
lista_json_response.append(json_response)
contador+=1
return lista_json_response
def run(self):
session = self.get_conn()
url_raw = self.create_url()
return self.paginate(url_raw,session)
if __name__ == "__main__":
time_zone = datetime.now().astimezone().tzname()
TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{time_zone}:00"
end_time=datetime.now().strftime(TIMESTAMP_FORMAT)
start_time=(datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
query = "data science"
for pg in TwitterHook(query, start_time, end_time).run():
print(json.dumps(pg, indent=4, sort_keys=True))
minha configuração no airflow está desta forma: Tentei varias coisas mas ainda não consegui solucionar o problema