I'm getting the following error when trying to test the transformation from the command line:
airflow tasks test twitter_dag transform_twitter_aluraonline 2021-10-07
/home/alcsaw/workspace/Alura/datapipeline/airflow/plugins/airflow_plugin.py:5: FutureWarning: Registering operators or sensors in plugins is deprecated -- these should be treated like 'plain' python modules, and imported normally in DAGs.
Airflow 2.0 has removed the ability to register these types in plugins. See <http://airflow.apache.org/docs/stable/howto/custom-operator.html>.
class AluraAirflowPlugin(AirflowPlugin):
[2021-10-07 06:31:05,968] {__init__.py:50} INFO - Using executor SequentialExecutor
[2021-10-07 06:31:05,969] {dagbag.py:417} INFO - Filling up the DagBag from /home/alcsaw/workspace/Alura/datapipeline/airflow/dags
[2021-10-07 06:31:06,020] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: twitter_dag.transform_twitter_aluraonline 2021-10-07T00:00:00+00:00 [None]>
[2021-10-07 06:31:06,034] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: twitter_dag.transform_twitter_aluraonline 2021-10-07T00:00:00+00:00 [None]>
[2021-10-07 06:31:06,034] {taskinstance.py:880} INFO -
--------------------------------------------------------------------------------
[2021-10-07 06:31:06,034] {taskinstance.py:881} INFO - Starting attempt 1 of 1
[2021-10-07 06:31:06,034] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2021-10-07 06:31:06,034] {taskinstance.py:901} INFO - Executing <Task(SparkSubmitOperator): transform_twitter_aluraonline> on 2021-10-07T00:00:00+00:00
[2021-10-07 06:31:06,095] {base_hook.py:89} INFO - Using connection to: id: spark_default. Host: local, Port: None, Schema: None, Login: None, Password: None, extra: XXXXXXXX
[2021-10-07 06:31:06,096] {taskinstance.py:1150} ERROR - sequence item 5: expected str instance, tuple found
Traceback (most recent call last):
File "/home/alcsaw/workspace/Alura/datapipeline/.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/alcsaw/workspace/Alura/datapipeline/.env/lib/python3.8/site-packages/airflow/contrib/operators/spark_submit_operator.py", line 187, in execute
self._hook.submit(self._application)
File "/home/alcsaw/workspace/Alura/datapipeline/.env/lib/python3.8/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 383, in submit
spark_submit_cmd = self._build_spark_submit_command(application)
File "/home/alcsaw/workspace/Alura/datapipeline/.env/lib/python3.8/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 325, in _build_spark_submit_command
self.log.info("Spark-Submit cmd: %s", self._mask_cmd(connection_cmd))
File "/home/alcsaw/workspace/Alura/datapipeline/.env/lib/python3.8/site-packages/airflow/contrib/hooks/spark_submit_hook.py", line 237, in _mask_cmd
r'\1******', ' '.join(connection_cmd), flags=re.I)
TypeError: sequence item 5: expected str instance, tuple found
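For reference, the failing call in the last frame is `' '.join(connection_cmd)`, which requires every item of the command list to be a string. The same message can be reproduced in isolation when one element is a tuple (the command values below are made up, not the actual `connection_cmd`):

```python
# Reproducing the TypeError from the traceback: str.join() fails as soon as
# it hits a non-str item, and reports that item's position in the sequence.
cmd = ["spark-submit", "--master", "local", "--name", "job", ("arg1",)]
try:
    " ".join(cmd)
except TypeError as exc:
    print(exc)  # sequence item 5: expected str instance, tuple found
```

So "sequence item 5" points at the sixth element of the spark-submit command list being a tuple instead of a string.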
Any suggestions for a fix? Is there a way to debug this command to figure out where the wrong type is coming from?
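One check worth trying before digging into the hook itself: a trailing comma after any keyword argument value in the DAG file silently turns that value into a tuple, which would later break `' '.join(connection_cmd)`. A minimal sketch of that check, assuming the operator's keyword arguments can be gathered into a dict (all names and paths below are hypothetical):

```python
# Hypothetical kwargs as they might appear in the DAG definition.
# A stray trailing comma after a value wraps it in a tuple.
task_kwargs = {
    "task_id": "transform_twitter_aluraonline",
    "name": "twitter_transformation",            # fine: plain string
    "application_args": ["--src", "/data/in"],   # fine: list of strings
    "conn_id": ("spark_default",),               # bug: trailing comma made a tuple
}

# Flag every value that is a tuple instead of the expected type.
for key, value in task_kwargs.items():
    if isinstance(value, tuple):
        print(f"{key} is a tuple, not a str: {value!r}")
```

Scanning each argument's type like this should point straight at the line in the DAG with the stray comma.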
Thanks!