Olá pessoal,
Estou com o mesmo problema do colega Enrico.
Testei as 3 soluções propostas pela Nathalia Queiroz e nenhuma delas funcionou :(
Deixo aqui a versão atual.
import sys
sys.path.append("airflow_pipeline")
from airflow.models import DAG, TaskInstance
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from operators.twitter_operator import TwitterOperator
from os.path import join
with DAG(dag_id="TwitterDAG", start_date=days_ago(45), schedule_interval="@daily") as dag:
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
start_time = "{{ data_interval_start | ds_format('%Y-%m-%dT%H:%M:%S.00Z') }}"
end_time = "{{ data_interval_end | ds_format('%Y-%m-%dT%H:%M:%S.00Z') }}"
query = "data science"
to = TwitterOperator(file_path=join("datalake/twitter_datascience",
"extract_date={{ ds }}",
"datascience_{{ ds_nodash }}.json"),
query=query,
start_time=start_time,
end_time=end_time,
task_id="twitter_datascience")
ti = TaskInstance(task=to)
to.execute(ti.task_id)
Por causa desse erro, o airflow dá erro ao importar a DAG:
Broken DAG: [/Users/oallanmendes/code/alura/airflow/airflow_pipeline/dags/twitter_dag.py]
Traceback (most recent call last):
File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 198, in check_response
response.raise_for_status()
File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/requests/models.py", line 1024, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 502 Server Error: Bad Gateway for url: https://labdados.com/2/tweets/search/recent?query=data%20science&author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&author_id&user.fields=id,name,username,created_at&start_time=%7B%7B%20data_interval_start%20%7C%20ds_format('%25Y-%25m-%25dT%25H:%25M:%25S.00Z')%20%7D%7D&end_time=%7B%7B%20data_interval_end%20%7C%20ds_format('%25Y-%25m-%25dT%25H:%25M:%25S.00Z')%20%7D%7D
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 239, in run_and_check
self.check_response(response)
File "/Users/oallanmendes/code/alura/airflow/.venv/lib/python3.11/site-packages/airflow/providers/http/hooks/http.py", line 202, in check_response
raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 502:Bad Gateway
PS: As versões instaladas no meu projeto não são as mesmas usadas no projeto do vídeo. Estou com as versões mais atualizadas do python e do airflow.