Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

Problema na execução do operator

quando tento executar o código aparece isso.

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

import os
from pathlib import Path
import sys
import json
sys.path.append("/mnt/f/alura-airflow/docker-airflow/datapipeline/airflow/plugins")
sys.path.append("/mnt/f/alura-airflow/docker-airflow/datapipeline/airflow/dags")




from airflow.models import BaseOperator,DAG,TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import airflow.macros as macros
from os.path import join

from datetime import datetime, timedelta






class TwitterOperator(BaseOperator):

    template_fields =[
        "query",
        "filepath",
        "start_time",
        "end_time"
    ]


    @apply_defaults
    def __init__(self,
                 query,
                 filepath,
                 conn_id = None,
                 start_time = None,
                 end_time = None,
                 *args,
                 **kwargs):
        super().__init__(*args,**kwargs)
        self.query = query
        self.filepath = filepath
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time


    def create_parant_folder(self):
        Path(Path(self.filepath).parent).mkdir(parents=True,exist_ok=True)

    def execute(self, context):
        hook = TwitterHook(query=self.query, 
                           conn_id = self.conn_id,
                           start_time=self.start_time,
                           end_time=self.end_time
                           )
        self.create_parant_folder()
        with open(self.filepath,"w") as outputfile:
            for pg in hook.run():
                json.dump(pg,outputfile, ensure_ascii=False)
                outputfile.write("\n")


if __name__ == '__main__':


    with DAG(dag_id='TwitterTest',start_date=datetime.now()) as dag:
        to =TwitterOperator(
            query="AluraOnline",
            filepath=join("/mnt/f/alura-airflow/docker-airflow/datapipeline/datalake",
                          "dados_Twiter",
                          "execution_date={{ ds }}",
                          "AluraOnline_{{ ds_nodash }}.json"),
            task_id="test_run"
        )
        ti = TaskInstance(task=to, execution_date = datetime.now())
        ti.run()
1 resposta
solução!

Consegui resolver dando um

airflow resetdb
#depois uma
airflow initdb

acredito que a solução deve funcionar para o airflow 2.x tbm, qualquer coisa testo e depois posto aqui.