Solucionado (ver solução)
Solucionado
(ver solução)
9
respostas

DAG Import Errors - No module named 'operators'

Meu Airflow está apresentando o seguinte erro:

Broken DAG: [/home/poletto/Documents/CURSO1/airflow_pipeline/dags/twitter_dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/poletto/Documents/CURSO1/airflow_pipeline/dags/twitter_dag.py", line 6, in <module>
    from operators.twitter_operator import TwitterOperator
ModuleNotFoundError: No module named 'operators'
9 respostas

Meu twitter_dag.py:

import sys
sys.path.append("airflow_pipeline")

from airflow.models import DAG
from datetime import datetime, timedelta
from operators.twitter_operator import TwitterOperator
from os.path import join

with DAG(dag_id = "TwitterTest", start_date=datetime.now()) as dag:
        
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"
    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(days=-7)).date().strftime(TIMESTAMP_FORMAT)
    query = "datascience"

    to = TwitterOperator(file_path=join("datalake/twitter_datascience", 
        "extract_date={{ ds }}", 
        "datascience_{{ ds_nodash }}.json"), 
        query=query, start_time=start_time, end_time=end_time, task_id="test_run")

Meu twitter_operator.py:

import sys
sys.path.append("airflow_pipeline")

from airflow.models import BaseOperator, DAG, TaskInstance
from hook.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
from os.path import join
from pathlib import Path

class TwitterOperator(BaseOperator):

    template_fields = ["query", "file_path", "start_time", "end_time"]

    def __init__(self, file_path, end_time, start_time, query, **kwargs):
        self.end_time = end_time
        self.start_time = start_time
        self.query =  query
        self.file_path = file_path
        super().__init__(**kwargs)

    def create_parent_folder(self):
        (Path(self.file_path).parent).mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        end_time = self.end_time
        start_time = self.start_time
        query = self.query

        self.create_parent_folder()

        with open (self.file_path, "w") as output_file:
            for pg in TwitterHook (end_time, start_time, query).run():
                json.dump(pg, output_file, ensure_ascii=False)
                output_file.write("̣\n")

if __name__ == "__main__":
    #montando URL
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.00Z"

    end_time = datetime.now().strftime(TIMESTAMP_FORMAT)
    start_time = (datetime.now() + timedelta(days=-7)).date().strftime(TIMESTAMP_FORMAT)
    query = "datascience"

    with DAG(dag_id = "TwitterTest", start_date=datetime.now()) as dag:
        to = TwitterOperator(file_path=join("datalake/twitter_datascience", 
                                            f"extract_date={datetime.now().date()}", 
                                            f"datascience_{datetime.now().date().strftime('%Y%m%d')}.json"), 
                                            query=query, start_time=start_time, end_time=end_time, task_id="test_run")
        ti = TaskInstance(task=to)
        to.execute(ti.task_id)

Olá Marcelo, tudo bem ? Espero que sim.

Verifica por favor se você está rodando o comando airflow standalone da pasta airflow_pipeline ?

(venv) poletto@PC:~/Documents/CURSO1/airflow_pipeline/$ airflow standalone

Se for esse o caso, volte uma pasta e rode o comando da pasta CURSO1

cd ..

E depois inicie novamente o Airflow

(venv) poletto@PC:~/Documents/CURSO1/$ airflow standalone

Fico no aguardo para saber se assim o airflow conseguiu localizar o operators, mas caso não funciona peço que mande um print de como estão suas pastas e como está sendo rodado o comando no terminal.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Fiz isso que tu sugeriu, exportei a variável com o comando export AIRFLOW_HOME=$(pwd)/ estando na pasta (venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ e na sequência subi o Airflow com o comando airflow standalone.

Dessa forma ele subiu uma nova instância do Airflow (precisei copiar a nova senha), não aparecendo a DAG que criamos e nem a conexão twitter_default.

Minha estrutura de pastas é essa:

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Comandos no terminal conforme tu pediu:

(venv) poletto@poletto-virtual-machine:~$ cd Documents/CURSO1/
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ export AIRFLOW_HOME=$(pwd)/
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ airflow standalone
standalone | Starting Airflow Standalone
standalone | Checking database is initialized

...e rodando da forma abaixo, de dentro da pasta /airflow_pipeline é que dá o erro do "operators", porém dessa forma ele sobe a instância correta, com a conexão twitter_default e sem nenhuma DAG (tendo em vista o parâmetro que mudamos pra false no airflw.cfg.

(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ cd airflow_pipeline/
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1/airflow_pipeline$ export AIRFLOW_HOME=$(pwd)/
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1/airflow_pipeline$ airflow standalone
solução!

Olá Marcelo.

Nessa etapa aqui, você pode mudar o export para incluir o nome da pasta airflow_pipeline.

export AIRFLOW_HOME=$(pwd)/airflow_pipeline

Então os passos completos ficam assim, entrar na pasta CURSO1, exportar a variável de ambiente, inicializar o airflow standalone.

(venv) poletto@poletto-virtual-machine:~$ cd Documents/CURSO1/
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ export AIRFLOW_HOME=$(pwd)/airflow_pipeline
(venv) poletto@poletto-virtual-machine:~/Documents/CURSO1$ airflow standalone
standalone | Starting Airflow Standalone
standalone | Checking database is initialized

Dessa maneira você vai iniciar o airflow da parta CURSO1 e mesmo assim manter a mesmas configurações que já tinha feito.

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓. Bons Estudos!

Deu certo, Igor! Obrigado!