Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

[bug] strftime não esta formatando data_interval_start do jinja

Olá pessoal,

Estou com o mesmo problema do colega Enrico.

Testei as 3 soluções propostas pela Nathalia Queiroz e nenhuma delas funcionou :(

Deixo aqui a versão atual.

import sys
sys.path.append("airflow_pipeline")

from airflow.models import DAG, TaskInstance
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from operators.twitter_operator import TwitterOperator
from os.path import join

with DAG(dag_id="TwitterDAG", start_date=days_ago(45), schedule_interval="@daily") as dag:
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    start_time = "{{ data_interval_start | ds_format('%Y-%m-%dT%H:%M:%S.00Z') }}"
    end_time = "{{ data_interval_end | ds_format('%Y-%m-%dT%H:%M:%S.00Z') }}"

    query = "data science"

    to = TwitterOperator(file_path=join("datalake/twitter_datascience",
                                        "extract_date={{ ds }}",
                                        "datascience_{{ ds_nodash }}.json"),
                                        query=query,
                                        start_time=start_time,
                                        end_time=end_time,
                                        task_id="twitter_datascience")
    ti = TaskInstance(task=to)
    to.execute(ti.task_id)

Por causa desse erro, o airflow dá erro ao importar a DAG:

Broken DAG: [/Users/oallanmendes/code/alura/airflow/airflow_pipeline/dags/twitter_dag.py]
Traceback (most recent call last):
  File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 198, in check_response
    response.raise_for_status()
  File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 502 Server Error: Bad Gateway for url: https://labdados.com/2/tweets/search/recent?query=data%20science&author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&author_id&user.fields=id,name,username,created_at&start_time=%7B%7B%20data_interval_start%20%7C%20ds_format('%25Y-%25m-%25dT%25H:%25M:%25S.00Z')%20%7D%7D&end_time=%7B%7B%20data_interval_end%20%7C%20ds_format('%25Y-%25m-%25dT%25H:%25M:%25S.00Z')%20%7D%7D

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 239, in run_and_check
    self.check_response(response)
  File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 202, in check_response
    raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 502:Bad Gateway

PS: As versões instaladas no meu projeto não são as mesmas usadas no projeto do vídeo. Estou com as versões mais atualizadas do python e do airflow.

1 resposta
solução!

Revisando os vídeos do curso, não encontrei nenhum erro evidente no código da DAG. Considerando isso, acredito que o problema esteja relacionado à API mockada que substitui a API do Twitter (agora paga).

Minha hipótese é que a API do Twitter original era mais tolerante ao receber templates Jinja, como {{ data_interval_start }}, sem lançar um erro imediato. Por outro lado, a API mockada parece não interpretar corretamente esses templates, resultando no erro 502: Bad Gateway.

O resultado disso é o airflow não deixar importar a DAG. Aparentemente essa trava está correta, já que não faz sentido ir para produção um código que na "test run" já dá erro 502. Por esse motivo, vou seguir para os outros vídeos da formação e deixar esse pequeno inconveniente para trás.