Olá, pessoal!
Estou com o problema abaixo ao executar o meu pipeline, ou seja, algumas atividades são executadas em completo e outras não.
Abaixo segue o log, mas pelo que percebi, comparando com um código que funcionou, o erro ocorre a partir da linha 87 que coloco aqui: (87, 88 e 89)
[2022-08-04 16:40:10,515] {spark_submit_hook.py:479} INFO - Traceback (most recent call last):
[2022-08-04 16:40:10,515] {spark_submit_hook.py:479} INFO - File "/home/amadeus/ama/airflow-pipeline-alura/datapipeline/spark/transfortmation.py", line 50, in <module>
[2022-08-04 16:40:10,515] {spark_submit_hook.py:479} INFO - twitter_transform(spark, args.src, args.dest, args.process_date)
Linhas finais:
airflow.exceptions.AirflowException: Cannot execute: /home/amadeus/Downloads/spark-3.3.0-bin-hadoop2/bin/spark-submit --master local --name twitter_transformation /home/amadeus/ama/airflow-pipeline-alura/datapipeline/spark/transfortmation.py --src /home/amadeus/ama/airflow-pipeline-alura/datapipeline/datalake/bronze/twitter_flamengomalvadao/extract_date=2022-07-30 --dest /home/amadeus/ama/airflow-pipeline-alura/datapipeline/datalake/silver/twitter_flamengomalvadao/ --process-date 2022-07-30. Error code is: 1.
[2022-08-04 16:40:10,638] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=twitter_dag, task_id=transform_twitter_flamengomalvadao, execution_date=20220730T090000, start_date=20220804T194002, end_date=20220804T194010
[2022-08-04 16:40:12,097] {local_task_job.py:102} INFO - Task exited with return code 1
Código total (twitter_dag.py):
from datetime import datetime
from os.path import join
from pathlib import Path
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import DAG
from airflow.operators.amadeus import TwitterOperator
from airflow.utils.dates import days_ago
ARGS = {
"owner": "airflow", # Nome do responsável pelo DAG
"depends_on_past": False, # Vai depender de uma inst. anterior ou não (Nosso caso nao precisa da data anterior)
"start_date": days_ago(6) # Quando iniciar a tarefa? (Nosso caso, seis dias atrás da data que eu iniciar a execucao)
}
BASE_FOLDER = join(
"/home/amadeus/ama/airflow-pipeline-alura/datapipeline/datalake/{stage}/twitter_flamengomalvadao/{partition}"
) # Deixando o codigo para que possa ser executado em outro ambiente
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z" # Timestamp aceito pelo Twitter
PARTITION_FOLDER = "extract_date={{ ds }}"
with DAG(
dag_id="twitter_dag",
default_args=ARGS,
schedule_interval="0 9 * * *", # Frequencia da execucao / Cron min hora diames meses semanas (todo dia 9 da manha)
max_active_runs=1 # Executa uma instancia por vez
) as dag:
twitter_operator = TwitterOperator(
task_id="twitter_flamengomalvadao",
query="FlamengoMalvadao",
file_path=join(
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER), # Caminho do meu DL com a particao e tabela
"FlamengoMalvadao_{{ ds_nodash }}.json"
),
start_time = (
"{{"
f" execution_date.strftime('{ TIMESTAMP_FORMAT }') "
"}}" # Timestamp do momento de execucao adaptado para str repassada na variavel TIMESTAMP_FORMAT
),
end_time = (
"{{"
f" next_execution_date.strftime('{ TIMESTAMP_FORMAT }') "
"}}" # Da a proxima data de execucao ate a proxima data de exec
)
)
twitter_transform = SparkSubmitOperator(
task_id = "transform_twitter_flamengomalvadao",
application=join(
str(Path(__file__).parents[2]), # volta duas pastas atras
"spark/transfortmation.py"
),
name="twitter_transformation", # Nome spark chama o job
application_args = [
"--src",
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
"--dest",
BASE_FOLDER.format(stage="silver", partition=""),
"--process-date",
"{{ ds }}",
]
)
twitter_operator >> twitter_transform # Conecto o primeiro operador ao segundo (em sequencia)
Sei que é complexo, mas podem ajudar, por favor?