2
respostas

[Dúvida] Trabalhando com go routine channel's e worker's, não está mudando nada.

Estou trabalhando num processo de manipulação de dados (PostgreSQL) de dados utilizando Go, estou tentando trabalhar com worker's e channel's, creio que estou fazendo algo de errado, mas não está trazendo diferença no tempo de execução.

O ambiente que estou trabalhando possui mais de 9 milhões de registros e na tentativa de melhorar o processo fiz uma segmentação para consultar os dados separados por ano. Abaixo trago o código que estou trabalhando.

Conexão com o banco de dados:

// Obtém conexão com o banco de dados da Database
func ConnectDatabase() (*gorm.DB, *sql.DB, error) {
    // Carregando as configurações do banco de dados
    conf, err := configs.LoadConfig(".")
    if err != nil {
        panic(err)
    }
    // Abrindo a conexão com o banco de dados via GORM
    dsn := "host=" + conf.DBHostDatabase + " user=" + conf.DBUserDatabase + " password=" + conf.DBPassDatabase + " dbname=" + conf.DBNameDatabase + " port=" + conf.DBPortDatabase + " sslmode=disable" + " client_encoding=UTF8"
    db, errOpenDb := gorm.Open(
        postgres.Open(dsn),
    )

    if errOpenDb != nil {
        panic(errOpenDb)
    }

    sqlDB, errSqlDB := db.DB()

    if errSqlDB != nil {
        panic(errSqlDB)
    }

    sqlDB.SetMaxIdleConns(30)
    sqlDB.SetMaxOpenConns(60)
    sqlDB.SetConnMaxLifetime(time.Hour)

    return db, sqlDB, nil
}

Método para consultar os dados:

func (t *TributarioDB) AtualizarDividaPalmas() (bool, error) {
    qtdWorkers := 50
    defer t.sqlDB.Close()
    duamCh := make(chan dto.DUAMAtualizaDivida)
    wg := sync.WaitGroup{}
    anoAtual := time.Now().Year()
    for i := anoAtual; i >= 2023; i-- {
        wg.Add(1)
        go func(ano int) {
            defer wg.Done()
            var duams []dto.DUAMAtualizaDivida
            anoString := fmt.Sprintf("%d", ano)
            query := fmt.Sprintf(query.SQLGetDUAMParcelasVencidasPorAno, anoString)
            err := t.DB.Raw(query).Scan(&duams).Error
            if err != nil {
                log.Println("Ocorreu um erro ao executar a consulta das DuasVencidas", err)
                return
            }
            for _, duam := range duams {
                duamCh <- duam
            }
        }(i)
    }
    for i := 0; i < qtdWorkers; i++ {
        go t.AtualizarDividaPalmasParallel(i, duamCh)
    }
    wg.Wait()
    close(duamCh)

    return true, nil
}

Função a ser executada via go routine.

func (t *TributarioDB) AtualizarDividaPalmasParallel(workerID int, duamCh chan dto.DUAMAtualizaDivida) {
    diaAtual := time.Now().Format("2006-01-02")
    resultQuery := ""
    for duam := range duamCh {
        query := fmt.Sprintf(
            query.SQLAtualizaDividaPalmas, duam.DUAM, diaAtual, duam.PARCELA)
        err := t.DB.Raw(query).Scan(&resultQuery).Error
        if err != nil {
            log.Println("Erro ao atualizar DUAM:", err)
            panic(err)
        }
    }
    ...
}

A grande questão é que o tempo de execução é quase sempre o mesmo, girando em torno de 29 minutos, mesmo que eu aumente o número de worker's e máximo de conexões abertas.

Gostaria portanto de saber o que estou esquecendo ou fazendo de errado.

2 respostas

Olá Afrânio! Tudo joia?

Seguem algumas sugestões que podem ajudar a melhorar o desempenho do seu código:

  1. Verifique o Gargalo do Banco de Dados: Mesmo que você tenha múltiplas goroutines, o banco de dados pode ser o gargalo. Certifique-se de que o banco de dados pode lidar com o número de conexões simultâneas que você está tentando abrir. Verifique se há índices adequados nas colunas que você está consultando e atualizando.

  2. Limite de Conexões: Você mencionou que ajustou o número de conexões, mas é importante garantir que o banco de dados e o driver Go estão configurados para suportar esse número. Verifique as configurações do PostgreSQL para max_connections e ajuste conforme necessário.

  3. Batch Processing: Em vez de enviar cada DUAM individualmente para atualização, considere processar em lotes. Isso pode reduzir significativamente o tempo de execução, pois você estará fazendo menos chamadas ao banco de dados.

Por exemplo, se você estiver atualizando muitos registros individualmente, considere algo como:

// Exemplo de processamento em lote
var duamsBatch []dto.DUAMAtualizaDivida
batchSize := 100 // Ajuste conforme necessário
for duam := range duamCh {
    duamsBatch = append(duamsBatch, duam)
    if len(duamsBatch) >= batchSize {
        // Processa o lote
        // Atualize o banco de dados com o lote
        duamsBatch = duamsBatch[:0] // Limpa o lote
    }
}
// Não esqueça de processar o restante
if len(duamsBatch) > 0 {
    // Atualize o banco de dados com o restante
}

Espero que essas dicas ajudem a melhorar o desempenho do seu processo.

Bons estudos!

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓.

No caso do processo em lote, eu tenho como controlar numa transaction a exclusão de registro existente para inserir o novo registro? Como é um script de extração de dados de um sistema legado tenho que garantir que o registro do dia anterior só seja considerado como "migrado" se o registro do dia anterior for apagado e o novo inserido.