Boa tarde! Meu código só funciona quando eu crio manualmente a pasta datalake/twitter_aluraonline/extraction_date=(dia de hoje). Quando eu não a crio, ele dá um erro dizendo que a pasta não foi encontrada.
Exemplo:
FileNotFoundError: [Errno 2] No such file or directory: '/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/datalake/twitter_aluraonline/extract_date=2022-02-17/AluraOnline_20220217.json'
Código:
import sys
sys.path.append("/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/airflow/plugins")
from pathlib import Path
from airflow.models import BaseOperator,DAG,TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime, timedelta
from os.path import join
class TwitterOperator(BaseOperator):
    """Collect tweets matching *query* via TwitterHook and write them to *file_path*.

    Each page returned by the hook is written as one JSON document per line
    (newline-delimited JSON). The templated fields below are rendered by
    Airflow before ``execute`` runs, so ``{{ ds }}``-style placeholders in
    ``file_path`` become concrete dates at runtime.
    """

    # Attributes Airflow renders with Jinja before task execution.
    template_fields = [
        "query",
        "file_path",
        "start_time",
        "end_time",
    ]

    @apply_defaults
    def __init__(self,
                 query,
                 file_path,
                 conn_id=None,
                 start_time=None,
                 end_time=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.query = query
        self.file_path = file_path
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    def create_parent_folder(self):
        """Create the output file's parent directory (and any ancestors) if absent."""
        # Path(...).parent is already a Path; no need to re-wrap it in Path().
        Path(self.file_path).parent.mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        hook = TwitterHook(
            query=self.query,
            conn_id=self.conn_id,
            start_time=self.start_time,
            end_time=self.end_time,
        )
        # BUG FIX: the original line was `self.create_parent_folder` — a bare
        # reference to the bound method, which does nothing. Without the
        # parentheses the directory is never created, so open() below raises
        # FileNotFoundError unless the folder already exists.
        self.create_parent_folder()
        with open(self.file_path, "w") as output_file:
            for pg in hook.run():
                json.dump(pg, output_file, ensure_ascii=False)
                output_file.write("\n")
if __name__ == '__main__':
    # Manual smoke test: wrap the operator in a throwaway one-task DAG and
    # run it once for yesterday's execution date.
    with DAG(dag_id='Twitter_Test', start_date=datetime.now()) as dag:
        output_path = join(
            "/home/joaopedro/PycharmProjects/DataPipelineProjectBackup/datalake",
            "twitter_aluraonline",
            "extract_date={{ ds }}",
            "AluraOnline_{{ ds_nodash }}.json",
        )
        twitter_task = TwitterOperator(
            query="AluraOnline",
            file_path=output_path,
            task_id="test_run",
        )
        yesterday = datetime.now() - timedelta(days=1)
        TaskInstance(task=twitter_task, execution_date=yesterday).run()