0
respostas

Código não cria a pasta

Boa tarde! Meu código só funciona quando eu crio manualmente a pasta datalake/twitter_aluraonline/extraction_date=(dia de hoje). Quando eu não crio, ele dá o erro dizendo que a pasta não foi encontrada.

Exemplo:

FileNotFoundError: [Errno 2] No such file or directory: '/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/datalake/twitter_aluraonline/extract_date=2022-02-17/AluraOnline_20220217.json'

Código:

import sys
sys.path.append("/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/airflow/plugins")
from pathlib import Path
from airflow.models import BaseOperator,DAG,TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
from os.path import join

class TwitterOperator(BaseOperator):
    """Airflow operator that runs a Twitter search via TwitterHook and
    dumps each page of results as one JSON line into ``file_path``.

    The parent directory of ``file_path`` is created automatically before
    writing, so templated date-partitioned paths (e.g.
    ``extract_date={{ ds }}``) do not need to exist in advance.
    """

    # Fields rendered by Airflow's Jinja templating before execute() runs.
    template_fields = [
        "query",
        "file_path",
        "start_time",
        "end_time"
    ]

    @apply_defaults
    def __init__(self,
                 query,
                 file_path,
                 conn_id = None,
                 start_time = None,
                 end_time = None,
                 *args,
                 **kwargs):
        """
        :param query: search term passed to the Twitter API.
        :param file_path: destination file for the JSON-lines output
            (may contain Jinja templates — see ``template_fields``).
        :param conn_id: Airflow connection id forwarded to TwitterHook.
        :param start_time: optional lower bound for the search window.
        :param end_time: optional upper bound for the search window.
        """
        super().__init__(*args, **kwargs)
        self.query = query
        self.file_path = file_path
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    def create_parent_folder(self):
        """Create the directory that will hold ``file_path`` (no-op if it
        already exists)."""
        # Path.parent is already a Path; no need to re-wrap it.
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        """Fetch all result pages from the hook and write them to
        ``file_path``, one JSON document per line."""
        hook = TwitterHook(query=self.query,
            conn_id=self.conn_id,
            start_time=self.start_time,
            end_time=self.end_time
        )
        # BUG FIX: the original code read ``self.create_parent_folder``
        # without parentheses — that expression just references the bound
        # method and never calls it, so the folder was never created and
        # open() raised FileNotFoundError. The call operator () is required.
        self.create_parent_folder()
        with open(self.file_path, "w") as output_file:
            for pg in hook.run():
                json.dump(pg, output_file, ensure_ascii=False)
                output_file.write("\n")

if __name__ == '__main__':
    # Ad-hoc local test: build a throwaway DAG, attach the operator to it,
    # and run a single TaskInstance for yesterday's logical date.
    with DAG(dag_id='Twitter_Test', start_date=datetime.now()) as dag:
        output_path = join(
            "/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/datalake",
            "twitter_aluraonline",
            "extract_date={{ ds }}",
            "AluraOnline_{{ ds_nodash }}.json"
        )
        twitter_operator = TwitterOperator(
            query="AluraOnline",
            file_path=output_path,
            task_id="test_run"
        )
        yesterday = datetime.now() - timedelta(days=1)
        task_instance = TaskInstance(task=twitter_operator, execution_date=yesterday)
        task_instance.run()