Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

Erro Prepare_Request

Gente, rodei o código e obtive esse erro:

INFO - Using connection ID 'twitter_default' for task execution.
Traceback (most recent call last):
  File "/home/felipe/Documents/airflowalura/hook/twitter_hook.py", line 63, in <module>
    for pg in TwitterHook(end_time,start_time,query).run():
  File "/home/felipe/Documents/airflowalura/hook/twitter_hook.py", line 55, in run
    return self.paginate(url_raw, session)
  File "/home/felipe/Documents/airflowalura/hook/twitter_hook.py", line 37, in paginate
    response = self.connect_to_endpoint(url_raw, session)
  File "/home/felipe/Documents/airflowalura/hook/twitter_hook.py", line 31, in connect_to_endpoint
    prep = session.prepare_request(request)
AttributeError: 'Connection' object has no attribute 'prepare_request'

Segue código que escrevi:

from airflow.providers.http.hooks.http import HttpHook
import requests
from datetime import datetime, timedelta
import json

class TwitterHook(HttpHook):

    def __init__(self, end_time, start_time, query,conn_id = None):
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id = self.conn_id)
        self.end_time = end_time
        self.start_time = start_time
        self.query = query

    def create_url(self):
        # Criando URL
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"

        return url_raw
    
    def connect_to_endpoint(self, url, session):
        request = requests.Request("GET", url)
        prep = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})
    
    def paginate(self, url_raw, session):
        lista_json_response = []
        response = self.connect_to_endpoint(url_raw, session)
        json_response = response.json()
        lista_json_response.append(json_response)

        contador = 1
        while ("next_token" in json_response.get("meta",{})) and (contador < 100):
            next_token = json_response['meta']['next_token']
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            lista_json_response.append(json_response)
            contador += 1

        return lista_json_response
    
    def run(self):
        session = self.get_connection(self.conn_id)
        url_raw = self.create_url()
        return self.paginate(url_raw, session)

if __name__ == '__main__':
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    end_time = (datetime.now() + timedelta(hours=3, seconds=-30)).strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(days=-7, hours=3)).strftime(TIMESTAMP_FORMAT)
    query = "data science"

    for pg in TwitterHook(end_time,start_time,query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

O que seria esse erro?

1 resposta
solução!

Fiz alguns ajustes com base nas sugestões do ChatGPT e funcionou.

Segue código ajustado:

from airflow.providers.http.hooks.http import HttpHook
import requests
from datetime import datetime, timedelta
import json

class TwitterHook(HttpHook):

    def __init__(self, end_time, start_time, query,conn_id = None):
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id = self.conn_id)
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        

    def create_url(self):
        # Criando URL
        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"https://labdados.com/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"
        url = url_raw

        return url
    
    def connect_to_endpoint(self, url, session):
        request = requests.Request("GET", url)
        prep = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})
    
    def paginate(self, url, session):
        lista_json_response = []
        response = self.connect_to_endpoint(url, session)
        json_response = response.json()
        lista_json_response.append(json_response)

        contador = 1
        while ("next_token" in json_response.get("meta",{})) and (contador < 100):
            next_token = json_response['meta']['next_token']
            url = f"{url}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            lista_json_response.append(json_response)
            contador += 1

        return lista_json_response
    
    #def run(self):
        #session = self.get_connection(self.conn_id)
        #url_raw = self.create_url()
        #return self.paginate(url_raw, session)

    def run(self):
        with requests.Session() as session:
            url_raw = self.create_url()
            return self.paginate(url_raw, session)

if __name__ == '__main__':
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    end_time = (datetime.now() + timedelta(hours=3, seconds=-30)).strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(days=-7, hours=3)).strftime(TIMESTAMP_FORMAT)
    query = "data science"

    for pg in TwitterHook(end_time,start_time,query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

Por outro lado, o "self.base_url" não está funcionando. Não sei por qual o motivo o Airflow não está pegando essa informação que coloquei no "host". Daí, então, coloquei a url no código mesmo só pra ele rodar rsrs