1
resposta

[Dúvida] problema

cursor.execute(statement, parameters) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: operator [SQL: ALTER TABLE task_instance ADD COLUMN operator VARCHAR(1000)] (Background on this error at: http://sqlalche.me/e/14/e3q8)

Erro ao executar:

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.macros import ds_add import pendulum from os.path import join import pandas as pd

with DAG( "dados_climaticos", start_date=pendulum.datetime(2022, 8, 22, tz="UTC"), schedule_interval='0 0 * * 1', # executar toda segunda feira ) as dag:

tarefa_1 = BashOperator(
    task_id = 'cria_pasta',
    bash_command = 'mkdir -p "/home/macaubas/Documentos/APACHE_AIRFLOW/primeiro_pipeline/DAG/semana={{data_interval_end.strftime("%Y-%m-%d")}}"'
)

def extrai_dados(data_interval_end):
    city = 'Boston'
    key = 'SDZW8HETJSHP5QKZ8TN4D4AGT'

    URL = join('https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/',
        f'{city}/{data_interval_end}/{ds_add(data_interval_end, 7)}?unitGroup=metric&include=days&key={key}&contentType=csv')

    dados = pd.read_csv(URL)

    file_path = f'/home/macaubas/Documentos/APACHE_AIRFLOW/primeiro_pipeline/DAG/semana={data_interval_end}/'

    dados.to_csv(file_path + 'dados_brutos.csv')
    dados[['datetime','tempmin', 'temp', 'tempmax']].to_csv(file_path + 'temperaturas.csv')
    dados[['datetime', 'description', 'icon']].to_csv(file_path + 'condicoes.csv')

tarefa_2 = PythonOperator(
    task_id = 'extrai_dados',
    python_callable = extrai_dados,
    op_kwargs = {'data_interval_end': '{{data_interval_end.strftime("%Y-%m-%d")}}'}
)

tarefa_1 >> tarefa_2
1 resposta

Oi

Isso geralmente acontece quando você tá tentando adicionar uma coluna que já existe, e pelo erro parece ser no trecho ALTER TABLE task_instance ADD COLUMN operator VARCHAR(1000).

Pode ser que a coluna "operator" já exista ou tá sendo adicionada de novo de alguma forma. Dá uma olhada se essa coluna já não tá lá ou se você não tá tentando adicioná-la mais de uma vez.

Fora isso, o DAG em si parece estar orquestrando um pipeline de dados climáticos usando o Visual Crossing Web Services. Bacana demais! A tarefa_1 tá criando uma pasta com base nas datas, e a tarefa_2 tá pegando dados climáticos de Boston e salvando em arquivos CSV. Muito legal!

Dá uma revisada nesse trecho e veja se resolve esse problema. Boa sorte!