Olá pessoal!
Após digitar o comando no terminal, o seguinte erro apareceu: "The conn_id twitter_default
isn't defined", apesar de ela estar aparecendo no airflow. O que pode ser?
Aqui está o meu código:
twitter_operator.py:
import sys
sys.path.append("airflow_pipeline")
from airflow.models import BaseOperator, DAG, TaskInstance
from hook.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
class TwitterOperator(BaseOperator):
    """Airflow operator that extracts recent tweets via TwitterHook and writes
    each page of results as one JSON document per line (JSON Lines).

    :param end_time: upper bound of the search window (ISO-8601-style string)
    :param start_time: lower bound of the search window (ISO-8601-style string)
    :param query: search query forwarded to the Twitter recent-search endpoint
    :param file_path: output path for the extracted JSON; defaults to the
        previous hard-coded value so existing callers are unaffected
    """

    def __init__(self, end_time, start_time, query,
                 file_path="extract_twitter.json", **kwargs):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.file_path = file_path
        super().__init__(**kwargs)

    def execute(self, context):
        """Run the hook and dump every result page to ``self.file_path``."""
        with open(self.file_path, "w") as file:
            for pg in TwitterHook(self.end_time, self.start_time, self.query).run():
                # Bug fix: json.dump returns None, so the original
                # print(json.dump(...)) only printed "None". Just write
                # the document followed by a newline.
                json.dump(pg, file, ensure_ascii=False)
                file.write("\n")
if __name__ == "__main__":
    # Manual smoke test: build a 24h search window using the local UTC
    # offset, then run the operator once outside the scheduler.
    tz_name = datetime.now().astimezone().tzname()
    TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{tz_name}:00"

    search_end = datetime.now().strftime(TIMESTAMP_FORMAT)
    search_start = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    search_query = "data science"

    with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
        operator = TwitterOperator(
            query=search_query,
            start_time=search_start,
            end_time=search_end,
            task_id="test_run",
        )
        instance = TaskInstance(task=operator)
        operator.execute(instance.task_id)
twitter_hook.py:
import requests
import json
from datetime import datetime, timedelta
from airflow.providers.http.hooks.http import HttpHook
class TwitterHook(HttpHook):
    """Airflow hook for the Twitter API v2 recent-search endpoint.

    Pages through every result for ``query`` between ``start_time`` and
    ``end_time``, authenticating through the Airflow connection named by
    ``conn_id`` (default ``"twitter_default"``).
    """

    def __init__(self, end_time, start_time, query, conn_id=None):
        self.conn_id = conn_id or "twitter_default"
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):
        """Build the recent-search URL for the configured window and fields."""
        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"
        return (
            f"{self.base_url}/2/tweets/search/recent"
            f"?query={self.query}&{tweet_fields}&{user_fields}"
            f"&start_time={self.start_time}&end_time={self.end_time}"
        )

    def connect_to_endpoint(self, url, session):
        """Issue a GET for ``url`` through the hook's retry-checked session."""
        request = requests.Request("GET", url)
        prepared = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prepared, {})

    def paginate(self, url_raw, session):
        """Follow ``next_token`` pagination and collect every response page."""
        pages = []
        url = url_raw
        while True:
            payload = self.connect_to_endpoint(url, session).json()
            pages.append(payload)
            meta = payload.get("meta", {})
            if "next_token" not in meta:
                break
            url = f"{url_raw}&next_token={meta['next_token']}"
        return pages

    def run(self):
        """Open the HTTP session and return the list of all result pages."""
        # get_conn() must run first: HttpHook.get_conn sets self.base_url,
        # which create_url() reads.
        session = self.get_conn()
        return self.paginate(self.create_url(), session)
if __name__ == "__main__":
    # Manual smoke test: search the last 24 hours and pretty-print each page.
    tz_name = datetime.now().astimezone().tzname()
    TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{tz_name}:00"

    search_end = datetime.now().strftime(TIMESTAMP_FORMAT)
    search_start = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)

    hook = TwitterHook(search_end, search_start, "data science")
    for page in hook.run():
        print(json.dumps(page, indent=4, sort_keys=True))