Bom dia! Estou com um problema que não sei resolver; aparentemente, ele está no DAG do Airflow:
O erro abaixo está sendo levantado, mesmo eu já tendo verificado a conexão no Airflow!
A solução sugerida aqui no fórum seria executar o script no mesmo terminal em que executei o comando export do Airflow; porém, isso não resolveu meu problema. Talvez eu esteja fazendo algo errado na execução do Airflow, não sei. Já voltei às aulas anteriores, refiz os passos, e o Bearer token funciona no código recent_search.
Pesquisei no Stack Overflow e também não encontrei a solução do problema.
Desde já obrigado.
from airflow.hooks.http_hook import HttpHook
import requests
import json
class TwitterHook(HttpHook):
    """Fetch Twitter API v2 recent-search results through an Airflow HTTP connection.

    The hook builds the ``/2/tweets/search/recent`` URL for ``query`` and yields
    the decoded JSON body of every result page, following ``meta.next_token``.

    NOTE(review): ``get_conn`` looks up the connection named by ``conn_id``
    (``"twitter_default"`` by default) and raises
    ``AirflowException("The conn_id ... isn't defined")`` when the running
    process cannot see it. When executing this file as a script, confirm the
    shell points at the same Airflow home / metadata DB where the connection
    was created.
    """

    def __init__(self, query, conn_id=None, start_time=None, end_time=None):
        """Store the search parameters and initialise the underlying HttpHook.

        :param query: Twitter search query, inserted into the URL verbatim
            (characters such as ``&`` or ``#`` are not escaped here).
        :param conn_id: Airflow connection id; defaults to ``"twitter_default"``.
        :param start_time: optional value appended as ``&start_time=...``.
        :param end_time: optional value appended as ``&end_time=...``.
        """
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        self.start_time = start_time
        self.end_time = end_time
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):
        """Return the recent-search URL for ``self.query``.

        Relies on ``self.base_url``, which this class never sets itself;
        presumably it is populated by ``HttpHook.get_conn()``. ``run`` calls
        ``get_conn`` before this method, so call order matters.
        """
        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,public_metrics,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
        # Optional time bounds are added only when set; each carries its own "&".
        start_time = f"&start_time={self.start_time}" if self.start_time else ""
        end_time = f"&end_time={self.end_time}" if self.end_time else ""
        return (
            f"{self.base_url}/2/tweets/search/recent?query={self.query}"
            f"&{tweet_fields}&{user_fields}{start_time}{end_time}"
        )

    def connect_to_endpoint(self, url, session):
        """Send a GET for ``url`` through ``session`` and return the decoded JSON body.

        The request is prepared via ``session.prepare_request`` so the
        session's own settings (presumably including headers taken from the
        Airflow connection) are merged in, then executed with
        ``HttpHook.run_and_check`` using empty extra options.
        """
        request = requests.Request("GET", url)
        prepped = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prepped, {}).json()

    def paginate(self, url, session, next_token=""):
        """Yield every result page, starting at ``next_token`` if given.

        The cursor is followed in a loop rather than by recursion. Each
        recursive ``yield from`` would add one nested generator per page, and a
        deep enough chain can exceed Python's recursion limit. The loop stops
        as soon as a page has no (or an empty) ``meta.next_token``, so an empty
        cursor cannot cause the first page to be re-fetched forever.
        """
        while True:
            full_url = f"{url}&next_token={next_token}" if next_token else url
            data = self.connect_to_endpoint(full_url, session)
            yield data
            next_token = data.get("meta", {}).get("next_token")
            if not next_token:
                break

    def run(self, **kwargs):
        """Yield every result page for the configured query.

        Overrides ``HttpHook.run`` with a generator; ``kwargs`` are accepted
        for signature compatibility but ignored. ``get_conn()`` is called
        first because ``create_url`` reads ``self.base_url``.
        """
        session = self.get_conn()
        url = self.create_url()
        yield from self.paginate(url, session)
if __name__ == "__main__":
    # Manual smoke test: pretty-print every page of results for the query.
    # NOTE(review): this needs the "twitter_default" connection to be visible
    # to this process (the traceback shows get_conn -> get_connection failing
    # otherwise). Presumably that means running in a shell where AIRFLOW_HOME
    # points at the project's Airflow folder; confirm.
    for pg in TwitterHook("AluraOnline").run():
        # The keyword is ``indent``; json.dumps raises TypeError on ``ident``.
        print(json.dumps(pg, indent=4, sort_keys=True))
(.env) lucas@ubuntu:~/Documents/datapipeline/airflow/plugins/hooks$ python3 twitter_hook.py
Traceback (most recent call last):
File "twitter_hook.py", line 65, in <module>
for pg in TwitterHook("AluraOnline").run():
File "twitter_hook.py", line 57, in run
session = self.get_conn()
File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/http_hook.py", line 62, in get_conn
conn = self.get_connection(self.http_conn_id)
File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/base_hook.py", line 87, in get_connection
conn = random.choice(list(cls.get_connections(conn_id)))
File "/home/lucas/.env/lib/python3.8/site-packages/airflow/hooks/base_hook.py", line 83, in get_connections
return secrets.get_connections(conn_id)
File "/home/lucas/.env/lib/python3.8/site-packages/airflow/secrets/__init__.py", line 59, in get_connections
raise AirflowException("The conn_id {0} isn't defined".format(conn_id))
airflow.exceptions.AirflowException: The conn_id twitter_default isn't defined