from os.path import join
from pathlib import Path

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.dates import days_ago

from operators.twitter_operator import TwitterOperator
# Daily pipeline: extract tweets (Bronze), transform (Silver), aggregate insights (Gold).
with DAG(
    dag_id="TwitterDAG",
    start_date=days_ago(2),
    schedule_interval="@daily",
) as dag:
    # Datalake path template; {stage} (Bronze/Silver/Gold) and {partition}
    # are filled in per task via str.format.
    BASE_FOLDER = join(
        str(Path("~/Documents").expanduser()),
        "curso2/datalake/{stage}/twitter_datascience/{partition}",
    )
    # Hive-style partition folder keyed by the logical extraction date
    # (rendered by Airflow's Jinja templating at run time).
    PARTITION_FOLDER_EXTRACT = (
        "extract_date={{ data_interval_start.strftime('%Y-%m-%d') }}"
    )
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    query = "datascience"

    # Extract: pull tweets for the data interval into the Bronze layer as JSON.
    twitter_operator = TwitterOperator(
        task_id="twitter_datascience",
        query=query,
        file_path=join(
            BASE_FOLDER.format(stage="Bronze", partition=PARTITION_FOLDER_EXTRACT),
            "datascience_{{ ds_nodash }}.json",
        ),
        start_time="{{ data_interval_start.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
        end_time="{{ data_interval_end.strftime('%Y-%m-%dT%H:%M:%S.00Z') }}",
    )

    # Transform: Spark job reading the Bronze partition and writing Silver.
    twitter_transform = SparkSubmitOperator(
        task_id="transform_twitter_datascience",
        application="/home/lia/Documents/curso2/src/spark/transformation.py",
        name="twittter_transformation",
        application_args=[
            "--src",
            BASE_FOLDER.format(stage="Bronze", partition=PARTITION_FOLDER_EXTRACT),
            "--dest",
            BASE_FOLDER.format(stage="Silver", partition=""),
            "--process-date",
            "{{ds}}",
        ],
    )

    # Insight: Spark job reading Silver and writing aggregated results to Gold.
    # partition is empty when the whole dataframe is read.
    twitter_insight = SparkSubmitOperator(
        task_id="insight_twitter",
        application="/home/lia/Documents/curso2/src/spark/insight_tweet.py",
        name="insight_twitter",
        application_args=[
            "--src",
            BASE_FOLDER.format(stage="Silver", partition=""),
            "--dest",
            BASE_FOLDER.format(stage="Gold", partition=""),
            "--process-date",
            "{{ ds }}",
        ],
    )

    # Task ordering: extract -> transform -> insight.
    twitter_operator >> twitter_transform >> twitter_insight
###########################################################################################
# ERROR (pasted task log — commented out so the module stays valid Python).
# NOTE(review): the failure occurs inside insight_tweet.py (spark-submit exits with
# code 1 while the arguments rendered by this DAG look correct) — debug the Spark
# script itself, e.g. by running the spark-submit command below manually.
#
# [2023-09-08, 14:21:31 UTC] {taskinstance.py:1943} ERROR - Task failed with exception
# Traceback (most recent call last):
#   File ".../airflow/providers/apache/spark/operators/spark_submit.py", line 156, in execute
#     self._hook.submit(self._application)
#   File ".../airflow/providers/apache/spark/hooks/spark_submit.py", line 422, in submit
#     raise AirflowException(
# airflow.exceptions.AirflowException: Cannot execute: spark-submit --master local
#   --name insight_twitter /home/lia/Documents/curso2/src/spark/insight_tweet.py
#   --src /home/lia/Documents/curso2/datalake/Silver/twitter_datascience/
#   --dest /home/lia/Documents/curso2/datalake/Gold/twitter_datascience/
#   --process-date 2023-09-06. Error code is: 1.
# [2023-09-08, 14:21:31 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED.
#   dag_id=TwitterDAG, task_id=insight_twitter, execution_date=20230906T000000,
#   start_date=20230908T142127, end_date=20230908T142131
# [2023-09-08, 14:21:31 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 40
#   for task insight_twitter (Cannot execute: spark-submit ... Error code is: 1.; 15660)
# [2023-09-08, 14:21:31 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
# [2023-09-08, 14:21:31 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check