1
resposta

The conn_id `twitter_default` isn't defined

Fala galera, estou com um problema na conexão do twitter_default, eu conectei ele no airflow, copiei e colei o nome pra garantir que seja o mesmo id, mas está com a mensagem The conn_id twitter_default isn't defined

hook:

from airflow.providers.http.hooks.http import HttpHook
import requests
from datetime import datetime, timedelta
import json


class TwitterHook(HttpHook):

    def __init__(self, end_time, start_time, query,  conn_id=None):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        self.conn_id = conn_id or "twitter_default"
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):

        TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"

        return url_raw

    def connect_to_endpoint(self, url, session):
        request = requests.Request("GET", url)
        prep = session.prepare_request(request)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})

    def paginate(self, url_raw, session):
        
        lista_json_response = []
        #imprimir json
        response = self.connect_to_endpoint(url_raw, session)
        json_response = response.json()
        lista_json_response.append(json_response)
        contador = 1

        # paginate
        while "next_token" in json_response.get("meta",{}) and contador<10:
            next_token = json_response['meta']['next_token']
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            lista_json_response.append(json_response)
            contador += 1

        return lista_json_response
    
    def run(self):
        session = self.get_conn()
        url_raw = self.create_url()

        return self.paginate(url_raw, session)

if __name__ == "__main__":
    #montando url
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    query = "datascience"

    for pg in TwitterHook(end_time, start_time, query).run():
        print(json.dumps(pg, indent=4, sort_keys=True))

Operator:

import sys
sys.path.append("/home/daniel/Documents/airflow_dados_twiter")  
from hook.twitter_hook import TwitterHook
from airflow.models import BaseOperator, DAG, TaskInstance
import json
from datetime import datetime, timedelta

class TwitterOperator(BaseOperator):
    def __init__(self, query, start_time, end_time, **Kwargs):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        super().__init__(**Kwargs)

    def execute(self, context):
        end_time = self.end_time
        start_time = self.start_time
        query = self.query
        with open("Extract_twiter", "w") as output_file:
            for pg in TwitterHook(query, start_time, end_time).run():
                json.dump(pg, output_file, ensure_ascii= False)
                output_file.write("/n")

if __name__ == "__main__":
    #montando url
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    query = "datascience"

    with DAG(dag_id = "TwitterTest", start_date=datetime.now()):
        to = TwitterOperator(query=query, start_time=start_time, end_time=end_time, task_id="test_run")
        ti = TaskInstance(task=to)
        to.execute(ti.task_id)
1 resposta

Fala Daniel, boa noite! Espero que você esteja bem!

Para poder te ajudar, precisaria de uma imagem de sua arvore de pastas e aonde você está exportando o AIRFLOW_HOME pois isso vai influenciar para você, vou tentar passar um caminho "generico" da solução do seu erro e vamos trocando figurinhas para resolver seu problema beleza?

É muito importante você verificar aonde você está exportando a variável do seu airflow antes de executar seu código, por exemplo, essa é a minha arvore de pastas ![Insira aqui a descrição dessa imagem para ajudar na acessibilidade]. Insira aqui a descrição dessa imagem para ajudar na acessibilidadeRepara que eu tenho duas exportações do airflow, uma pasta raiz AIRFLOW e uma na subpasta AIRFLOW. Repare também, que é dentro da minha pasta raiz AIRFLOW que estão meu hook, dag e etc..., então quando eu for fazer a exportação do AIRFLOW_HOME, eu preciso necessariamente fazer dentro da minha pasta AIRFLOW. O código simplificado fica assim

export AIRFLOW_HOME=/home/gabrielgalani/Documents/Airflow export AIRFLOW_HOME=/home/seuUsuário/caminho até a pasta raiz, onde abaixo dela estarão as pastas hook, dag e etc.

No meu caso seria essa que grifei de roxo: Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Após isso, roda o airflow web e verifica se ainda está lá a conexão twitter-default, caso esteja ótimo, caso não basta criar.

Depois, no seu terminal basta rodar o script do seu hook python3.9 CAMINHO DO SEU ARQUIVO PY

Acredito que funcionará.

Espero tê-lo ajudado e caso não funcione, fala aqui para a gente resolver beleza? Abraço!