10
respostas

Airflow: criar conexão com Twitter

Referente ao curso: https://cursos.alura.com.br/course/engenharia-dados-apache-airflow

Olá, colegas.

404: Not Found

Criei uma conexão com o Twitter tipo HTTP apontando para a url da API conf. passo a passo do curso (video Capítulo 2 , video 08, Eng. de Dados Apache Airflow), mas ao informar o dicionário json {“Authorization”: “Bearer} e salvar a conexão, dá o seguinte erro: 404:Not Found.

Acredito que não é o formato do dicionário json do campo "Extra", pois quando coloquei com aspas simples, deu a mensagem: The Extra connection field contained an invalid value for Conn ID: twitter_default. If connection parameters need to be added to Extra, please make sure they are in the form of a single, valid JSON object.

Então, voltei para aspas duplas. Testei o bearer token num caderno do Jupyter e funcionou a extração do Twitter, mas desejo criar uma conexão no Airflow para utilizá-la com gancho e poder gerenciar o fluxo do trabalho.

10 respostas

Olá Luciana, tudo bem ? Espero que sim.

Quanto a mensagem de 404 Not Found, ela não é um problema, porque a maneira como o Airflow está testando a conexão é incorreta, então não é um teste valido para nós. Então mesmo com essa mensagem podemos seguir com o código do curso normalmente.

Então basta salvar a conexão e depois pode seguir para o teste do Hook.

Nesse teste você está recebendo algum erro ?

Se sim, poderia compartilhar um print aqui, obrigado.

Fico no aguardo, para resolvermos isso juntos.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Bom dia, Igor.

Mais uma vez, obrigada por interagir!

Sim, segue print de como configurei minha conexão do tipo http sem url e senha, apenas passando o dicionário json com o bearer token no campo Extra, bem como a mensagem de retorno com 404: Not Found.

Conexão HTTPInsira aqui a descrição dessa imagem para ajudar na acessibilidade

Já tentei passa a URL completa no campo HOST da conexão: https://api.twitter.com/2/tweets/search/recent?query=AluraOnline&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2023-01-13T00:00:00.00Z&end_time=2023-01-14T00:00:00.00Z

Já tentei passar a URL com as chaves {} dos parâmetros passados em tempo de conexão: https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}

O que pode ser? Você consegue enxergar alguma configuração errada nesta tela de edição da conexão?

O Dicionário Json a ser passado no campo EXTRA é este mesmo?

{"Authorization": "Bearer Espaço SeuToken FechaAspasDuplas"}

Aqui não seria "Bearer Token", ou é só "Bearer mesmo seguido do número do Token"?

Por favor, veja os meus prints.

Mais uma vez, obrigada colega!

Olá Luciana.

Essa mensagem de 404 Not Found não é um problema.

Para realmente testar sua conexão você deve executar o Hook ou até a DAG dependendo da etapa que está do projeto do curso.

Poderia me dizer em qual aula você está e se está recebendo erros nessas etapas ?

Fico no aguardo, para resolvermos isso juntos.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Olá, Igor.

Então foi isso, eu estava tentando testar a conexão isoladamente na aba Connections do Airflow. Neste momento, estou criando a DAG para testar a chamada do Hook. Tomara que dê certo. Eu volto aqui para dar um feedback.

Grata pelo retorno. Luciana

Consegui executar, professor. Mas não sei onde gerou o arquivo json de saída. Configurei para salvar assim:

imort logging
from datetime import datetime
from os.path import join
from airflow.models import DAG
from libs.twitter.operators.twitter_operator import TwitterOperator
#MinhaPesquisa é o nome do plugin no arquivo airflow_plugin.py

with DAG(dag_id="dag_twitter", start_date=datetime.now()) as dag:
    twitter_operator = TwitterOperator(
        task_id="twitter_MinhaPesquisa",
        query="MinhaPesquisa",
        file_path=join(
            "/libs/twitter",
            "twitter_MinhaPesquisa", #nome da tabela
            "extract_date={{ ds }} ",                  #particoes            
            "MinhaPesquisa_{{ ds_nodash }}.json"
        )
    )

logging.getLogger().setLevel(logging.INFO)
logging.info("Building pipeline ...")

A tela de execução da DAG está assim... Já foram 9 execuções com sucesso... Mas acho que meu json foi para o limbo

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Também não entendi a finalidade neste momento do arquivo airflow_plugin.py, pois a dag não usa ele.

Como eu sei se o gancho gerou ou não arquivo .json?

Será que este parâmetro está correto?

with DAG(dag_id="dag_twitter", start_date=datetime.now()) as dag:
twitter_operator = TwitterOperator(
task_id="twitter_MinhaPesquisaBR",
query="Alura",
file_path=join(
"/libs/twitter",
"twitter_MinhaPesquisaBR", #nome da tabela
"extract_date={{ ds }} ", #particoes
"MinhaPesquisa_{{ ds_nodash }}.json"
)
)

Era para o arquivo json ter sido gerado na pasta libs/twitter Não é mesmo?

Olá Luciana, desculpa a demora no retorno.

Só agora que notei que está fazendo a versão antiga do curso de Airflow que temos na plataforma, peço desculpas por isso.

Para que você tenha menos erros e aprenda da melhor forma sobre Airflow, sugiro que faça os novos cursos de Airflow que lançamos recentemente aqui na plataforma. Esse curso que está fazendo atualmente utiliza uma versão mais antiga do Airflow e por isso pode acontecer esses problemas.

Dessa forma, ao invés de prosseguir com ele, você pode fazer o curso:

Que vai passar os primeiros conceitos do Airflow e depois pode seguir a formação onde vai encontrar conceitos mais profundos sobre como utilizar o Airflow

Mas caso queria continuar com esse curso, eu vou te ajudar a resolver essas questões e vamos juntos terminar esse projeto, tudo bem ?

Vou reproduzir esse código na minha maquina e te retorno sobre o resultado.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Oi, Igor. Obrigada pelo retorno e disponibilidade em ajudar. Até fiz o curso "Apache Airflow: orquestrando seu primeiro pipeline de dados" Mas ainda estou tentando descobrir por que via curl consigo conectar no site passando o bearer token, mas via conexão do Airflow não funciona:

curl "https://api.twitter.com" \
  -H "Authorization: Bearer AquiTemUmaSequenciaDeCaracteresQueÉSegredo"

Pensei em passar a variável conn_id para o meu hook direto na Dag, como faço isso? Grata.