Pessoal, inclui o codigo no arquivo twitter_operator.py import sys sys.path.append("/airflow/plugins")
Mas continua dando erro. Notei que na aula ele criou a pasta operators dentro da pasta hooks e quando ele roda no terminal a pasta operators parece estar dentro da pasta plugins em paralelo a pasta hooks.
import sys
sys.path.append("/airflow/plugins")
from airflow.models import BaseOperator, DAG, TaskInstance
from airflow.utils.decorators import apply_defaults
from hooks.twitter_hook import TwitterHook
import json
from datetime import datetime
from pathlib import Path
class TwitterOperator(BaseOperator):
template_fields = [
"query",
"file_path",
"start_time",
"end_time"
]
@apply_defaults
def __init__(
self,
query,
file_path,
conn_id=None,
start_time=None,
end_time=None,
*args, **kwargs
):
super().__init__(*args, **kwargs)
self.query = query
self.file_path = file_path
self.conn_id = conn_id
self.start_time = start_time
self.end_time = end_time
def create_parent_folder(self):
Path(Path(self.file_path).parent).mkdir(parents=True, exist_ok=True)
def execute(self, context):
hook = TwitterHook(
query=self.query,
conn_id=self.conn_id,
start_time=self.start_time,
end_time=self.end_time
)
self.create_parent_folder()
with open(self.file_path, "w") as output_file:
for pg in hook.run():
json.dump(pg, output_file, ensure_ascii=False)
if __name__ == "__main__":
with DAG(dag_id="TwitterTest", start_date=datetime.now()) as dag:
to = TwitterOperator(
query="AluraOnLine",
file_path="AluraOnLine_{{ ds_nodash }}",
task_id="test_run"
)
ti = TaskInstance(task=to, execution_date=datetime.now())
ti.run()
Minha estrutura de pastas está representada na figura abaixo.