Solved (see solution)

Last DAG operation fails

from os.path import join
from pathlib import Path

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago
from operators.twitter_operator import TwitterOperator

with DAG(dag_id="TwitterDAG", start_date=days_ago(2), schedule_interval="@daily") as dag:
    BASE_FOLDER = join(
        str(Path("~/Documents").expanduser()),
        "curso2/datalake/{stage}/twitter_datascience/{partition}",
    )
    PARTITION_FOLDER_EXTRACT = "extract_date={{ data_interval_start.strftime('%Y-%m-%d') }}"

    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    query = "datascience"

    twitter_operator = TwitterOperator(
        file_path=join(BASE_FOLDER.format(stage="Bronze", partition=PARTITION_FOLDER_EXTRACT),
                       "datascience_{{ ds_nodash }}.json"),
        query=query, start_time="{{ data_interval_start.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        end_time="{{ data_interval_end.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}", task_id="twitter_datascience")

    twitter_transform = SparkSubmitOperator(
        task_id="transform_twitter_datascience",
        application="/home/lia/Documents/curso2/src/spark/transformation.py",
        name="twitter_transformation",
        application_args=[
            "--src", BASE_FOLDER.format(stage="Bronze", partition=PARTITION_FOLDER_EXTRACT),
            "--dest", BASE_FOLDER.format(stage="Silver", partition=""),
            "--process-date", "{{ ds }}",
        ],
    )
    twitter_insight = SparkSubmitOperator(
        task_id="insight_twitter",
        application="/home/lia/Documents/curso2/src/spark/insight_tweet.py",
        name="insight_twitter",
        application_args=[
            "--src", BASE_FOLDER.format(stage="Silver", partition=""),
            "--dest", BASE_FOLDER.format(stage="Gold", partition=""),
            "--process-date", "{{ ds }}",
        ],
    )

    # partition is empty when reading the entire dataframe
    twitter_operator >> twitter_transform >> twitter_insight
    ###########################################################################################
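For reference, here is a minimal standalone sketch of how the two-level path formatting in the DAG resolves to concrete data-lake folders. It reuses the same template strings as the code above; the partition value is a hypothetical example of what the Jinja macro `{{ data_interval_start.strftime('%Y-%m-%d') }}` would render to for one run.

```python
from os.path import join
from pathlib import Path

# Same template as in the DAG above.
BASE_FOLDER = join(
    str(Path("~/Documents").expanduser()),
    "curso2/datalake/{stage}/twitter_datascience/{partition}",
)
# Hypothetical rendered value of the Jinja partition macro for one run.
PARTITION_FOLDER_EXTRACT = "extract_date=2023-09-06"

# Bronze gets a per-day partition folder; Silver and Gold pass an empty
# partition, so their paths point at the whole stage folder.
bronze = BASE_FOLDER.format(stage="Bronze", partition=PARTITION_FOLDER_EXTRACT)
silver = BASE_FOLDER.format(stage="Silver", partition="")

print(bronze)
print(silver)
```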
    

ERROR

[2023-09-08, 14:21:31 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/lia/Documents/curso2/venv/lib/python3.9/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 156, in execute
    self._hook.submit(self._application)
  File "/home/lia/Documents/curso2/venv/lib/python3.9/site-packages/airflow/providers/apache/spark/hooks/spark_submit.py", line 422, in submit
    raise AirflowException(
airflow.exceptions.AirflowException: Cannot execute: spark-submit --master local --name insight_twitter /home/lia/Documents/curso2/src/spark/insight_tweet.py --src /home/lia/Documents/curso2/datalake/Silver/twitter_datascience/ --dest /home/lia/Documents/curso2/datalake/Gold/twitter_datascience/ --process-date 2023-09-06. Error code is: 1.
[2023-09-08, 14:21:31 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=TwitterDAG, task_id=insight_twitter, execution_date=20230906T000000, start_date=20230908T142127, end_date=20230908T142131
[2023-09-08, 14:21:31 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 40 for task insight_twitter (Cannot execute: spark-submit --master local --name insight_twitter /home/lia/Documents/curso2/src/spark/insight_tweet.py --src /home/lia/Documents/curso2/datalake/Silver/twitter_datascience/ --dest /home/lia/Documents/curso2/datalake/Gold/twitter_datascience/ --process-date 2023-09-06. Error code is: 1.; 15660)
[2023-09-08, 14:21:31 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-09-08, 14:21:31 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check
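A note on reading this log: the AirflowException only reports the exit code of the spark-submit subprocess, so "Error code is: 1" just means the job failed; the actual Python/Spark traceback is printed earlier in the same task log by the job itself, and you can also reproduce it by running the printed spark-submit command manually in a terminal. A simplified sketch of that hook behavior (not the real `SparkSubmitHook` code, and using a hypothetical failing command in place of `insight_tweet.py`):

```python
import subprocess
import sys

# Simplified sketch of what the Spark hook does: run the command as a
# subprocess and raise when the exit code is non-zero. Note the exception
# carries only the command and the exit code, never the job's traceback.
def submit(cmd):
    proc = subprocess.run(cmd)
    if proc.returncode != 0:
        raise RuntimeError(
            f"Cannot execute: {' '.join(cmd)}. Error code is: {proc.returncode}."
        )

# Hypothetical failing job standing in for insight_tweet.py.
try:
    submit([sys.executable, "-c", "raise SystemExit(1)"])
except RuntimeError as exc:
    print(exc)
```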
2 answers
Solution!

I managed to solve it. It was a semantics difference in the previous task.

Hi Elis, how are you? I hope you're doing well.

I'm glad to hear you managed to solve it.

If you have any questions, don't hesitate to ask.

If this post helped you, please mark it as solved ✓. Happy studying!