Olá, estou enfrentando um erro ao tentar o comando airflow tasks test, conforme segue na imagem
Já revisei o código do arquivo twitter_dag.py e não encontrei nenhuma inconsistência em relação ao código do instrutor.
Você está vendo a versão anterior da nova experiência da Alura que estamos preparando para você. Em breve, ela ganha uma identidade visual novinha totalmente pensada em potencializar seus estudos!
Olá, estou enfrentando um erro ao tentar o comando airflow tasks test, conforme segue na imagem
Já revisei o código do arquivo twitter_dag.py e não encontrei nenhuma inconsistência em relação ao código do instrutor.
Olá Luiz, eu comentei em outro post seu, eu creio que seja o mesmo erro, mas meu comentario foi infeliz e nao consigo remover. Eu gostaria que postasse o codigo do twitter_dag pra que a gente possa verificar. Pelo que da pra ver no log, o erro vem da linha 6 do twitter_dag "from spark.transformation import twitter_transform". Acho que se voce comentar ela ou excluir, seu codigo roda. Isso ta fazendo quebrar sua dag que é o erro que voce publicou no outro post.
Olá, Afonso!
Obrigado pela sua resposta e peço desculpas pela demora em responder.
Eu acabei de tentar rodar com a linha 6 comentada, como você sugeriu e, embora não tenha recebido o erro no terminal, ele continua dando o erro no webserver, como mostra a imagem
Segue também o código do twitter_dag.py, conforme você solicitou:
from datetime import datetime
from os.path import join
from pathlib import Path
from airflow.models import DAG
from airflow.operators.alura import TwitterOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from spark.transformation import twitter_transform
from airflow.utils.dates import days_ago
ARGS = {
"owner": "airflow",
"depends_on_past": False,
"start_date": days_ago(6)
}
BASE_FOLDER = join(
str(Path("~/Users").expanduser()),
"Cudo/alura/datapipeline/datalake/{stage}/twitter_aluraonline/{partition}"
)
PARTITION_FOLDER = "extract_date={{ ds }}"
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
with DAG(dag_id="twitter_dag", default_args=ARGS, schedule_interval="0 9 * * *", max_active_runs=1) as dag:
twitter_operator = TwitterOperator(
task_id="twitter_aluraonline",
query="AluraOnline",
file_path=join(
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
"AluraOnline_{{ ds_nodash }}.json"
),
start_time=("{{" f"execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}"),
end_time=("{{" f"next_execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}")
)
twitter_transform = SparkSubmitOperator(
task_id="transform_twitter_aluraonline",
application=join(str(Path(__file__).parents[2]),
"spark/transformation.py"
),
name="twitter_transformation",
application_args=[
"--src",
BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
"--dest",
BASE_FOLDER.format(stage="silver", partition=""),
"--process-date",
"{{ ds }}"
]
)
twitter_operator >> twitter_transform
Oi Luiz, tente fazer o seguinte, exclua esse trecho:
from spark.transformation import twitter_transformrode o script python twitter_dag.py com o ambiente virtual activado e o airflow_home exportado
python3 /Users/Cudo/alura/datapipeline/airflow/dags/twitter_dag.pydepois dê um list nas dags:
airflow dags listsCreio que com isso o twitter_dag deva aparecer.
Talvez você tenha erro no twitter_operator, mas por hora isso deve resolver para listar seu dag no airflow.
Olá, Afonso!
Excluí o import que você recomendou e rodei o script no ambiente virtual ativado e com o airflow_home exportado.
Ao rodar o script, recebi esse erro
E ao dar um list nas dags, o twitter_dag aparece.
Você saberia me explicar o que aconteceu? Por que tive que excluir aquele import do twitter_transform e por que continuo recebendo esse ModuleNotFoundError, sendo que o caminho está correto e o arquivo está lá?
Muito obrigado!
Oi Luiz, na verdade erro meu. Era pra você rodar o scritp sem o "-m", o que acontece é que quando você usa o "-m" o python acha que você está chamando um modulo e vai procurar esse modulo no repositório dele (no caso, do ambiente virtual) por isso da esse erro.
Quanto ao "from spark.transformation import twitter_transform" eu não lembro a parte do curso que isso foi inserido, mas pelo código, ele é totalmente desnecessário, (no meu codigo ele ficou comentado porque tambem tava dando erro), porque ele chama o transformation.py como uma aplicação do modulo spark_submit_operator.
Caso sua duvida tenha sido respondida, marque como solucionado, qualquer outra duvida é so chamar. Valeu, bons estudos.
Muito obrigado pela ajuda e pelas explicações, Afonso! Abraços!