0
respostas

{{ds}} e {{ds_nodash}} não funcionam

Já tentei de tudo, mas o template Jinja não dá certo de jeito nenhum...

Eu tento usar o date.today() do Python, mas não sei se vai dar problema mais pra frente.

Já refiz o código várias vezes, mas o problema continua:

from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
from datetime import datetime, date
from pathlib import Path
from os.path import join
import json

from hooks.twitter_hook import TwitterHook

#/home/william/Documentos/ESTUDOS/ALURA-ENGENHARIA_DADOS/datapipeline/ -- Diretório onde está o ambiente virtual
#source .env/bin/activate
#export AIRFLOW_HOME=$(pwd)/airflow
#airflow initdb
#airflow webserver

#-- Outro terminal
#airflow scheduler

# airflow/plugins/operators
#export BEARER_TOKEN=<SEU_BEARER_TOKEN>  (nunca publique o token real; revogue o que foi exposto)


class TwitterOperator(BaseOperator):
    """Operator that runs a ``TwitterHook`` query and writes the results to a file.

    Every item yielded by ``TwitterHook.run()`` is serialized as one JSON
    document per line in ``file_path``.

    The attributes listed in ``template_fields`` are rendered by Airflow with
    Jinja before ``execute`` is called, so values such as
    ``"extract_date={{ ds }}"`` are replaced by the run's dates.
    """

    # NOTE: the attribute name must be spelled exactly ``template_fields``.
    # Airflow ignores any other name (the previous ``template_fileds`` typo
    # meant ``{{ ds }}`` / ``{{ ds_nodash }}`` were never rendered).
    template_fields = [
        "query",
        "file_path",
        "start_time",
        "end_time"
    ]

    @apply_defaults
    def __init__(
        self,
        query,
        file_path,
        conn_id = None,
        start_time = None,
        end_time = None,
        *args, **kwargs
        ):
        """Store the operator configuration.

        Args:
            query: Search query forwarded to ``TwitterHook`` (templated).
            file_path: Destination file for the extracted data (templated).
            conn_id: Connection id forwarded to ``TwitterHook``.
            start_time: Start of the search window forwarded to the hook
                (templated).
            end_time: End of the search window forwarded to the hook
                (templated).
            *args: Positional arguments forwarded to ``BaseOperator``.
            **kwargs: Keyword arguments forwarded to ``BaseOperator``
                (e.g. ``task_id``).
        """
        super().__init__(*args, **kwargs)
        self.query = query
        self.file_path = file_path
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    def create_parent_folder(self):
        """Create the directory that will contain ``file_path`` if it is missing."""
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        """Run the hook and dump each yielded item as one JSON line.

        Args:
            context: Airflow task context (not used directly here).
        """
        hook = TwitterHook(
            query=self.query,
            conn_id=self.conn_id,
            start_time=self.start_time,
            end_time=self.end_time
            )
        self.create_parent_folder()
        # ensure_ascii=False emits raw non-ASCII characters, so the file is
        # opened as UTF-8 explicitly instead of relying on the locale default.
        with open(self.file_path, "w", encoding="utf-8") as output_file:
            for pg in hook.run():
                json.dump(pg, output_file, ensure_ascii=False)
                output_file.write("\n")

if __name__ == "__main__":
    # Manual smoke test: build a throwaway DAG with a single TwitterOperator
    # task and run that task instance directly.
    # The last two path segments carry Jinja placeholders for the run date.
    output_path = join(
        "/home/william/Documentos/ESTUDOS/ALURA-ENGENHARIA_DADOS/datapipeline/datalake",
        "twitter_aluraonline",
        "extract_date={{ ds }}",
        "AluraOnline_{{ ds_nodash }}",
    )

    with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
        operator = TwitterOperator(
            query="AluraOnline",
            file_path=output_path,
            task_id="test_run",
        )
        task_instance = TaskInstance(
            task=operator,
            execution_date=datetime.now(),
        )
        task_instance.run()

Insira aqui a descrição dessa imagem para ajudar na acessibilidade