Solucionado (ver solução)
Solucionado
(ver solução)
6
respostas

Erro na execução do comando airflow tasks test

Olá, estou enfrentando um erro ao tentar o comando airflow tasks test, conforme segue na imagem Insira aqui a descrição dessa imagem para ajudar na acessibilidadeJá revisei o código do arquivo twitter_dag.py e não encontrei nenhuma inconsistência em relação ao código do instrutor.

6 respostas

Olá Luiz, eu comentei em outro post seu, eu creio que seja o mesmo erro, mas meu comentario foi infeliz e nao consigo remover. Eu gostaria que postasse o codigo do twitter_dag pra que a gente possa verificar. Pelo que da pra ver no log, o erro vem da linha 6 do twitter_dag "from spark.transformation import twitter_transform". Acho que se voce comentar ela ou excluir, seu codigo roda. Isso ta fazendo quebrar sua dag que é o erro que voce publicou no outro post.

Olá, Afonso! Obrigado pela sua resposta e peço desculpas pela demora em responder. Eu acabei de tentar rodar com a linha 6 comentada, como você sugeriu e, embora não tenha recebido o erro no terminal, ele continua dando o erro no webserver, como mostra a imagem Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Segue também o código do twitter_dag.py, conforme você solicitou:

from datetime import datetime
from os.path import join
from pathlib import Path
from airflow.models import DAG
from airflow.operators.alura import TwitterOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from spark.transformation import twitter_transform
from airflow.utils.dates import days_ago

ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": days_ago(6)
}
BASE_FOLDER = join(
    str(Path("~/Users").expanduser()),
    "Cudo/alura/datapipeline/datalake/{stage}/twitter_aluraonline/{partition}"
)
PARTITION_FOLDER = "extract_date={{ ds }}"
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

with DAG(dag_id="twitter_dag", default_args=ARGS, schedule_interval="0 9 * * *", max_active_runs=1) as dag:
    twitter_operator = TwitterOperator(
        task_id="twitter_aluraonline",
        query="AluraOnline",
        file_path=join(
            BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
            "AluraOnline_{{ ds_nodash }}.json"
        ),
        start_time=("{{" f"execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}"),
        end_time=("{{" f"next_execution_date.strftime('{ TIMESTAMP_FORMAT }')" "}}")
    )

    twitter_transform = SparkSubmitOperator(
        task_id="transform_twitter_aluraonline",
        application=join(str(Path(__file__).parents[2]),
            "spark/transformation.py"
        ),
        name="twitter_transformation",
        application_args=[
            "--src",
            BASE_FOLDER.format(stage="bronze", partition=PARTITION_FOLDER),
            "--dest",
            BASE_FOLDER.format(stage="silver", partition=""),
            "--process-date",
            "{{ ds }}"
        ]
    )

    twitter_operator >> twitter_transform

Oi Luiz, tente fazer o seguinte, exclua esse trecho:

from spark.transformation import twitter_transform

rode o script python twitter_dag.py com o ambiente virtual activado e o airflow_home exportado

python3  /Users/Cudo/alura/datapipeline/airflow/dags/twitter_dag.py

depois dê um list nas dags:

airflow dags lists

Creio que com isso o twitter_dag deva aparecer.

Talvez você tenha erro no twitter_operator, mas por hora isso deve resolver para listar seu dag no airflow.

Olá, Afonso! Excluí o import que você recomendou e rodei o script no ambiente virtual ativado e com o airflow_home exportado. Ao rodar o script, recebi esse erro Insira aqui a descrição dessa imagem para ajudar na acessibilidade E ao dar um list nas dags, o twitter_dag aparece. Você saberia me explicar o que aconteceu? Por que tive que excluir aquele import do twitter_transform e por que continuo recebendo esse ModuleNotFoundError, sendo que o caminho está correto e o arquivo está lá? Muito obrigado!

solução!

Oi Luiz, na verdade erro meu. Era pra você rodar o scritp sem o "-m", o que acontece é que quando você usa o "-m" o python acha que você está chamando um modulo e vai procurar esse modulo no repositório dele (no caso, do ambiente virtual) por isso da esse erro. Quanto ao "from spark.transformation import twitter_transform" eu não lembro a parte do curso que isso foi inserido, mas pelo código, ele é totalmente desnecessário, (no meu codigo ele ficou comentado porque tambem tava dando erro), porque ele chama o transformation.py como uma aplicação do modulo spark_submit_operator. Caso sua duvida tenha sido respondida, marque como solucionado, qualquer outra duvida é so chamar. Valeu, bons estudos.

Muito obrigado pela ajuda e pelas explicações, Afonso! Abraços!