Solved

Final DAG runs, but gets stuck in a loop

Good afternoon. I finished the development, but the DAG tasks never complete. The task gets as far as creating the datalake directories and generating the .json file, but the file stays empty (0 bytes) and the Operator never finishes.

Link to the git repository: https://github.com/barguia/alura-airflow.git

Note: when I run the Operator manually, it generates the file normally.

Code: twitter_dag.py

import sys
sys.path.append("airflow_pipeline")

from airflow.models import DAG
from datetime import datetime, timedelta
from operators.twitter_operator import TwitterOperator
from os.path import join
from airflow.utils.dates import days_ago

with DAG(dag_id="TwitterDAG", start_date=days_ago(2), schedule_interval="@daily") as dag:

    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    query = "datascience"

    # Jinja templates ({{ ds }}, {{ data_interval_start }}, ...) are rendered by Airflow at run time
    to = TwitterOperator(
        file_path=join("datalake/twitter_datascience",
                       "extract_data={{ ds }}",
                       "datascience_{{ ds_nodash }}.json"),
        query=query,
        start_time="{{ data_interval_start.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        end_time="{{ data_interval_end.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        task_id="twitter_datascience")

Log Task Instance: twitter_datascience

[2024-02-11T19:53:59.509-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-09T00:00:00.00Z&end_time=2024-02-10T00:00:00.00Z
[2024-02-11T19:57:04.197-0300] {local_task_job_runner.py:302} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2024-02-11T19:57:04.230-0300] {process_utils.py:131} INFO - Sending 15 to group 20771. PIDs of all processes in the group: [20771]
[2024-02-11T19:57:04.231-0300] {process_utils.py:86} INFO - Sending the signal 15 to group 20771
[2024-02-11T19:58:04.241-0300] {process_utils.py:149} WARNING - process psutil.Process(pid=20771, name='Python', status='running', started='19:53:59') did not respond to SIGTERM. Trying SIGKILL
[2024-02-11T19:58:04.257-0300] {process_utils.py:86} INFO - Sending the signal 9 to group 20771
[2024-02-11T19:58:04.265-0300] {process_utils.py:79} INFO - Process psutil.Process(pid=20771, name='Python', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='19:53:59') (20771) terminated with exit code -9
[2024-02-11T19:58:04.267-0300] {standard_task_runner.py:175} ERROR - Job 7 was killed before it finished (likely due to running out of memory)
[2024-02-11T19:58:23.578-0300] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [queued]>
[2024-02-11T19:58:23.606-0300] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [queued]>
[2024-02-11T19:58:23.606-0300] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-11T19:58:23.641-0300] {taskinstance.py:2191} INFO - Executing <Task(TwitterOperator): twitter_datascience> on 2024-02-09 00:00:00+00:00
[2024-02-11T19:58:23.644-0300] {standard_task_runner.py:60} INFO - Started process 20934 to run task
[2024-02-11T19:58:23.659-0300] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'TwitterDAG', 'twitter_datascience', 'scheduled__2024-02-09T00:00:00+00:00', '--job-id', '8', '--raw', '--subdir', 'DAGS_FOLDER/twitter_dag.py', '--cfg-path', '/var/folders/71/k9sksnzs6rj0j7mjc8fjnb100000gn/T/tmpfjr0etnc']
[2024-02-11T19:58:23.664-0300] {standard_task_runner.py:88} INFO - Job 8: Subtask twitter_datascience
[2024-02-11T19:58:23.815-0300] {task_command.py:423} INFO - Running <TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [running]> on host eduardos-air.home
[2024-02-11T19:58:24.043-0300] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='TwitterDAG' AIRFLOW_CTX_TASK_ID='twitter_datascience' AIRFLOW_CTX_EXECUTION_DATE='2024-02-09T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-09T00:00:00+00:00'
[2024-02-11T19:58:24.045-0300] {twitter_operator.py:31} INFO - {'start': '2024-02-09T00:00:00.00Z', 'end': '2024-02-10T00:00:00.00Z'}
[2024-02-11T19:58:24.087-0300] {base.py:83} INFO - Using connection ID 'twitter_default' for task execution.
[2024-02-11T19:58:24.090-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-09T00:00:00.00Z&end_time=2024-02-10T00:00:00.00Z

Hi Eduardo, how are you?

The generated log contains the error "ERROR - Job 7 was killed before it finished (likely due to running out of memory)", which means the job was terminated before it finished, probably because the system ran out of memory.

This "Job killed" error usually occurs when a process consumes more memory than the system has available. That can happen for several reasons; one of them may be the limits of vmplayer, the virtual machine used in this course.

An alternative would be to install WSL (Windows Subsystem for Linux), which lets you run Linux inside Windows from the terminal with little overhead. However, you will have to configure it to run Airflow. Although this takes more work, you will have more flexibility to run the DAG automation.

Your solution is valid; I would just point out that it is not ideal, because the whole benefit of Airflow is automation. Either way, in a data engineer's day-to-day work we use machines more powerful than these virtual machines, which makes memory errors like this much less likely.

I hope this answers your question.

If you have any other questions, just post them in the forum.

Best regards, and happy studying!

If this post helped you, please mark it as solved ✓. Happy studying!

Good morning, Monalisa. All good, thank you very much for the reply.

I am running the project on macOS Monterey, and I have also tried it on an Ubuntu Server, with the same problem. While debugging, I found where the script stops: in the TwitterHook class, in the connect_to_endpoint method.

Note: when I run the commands from the terminal, everything works normally. The problem only happens when Airflow is managing the tasks.

# Running the TwitterHook
python airflow_pipeline/hook/twitter_hook.py 
[2024-02-15T05:28:44.666-0300] {base.py:83} INFO - Using connection ID 'twitter_default' for task execution.
[2024-02-15T05:28:44.676-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-08T05:28:43.00-03:00&end_time=2024-02-15T05:28:43.00-03:00
[2024-02-15T05:28:45.967-0300] {twitter_hook.py:35} INFO - after response
[2024-02-15T05:28:45.968-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-08T05:28:43.00-03:00&end_time=2024-02-15T05:28:43.00-03:00&next_token=1234567890abcdef
{"data": [....

# Running the TwitterOperator
python airflow_pipeline/operators/twitter_operator.py 
[2024-02-15T05:31:33.967-0300] {twitter_operator.py:31} INFO - {'start': '2024-02-05T05:31:33.00-03:00', 'end': '2024-02-15T05:31:33.00-03:00'}
[2024-02-15T05:31:33.993-0300] {base.py:83} INFO - Using connection ID 'twitter_default' for task execution.
[2024-02-15T05:31:33.996-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-05T05:31:33.00-03:00&end_time=2024-02-15T05:31:33.00-03:00
[2024-02-15T05:31:34.645-0300] {twitter_hook.py:35} INFO - after response
[2024-02-15T05:31:34.649-0300] {twitter_operator.py:37} INFO - {'data': ...

The connect_to_endpoint method of TwitterHook, where the script stops when run via Airflow:

def connect_to_endpoint(self, url, session):
    # TwitterHook method (HttpHook subclass); needs `import requests` at module level
    request = requests.Request("GET", url)
    prep = session.prepare_request(request)
    self.log.info(f"URL: {url}")
    response = self.run_and_check(session, prep, {'timeout': 2.75, 'check_response': False})  # the script stops here
    self.log.info("This log line is never reached :(")
    return response

TwitterDAG log (manual and scheduled runs via Airflow):

[2024-02-15T05:23:40.758-0300] {taskinstance.py:2191} INFO - Executing <Task(TwitterOperator): twitter_datascience> on 2024-02-15 08:22:11.083266+00:00
[2024-02-15T05:23:40.760-0300] {standard_task_runner.py:60} INFO - Started process 3184 to run task
[2024-02-15T05:23:40.769-0300] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'TwitterDAG', 'twitter_datascience', 'manual__2024-02-15T08:22:11.083266+00:00', '--job-id', '58', '--raw', '--subdir', 'DAGS_FOLDER/twitter_dag.py', '--cfg-path', '/var/folders/71/k9sksnzs6rj0j7mjc8fjnb100000gn/T/tmpkyb5xnb5']
[2024-02-15T05:23:40.774-0300] {standard_task_runner.py:88} INFO - Job 58: Subtask twitter_datascience
[2024-02-15T05:23:40.827-0300] {task_command.py:423} INFO - Running <TaskInstance: TwitterDAG.twitter_datascience manual__2024-02-15T08:22:11.083266+00:00 [running]> on host Eduardos-MacBook-Air.local
[2024-02-15T05:23:40.912-0300] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='TwitterDAG' AIRFLOW_CTX_TASK_ID='twitter_datascience' AIRFLOW_CTX_EXECUTION_DATE='2024-02-15T08:22:11.083266+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-15T08:22:11.083266+00:00'
[2024-02-15T05:23:40.914-0300] {twitter_operator.py:31} INFO - {'start': '2024-02-14T00:00:00.00Z', 'end': '2024-02-15T00:00:00.00Z'}
[2024-02-15T05:23:40.924-0300] {base.py:83} INFO - Using connection ID 'twitter_default' for task execution.
[2024-02-15T05:23:40.926-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-14T00:00:00.00Z&end_time=2024-02-15T00:00:00.00Z

I even suspected the timeout passed to self.run_and_check, but it only takes effect when the code is run from the command line:

response = self.run_and_check(session, prep, {'timeout': 2.75, 'check_response': False})
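One way to test whether the hang is tied to running inside a forked process (Airflow's standard task runner forks to run each task, while the terminal runs above do not) is to reproduce the fork outside Airflow. The sketch below is a hypothetical diagnostic, not part of the course code: it calls urllib.request.getproxies(), the proxy lookup that requests relies on, in a forked child and reports whether the child finished. It could also explain why the 2.75 s timeout never fires: in the HTTP provider versions I know, HttpHook.run_and_check calls session.merge_environment_settings(), which performs this lookup, before session.send() applies the timeout.

```python
import multiprocessing as mp
import urllib.request


def _lookup_proxies() -> None:
    # Runs in the forked child: the environment/system proxy lookup that
    # requests delegates to urllib.request before opening a connection.
    urllib.request.getproxies()


def proxy_lookup_survives_fork(timeout: float = 10.0, warm_up_parent: bool = True) -> bool:
    """Hypothetical diagnostic: does a proxy lookup finish in a forked child?

    Returns False if the child is still running after `timeout` seconds (a hang)
    or exits with a non-zero code (for example, killed by a signal).
    """
    if warm_up_parent:
        # Assumption: mimic a long-lived parent process that already did lookups.
        urllib.request.getproxies()
    ctx = mp.get_context("fork")  # the "fork" start method is not available on Windows
    child = ctx.Process(target=_lookup_proxies)
    child.start()
    child.join(timeout)
    if child.is_alive():
        child.kill()
        child.join()
        return False
    return child.exitcode == 0


if __name__ == "__main__":
    print("proxy lookup survives fork:", proxy_lookup_survives_fork())
```

On Linux this is expected to print True. On macOS a False result would support the proxy-lookup explanation, while True does not rule it out, since the problem is believed to depend on what the parent process has already done.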
Accepted solution:

Good morning. I kept researching and found that I needed to set a NO_PROXY environment variable:

export NO_PROXY="*"

I found the fix through this link: https://stackoverflow.com/questions/75980623/why-is-my-airflow-hanging-up-if-i-send-a-http-request-inside-a-task
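The export only helps if it runs in the same shell that starts the Airflow processes (for example, before `airflow standalone` or `airflow scheduler`), because task processes inherit their environment from them. A commonly cited explanation is that on macOS, Python's proxy detection reportedly queries the system configuration through the `_scproxy` module, which is not safe to use in a process created with fork(); since Airflow forks to run tasks, the request can hang before it is even sent. A minimal sketch, assuming you would rather keep the workaround in code, is to set the variable at the top of the DAG file so it is in place before the task code runs (the placement is an assumption, not verified for every executor setup):

```python
import os
import urllib.request

# Hypothetical placement: the very top of twitter_dag.py, before any HTTP call.
# Same effect as `export NO_PROXY="*"` in the shell that starts Airflow.
os.environ["NO_PROXY"] = "*"
# urllib.request and requests let the lowercase name take precedence, so set it too.
os.environ["no_proxy"] = "*"

# With no_proxy="*", the environment-based check bypasses proxies for every host,
# so the macOS system-proxy lookup is never consulted.
print(urllib.request.proxy_bypass_environment("labdados.com"))  # → True
```

Note that "*" disables proxies for every host the task contacts, which is fine on a local machine but may not be acceptable on a network that requires a proxy.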