Estou trabalhando num processo de manipulação de dados (PostgreSQL) de dados utilizando Go, estou tentando trabalhar com worker's e channel's, creio que estou fazendo algo de errado, mas não está trazendo diferença no tempo de execução.
O ambiente que estou trabalhando possui mais de 9 milhões de registros e na tentativa de melhorar o processo fiz uma segmentação para consultar os dados separados por ano. Abaixo trago o código que estou trabalhando.
Conexão com o banco de dados:
// Obtém conexão com o banco de dados da Database
func ConnectDatabase() (*gorm.DB, *sql.DB, error) {
// Carregando as configurações do banco de dados
conf, err := configs.LoadConfig(".")
if err != nil {
panic(err)
}
// Abrindo a conexão com o banco de dados via GORM
dsn := "host=" + conf.DBHostDatabase + " user=" + conf.DBUserDatabase + " password=" + conf.DBPassDatabase + " dbname=" + conf.DBNameDatabase + " port=" + conf.DBPortDatabase + " sslmode=disable" + " client_encoding=UTF8"
db, errOpenDb := gorm.Open(
postgres.Open(dsn),
)
if errOpenDb != nil {
panic(errOpenDb)
}
sqlDB, errSqlDB := db.DB()
if errSqlDB != nil {
panic(errSqlDB)
}
sqlDB.SetMaxIdleConns(30)
sqlDB.SetMaxOpenConns(60)
sqlDB.SetConnMaxLifetime(time.Hour)
return db, sqlDB, nil
}
Método para consultar os dados:
func (t *TributarioDB) AtualizarDividaPalmas() (bool, error) {
qtdWorkers := 50
defer t.sqlDB.Close()
duamCh := make(chan dto.DUAMAtualizaDivida)
wg := sync.WaitGroup{}
anoAtual := time.Now().Year()
for i := anoAtual; i >= 2023; i-- {
wg.Add(1)
go func(ano int) {
defer wg.Done()
var duams []dto.DUAMAtualizaDivida
anoString := fmt.Sprintf("%d", ano)
query := fmt.Sprintf(query.SQLGetDUAMParcelasVencidasPorAno, anoString)
err := t.DB.Raw(query).Scan(&duams).Error
if err != nil {
log.Println("Ocorreu um erro ao executar a consulta das DuasVencidas", err)
return
}
for _, duam := range duams {
duamCh <- duam
}
}(i)
}
for i := 0; i < qtdWorkers; i++ {
go t.AtualizarDividaPalmasParallel(i, duamCh)
}
wg.Wait()
close(duamCh)
return true, nil
}
Função a ser executada via go routine.
func (t *TributarioDB) AtualizarDividaPalmasParallel(workerID int, duamCh chan dto.DUAMAtualizaDivida) {
diaAtual := time.Now().Format("2006-01-02")
resultQuery := ""
for duam := range duamCh {
query := fmt.Sprintf(
query.SQLAtualizaDividaPalmas, duam.DUAM, diaAtual, duam.PARCELA)
err := t.DB.Raw(query).Scan(&resultQuery).Error
if err != nil {
log.Println("Erro ao atualizar DUAM:", err)
panic(err)
}
}
...
}
A grande questão é que o tempo de execução é quase sempre o mesmo, girando em torno de 29 minutos, mesmo que eu aumente o número de worker's e máximo de conexões abertas.
Gostaria portanto de saber o que estou esquecendo ou fazendo de errado.