Quando tento executar o código, aparece isso.
import os
from pathlib import Path
import sys
import json
sys.path.append("/mnt/f/alura-airflow/docker-airflow/datapipeline/airflow/plugins")
sys.path.append("/mnt/f/alura-airflow/docker-airflow/datapipeline/airflow/dags")
from airflow.models import BaseOperator,DAG,TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import airflow.macros as macros
from os.path import join
from datetime import datetime, timedelta
class TwitterOperator(BaseOperator):
    """Airflow operator that runs a ``TwitterHook`` query and stores the result.

    Every page yielded by ``TwitterHook.run()`` is serialized with
    ``json.dump`` and written to ``filepath``, one JSON document per line.
    """

    # Attribute names that Airflow renders as Jinja templates before
    # ``execute`` is called (e.g. ``{{ ds }}`` inside ``filepath``).
    template_fields = [
        "query",
        "filepath",
        "start_time",
        "end_time",
    ]

    @apply_defaults
    def __init__(self,
                 query,
                 filepath,
                 conn_id=None,
                 start_time=None,
                 end_time=None,
                 *args,
                 **kwargs):
        """Store the query parameters and the output location.

        Args:
            query: Search query forwarded to ``TwitterHook``.
            filepath: Path of the output file; missing parent folders are
                created when the task executes.
            conn_id: Connection id forwarded to ``TwitterHook``.
            start_time: Start of the search window, forwarded to the hook.
            end_time: End of the search window, forwarded to the hook.
            *args: Extra positional arguments passed on to ``BaseOperator``.
            **kwargs: Extra keyword arguments passed on to ``BaseOperator``
                (for example ``task_id``).
        """
        super().__init__(*args, **kwargs)
        self.query = query
        self.filepath = filepath
        self.conn_id = conn_id
        self.start_time = start_time
        self.end_time = end_time

    def create_parant_folder(self):
        """Create the parent directory of ``filepath`` if it does not exist.

        The misspelled method name is kept so existing callers keep working.
        """
        Path(self.filepath).parent.mkdir(parents=True, exist_ok=True)

    def execute(self, context):
        """Run the hook and write each returned page as one JSON line.

        Args:
            context: Airflow task context; not used by this operator.
        """
        hook = TwitterHook(
            query=self.query,
            conn_id=self.conn_id,
            start_time=self.start_time,
            end_time=self.end_time,
        )
        self.create_parant_folder()
        # ensure_ascii=False writes raw non-ASCII characters, so the file
        # encoding is pinned to UTF-8 instead of the platform default, which
        # could otherwise raise UnicodeEncodeError or produce a file in a
        # locale-specific encoding.
        with open(self.filepath, "w", encoding="utf-8") as outputfile:
            for pg in hook.run():
                json.dump(pg, outputfile, ensure_ascii=False)
                outputfile.write("\n")
if __name__ == '__main__':
    # Manual run: build a throwaway DAG holding a single TwitterOperator task
    # and execute that task once for the current date/time.
    with DAG(dag_id='TwitterTest', start_date=datetime.now()) as dag:
        # Templated output path; the {{ ds }} / {{ ds_nodash }} placeholders
        # are left for Airflow to render since "filepath" is a template field.
        output_path = join(
            "/mnt/f/alura-airflow/docker-airflow/datapipeline/datalake",
            "dados_Twiter",
            "execution_date={{ ds }}",
            "AluraOnline_{{ ds_nodash }}.json",
        )
        twitter_task = TwitterOperator(
            query="AluraOnline",
            filepath=output_path,
            task_id="test_run",
        )
        task_instance = TaskInstance(
            task=twitter_task,
            execution_date=datetime.now(),
        )
        task_instance.run()