Solucionado (ver solução)
Solucionado
(ver solução)
2
respostas

No module named 'hooks'

Pessoal, inclui o codigo no arquivo twitter_operator.py import sys sys.path.append("/airflow/plugins")

Mas continua dando erro. Notei que na aula ele criou a pasta operators dentro da pasta hooks e quando ele roda no terminal a pasta operators parece estar dentro da pasta plugins em paralelo a pasta hooks.

import sys
sys.path.append("/airflow/plugins")

from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime
from pathlib import Path


class TwitterOperator(BaseOperator):

    template_fields = [
        "query",
        "file_path",
        "start_time",
        "end_time"
    ]

    @apply_defaults
    def __init__(
            self,
            query,
            file_path,
            conn_id=None,
            start_time=None,
            end_time=None,
            *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.query = query
        self.file_path = file_path
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    def create_parent_folder(self):
        Path(Path(self.file_path).parent).mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        hook = TwitterHook(
            query=self.query,
            conn_id=self.conn_id,
            start_time=self.start_time,
            end_time=self.end_time
        )
        self.create_parent_folder()
        with open(self.file_path, "w") as output_file:
            for pg in hook.run():
                json.dump(pg, output_file, ensure_ascii=False)


if __name__ == "__main__":
    with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
        to = TwitterOperator(
            query="AluraOnLine",
            file_path="AluraOnLine_{{ ds_nodash }}",
            task_id="test_run"
        )
        ti = TaskInstance(task=to, execution_date=datetime.now())
        ti.run()

Minha estrutura de pastas está representada na figura abaixo.

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

2 respostas
solução!

Olá, Hugo! Tudo tranquilo por aí?

Tente colocar o caminho completo da pasta em sys.path.append("{caminho_completo}").

Por exemplo:

import sys
sys.path.append("/home/enyak/datapipeline/plugins")

Coloquei o meu assim. Você pode verificar o caminho no Linux executando o comando pwd no terminal dentro da pasta plugins, conforme imagem abaixo:

Terminal do linux em fundo preto executando o comando "pwd". Obtemos então a saída: /home/enyak/datapipeline/plugins

Se mesmo assim o erro persistir, peço que me envie o código da forma como você colocou e um print screen da tela de erro para sermos mais assertivos na resposta.

Espero ter ajudado, mas se ainda persistir alguma dúvida estou sempre à disposição.

:)

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓.Bons Estudos!

Oi Bruno, não estava claro pra mim que sempre teria que passar a variavel de ambiente antes de executar o airflow webserver. Estava com um airflow na pasta raiz e outro na pasta datapipeline.

Outro gap que observei é que é necessário um terminal para ativar o webserver e outro para ativar o scheduler ou então aprender a colocar rodando no background. Essas dicas são super importantes para quem não é da área de TI.