Olá, estou enfrentando um erro ao tentar o comando airflow tasks test
, conforme segue na imagem Já revisei o código do arquivo twitter_dag.py e não encontrei nenhuma inconsistência em relação ao código do instrutor.
Olá, estou enfrentando um erro ao tentar o comando airflow tasks test
, conforme segue na imagem Já revisei o código do arquivo twitter_dag.py e não encontrei nenhuma inconsistência em relação ao código do instrutor.
Olá Luiz, eu comentei em outro post seu, eu creio que seja o mesmo erro, mas meu comentario foi infeliz e nao consigo remover. Eu gostaria que postasse o codigo do twitter_dag pra que a gente possa verificar. Pelo que da pra ver no log, o erro vem da linha 6 do twitter_dag "from spark.transformation import twitter_transform". Acho que se voce comentar ela ou excluir, seu codigo roda. Isso ta fazendo quebrar sua dag que é o erro que voce publicou no outro post.
Olá, Afonso! Obrigado pela sua resposta e peço desculpas pela demora em responder. Eu acabei de tentar rodar com a linha 6 comentada, como você sugeriu e, embora não tenha recebido o erro no terminal, ele continua dando o erro no webserver, como mostra a imagem
Segue também o código do twitter_dag.py, conforme você solicitou:
from datetime import datetime
from os.path import join
from pathlib import Path
from airflow.models import DAG
from airflow.operators.alura import TwitterOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from spark.transformation import twitter_transform
from airflow.utils.dates import days_ago
ARGS = {
"owner": "airflow",
"depends_on_past": False,
"start_date": days_ago(6)
}
BASE_FOLDER = join(
str(Path("~/Users").expanduser()),
"Cudo/alura/datapipeline/datalake/{stage}/twitter_aluraonline/{partition}"
)
PARTITION_FOLDER = "extract_date={{ ds }}"
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
with DAG(dag_id="twitter_dag", default_args=ARGS, schedule_interval="0 9 * * *", max_active_runs=1) as dag:
twitter_operator = TwitterOperator(
task_id="twitter_aluraonline",
query="AluraOnline",
file_path=join(
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
"AluraOnline_{{ ds_nodash }}.json"
),
start_time=("{{" f"execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}"),
end_time=("{{" f"next_execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}")
)
twitter_transform = SparkSubmitOperator(
task_id="transform_twitter_aluraonline",
application=join(str(Path(__file__).parents[2]),
"spark/transformation.py"
),
name="twitter_transformation",
application_args=[
"--src",
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
"--dest",
BASE_FOLDER.format(stage="silver", partition=""),
"--process-date",
"{{ ds }}"
]
)
twitter_operator >> twitter_transform
Oi Luiz, tente fazer o seguinte, exclua esse trecho:
from spark.transformation import twitter_transform
rode o script python twitter_dag.py com o ambiente virtual activado e o airflow_home exportado
python3 /Users/Cudo/alura/datapipeline/airflow/dags/twitter_dag.py
depois dê um list nas dags:
airflow dags lists
Creio que com isso o twitter_dag deva aparecer.
Talvez você tenha erro no twitter_operator, mas por hora isso deve resolver para listar seu dag no airflow.
Olá, Afonso! Excluí o import que você recomendou e rodei o script no ambiente virtual ativado e com o airflow_home exportado. Ao rodar o script, recebi esse erro E ao dar um list nas dags, o twitter_dag aparece. Você saberia me explicar o que aconteceu? Por que tive que excluir aquele import do twitter_transform e por que continuo recebendo esse ModuleNotFoundError, sendo que o caminho está correto e o arquivo está lá? Muito obrigado!
Oi Luiz, na verdade erro meu. Era pra você rodar o scritp sem o "-m", o que acontece é que quando você usa o "-m" o python acha que você está chamando um modulo e vai procurar esse modulo no repositório dele (no caso, do ambiente virtual) por isso da esse erro.
Quanto ao "from spark.transformation import twitter_transform"
eu não lembro a parte do curso que isso foi inserido, mas pelo código, ele é totalmente desnecessário, (no meu codigo ele ficou comentado porque tambem tava dando erro), porque ele chama o transformation.py como uma aplicação do modulo spark_submit_operator.
Caso sua duvida tenha sido respondida, marque como solucionado, qualquer outra duvida é so chamar. Valeu, bons estudos.
Muito obrigado pela ajuda e pelas explicações, Afonso! Abraços!