Good afternoon. I have finished the development, but the DAG tasks never complete. The run gets as far as creating the datalake directories and generating the .json file, but the file stays empty (0 bytes) and the Operator never finishes.
Link to the git repository: https://github.com/barguia/alura-airflow.git
Note: when the Operator is executed manually, it generates the file normally (one way to reproduce the manual run is sketched after the code below):
Code: twitter_dag.py
import sys

# Make the project's modules importable when Airflow parses this DAG file.
sys.path.append("airflow_pipeline")

from airflow.models import DAG
from datetime import datetime, timedelta
from operators.twitter_operator import TwitterOperator
from os.path import join
from airflow.utils.dates import days_ago

with DAG(dag_id="TwitterDAG", start_date=days_ago(2), schedule_interval="@daily") as dag:
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    query = "datascience"
    to = TwitterOperator(
        # One partition folder per logical date:
        # datalake/twitter_datascience/extract_data=YYYY-MM-DD/datascience_YYYYMMDD.json
        file_path=join(
            "datalake/twitter_datascience",
            "extract_data={{ ds }}",
            "datascience_{{ ds_nodash }}.json",
        ),
        query=query,
        # Data-interval boundaries rendered by Airflow's template engine at runtime.
        start_time="{{ data_interval_start.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        end_time="{{ data_interval_end.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        task_id="twitter_datascience",
    )
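For context, the manual run mentioned above can be reproduced with Airflow's own test command (a minimal sketch, assuming the DAG file sits in the configured DAGS_FOLDER; tasks test runs a single task in-process, without the scheduler, and renders the {{ ds }} and data_interval_* templates for the given logical date):

# Executes only this task for the 2024-02-09 logical date, the same run
# shown in the log below.
airflow tasks test TwitterDAG twitter_datascience 2024-02-09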
Log Task Instance: twitter_datascience
[2024-02-11T19:53:59.509-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-09T00:00:00.00Z&end_time=2024-02-10T00:00:00.00Z
[2024-02-11T19:57:04.197-0300] {local_task_job_runner.py:302} WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2024-02-11T19:57:04.230-0300] {process_utils.py:131} INFO - Sending 15 to group 20771. PIDs of all processes in the group: [20771]
[2024-02-11T19:57:04.231-0300] {process_utils.py:86} INFO - Sending the signal 15 to group 20771
[2024-02-11T19:58:04.241-0300] {process_utils.py:149} WARNING - process psutil.Process(pid=20771, name='Python', status='running', started='19:53:59') did not respond to SIGTERM. Trying SIGKILL
[2024-02-11T19:58:04.257-0300] {process_utils.py:86} INFO - Sending the signal 9 to group 20771
[2024-02-11T19:58:04.265-0300] {process_utils.py:79} INFO - Process psutil.Process(pid=20771, name='Python', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='19:53:59') (20771) terminated with exit code -9
[2024-02-11T19:58:04.267-0300] {standard_task_runner.py:175} ERROR - Job 7 was killed before it finished (likely due to running out of memory)
[2024-02-11T19:58:23.578-0300] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [queued]>
[2024-02-11T19:58:23.606-0300] {taskinstance.py:1956} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [queued]>
[2024-02-11T19:58:23.606-0300] {taskinstance.py:2170} INFO - Starting attempt 1 of 1
[2024-02-11T19:58:23.641-0300] {taskinstance.py:2191} INFO - Executing <Task(TwitterOperator): twitter_datascience> on 2024-02-09 00:00:00+00:00
[2024-02-11T19:58:23.644-0300] {standard_task_runner.py:60} INFO - Started process 20934 to run task
[2024-02-11T19:58:23.659-0300] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'TwitterDAG', 'twitter_datascience', 'scheduled__2024-02-09T00:00:00+00:00', '--job-id', '8', '--raw', '--subdir', 'DAGS_FOLDER/twitter_dag.py', '--cfg-path', '/var/folders/71/k9sksnzs6rj0j7mjc8fjnb100000gn/T/tmpfjr0etnc']
[2024-02-11T19:58:23.664-0300] {standard_task_runner.py:88} INFO - Job 8: Subtask twitter_datascience
[2024-02-11T19:58:23.815-0300] {task_command.py:423} INFO - Running <TaskInstance: TwitterDAG.twitter_datascience scheduled__2024-02-09T00:00:00+00:00 [running]> on host eduardos-air.home
[2024-02-11T19:58:24.043-0300] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='TwitterDAG' AIRFLOW_CTX_TASK_ID='twitter_datascience' AIRFLOW_CTX_EXECUTION_DATE='2024-02-09T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-02-09T00:00:00+00:00'
[2024-02-11T19:58:24.045-0300] {twitter_operator.py:31} INFO - {'start': '2024-02-09T00:00:00.00Z', 'end': '2024-02-10T00:00:00.00Z'}
[2024-02-11T19:58:24.087-0300] {base.py:83} INFO - Using connection ID 'twitter_default' for task execution.
[2024-02-11T19:58:24.090-0300] {twitter_hook.py:29} INFO - URL: https://labdados.com/2/tweets/search/recent?query=data science&tweet.fields=author_id,conversation_id,created_at,id,in_reply_to_user_id,public_metrics,lang,text&expansions=author_id&user.fields=id,name,username,created_at&start_time=2024-02-09T00:00:00.00Z&end_time=2024-02-10T00:00:00.00Z