1
resposta

[Dúvida] The conn_id `twitter_default` isn't defined

Olá pessoal!

Após digitar o comando no terminal, o seguinte erro apareceu: "The conn_id twitter_default isn't defined", apesar de ela estar aparecendo no airflow. O que pode ser?

Aqui esta meu código:

twitter_operator.py:

import sys
sys.path.append("airflow_pipeline")

from airflow.models import BaseOperator, DAG, TaskInstance
from hook.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta

class TwitterOperator(BaseOperator):

    def __init__(self, end_time, start_time, query, **kwargs):
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        super().__init__(**kwargs)

    def execute(self, context):
        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        with open("extract_twitter.json", "w") as file:
            for pg in TwitterHook(end_time, start_time, query).run():
                print(json.dump(pg, file, ensure_ascii=False))
                file.write("\n")

if __name__ == "__main__":

    time_zone = datetime.now().astimezone().tzname()
    TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{time_zone}:00"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    query = "data science"

    with DAG(dag_id = "TwitterTest", start_date=datetime.now()) as dag:
        to = TwitterOperator(query=query, start_time=start_time, end_time=end_time, task_id="test_run")
        ti = TaskInstance(task=to)
        to.execute(ti.task_id) 

twitter_hook.py

import requests
import json
from datetime import datetime, timedelta
from airflow.providers.http.hooks.http import HttpHook

class TwitterHook(HttpHook):

    def __init__(self,end_time, start_time, query, conn_id=None):
        self.conn_id = conn_id or "twitter_default"
        self.end_time = end_time
        self.start_time = start_time
        self.query = query
        super().__init__(http_conn_id=self.conn_id)

    def create_url(self):

        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        tweet_fields = "tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text"
        user_fields = "expansions=author_id&user.fields=id,name,username,created_at"

        url_raw = f"{self.base_url}/2/tweets/search/recent?query={query}&{tweet_fields}&{user_fields}&start_time={start_time}&end_time={end_time}"

        return url_raw
    
    def connect_to_endpoint(self, url, session):
        response = requests.Request("GET", url)
        prep = session.prepare_request(response)
        self.log.info(f"URL: {url}")
        return self.run_and_check(session, prep, {})
    
    def paginate(self, url_raw, session):
        json_response_list = []
        response = self.connect_to_endpoint(url_raw, session)
        json_response = response.json()
        json_response_list.append(json_response)

        while "next_token" in json_response.get("meta",{}):
            next_token = json_response['meta']['next_token']
            url = f"{url_raw}&next_token={next_token}"
            response = self.connect_to_endpoint(url, session)
            json_response = response.json()
            json_response_list.append(json_response)

        return json_response_list
    

    def run(self):
        session = self.get_conn()
        url_raw = self.create_url()

        return self.paginate(url_raw, session)

if __name__ == "__main__":

    time_zone = datetime.now().astimezone().tzname()
    TIMESTAMP_FORMAT = f"%Y-%m-%dT%H:%M:%S.00{time_zone}:00"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(-1)).date().strftime(TIMESTAMP_FORMAT)
    query = "data science"

    for pg in TwitterHook(end_time, start_time, query).run():
        print(json.dumps(pg,indent=4, sort_keys=True))

1 resposta

Oii, Allan, tudo bem?

No seu print aparenta que você está no diretório datapipeline_tweets ao criar a variável, contudo, chamando essa variável pro caminho airflow_pipeline (que foi a outra pasta que você criou, de acordo com o print do VSCode), o bug pode ser que talvez você precisa estar nela - há uma possibilidade do erro ter sido por isso.

Você pode dar um cd e entrar no airflow_pipeline e tentar rodar com essa nova variável de ambiente ali, por favor? Só pra testarmos essa opção. Senão buscamos outra coisa, tá bem?

Abraços!

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓.

Quer mergulhar em tecnologia e aprendizagem?

Receba a newsletter que o nosso CEO escreve pessoalmente, com insights do mercado de trabalho, ciência e desenvolvimento de software