4
respostas

Implementar a rotina de carga de usuários para o cadunico usando o servidor de arquivo minio.

Olá Bom dia!! Estou Implementando una rotina de carga de usuários para o cadunico usando o servidor de arquivo minio, mas ao processar os arquivos que estão no min.IO para um diretório no próprio min.IO depois de algum tempo apresenta problemas de conexão e a rotina não é finalizada!!

4 respostas
package br.gov.am.processamento;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import br.gov.am.processamento.processadores.*;
import io.minio.GetObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;

public class FileReading {

    private static final Logger logger = LoggerFactory.getLogger(FileReading.class);
    private static String BUCKET_NAME = "carga-redegoverno-municipios";
    private static final String MINIO_URL = "http://10.20.0.107:9000";
    private static final String MINIO_ACCESS_KEY = "admin";
    private static final String MINIO_SECRET_KEY = "prodam123";
    
    private static MinioClient minioClient; // MinioClient como variável única e compartilhada

    public static void main(String[] args) {
        if (args.length > 0) {
            BUCKET_NAME = args[0]; // Nome do bucket é passado como argumento
        }

        logger.info("--------------------------------------------------------------------");
        logger.info("Processando arquivos no bucket MinIO: " + BUCKET_NAME + "... Por favor, aguarde.");

        try {
            // Inicializar o cliente MinIO (uma única vez)
            minioClient = MinioClient.builder()
                    .endpoint(MINIO_URL)
                    .credentials(MINIO_ACCESS_KEY, MINIO_SECRET_KEY)
                    .build();

            // Listar os objetos no bucket
            Iterable<Result<Item>> items = minioClient.listObjects(
                    ListObjectsArgs.builder().bucket(BUCKET_NAME).build());

            // Processar cada arquivo no bucket
            for (Result<Item> result : items) {
                Item item = result.get();
                String fileName = item.objectName();
                logger.info("Processando arquivo: " + fileName);

                processarArquivo(fileName);
            }

        } catch (Exception e) {
            logger.error("Erro ao processar arquivos no MinIO", e);
        }
    }

    private static void processarArquivo(String fileName) {
        int lineNumber = 0;

        try (BufferedReader br = new BufferedReader(new InputStreamReader(
                minioClient.getObject(GetObjectArgs.builder()
                        .bucket(BUCKET_NAME)
                        .object(fileName)
                        .build()), StandardCharsets.ISO_8859_1))) {

            String line;
            while ((line = br.readLine()) != null) {
                lineNumber++;
                String tipoRegistro = line.substring(37, 39);
                TipoRegistroEnum tipoDoRegistro = TipoRegistroEnum.find(tipoRegistro);
                TipoRegistro processo = null;

                switch (tipoDoRegistro) {
                    case REG00:
                        processo = new ProcessoRegistro00();
                        break;
                    case REG01:
                        processo = new ProcessoRegistro01("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG02:
                        processo = new ProcessoRegistro02("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG03:
                        processo = new ProcessoRegistro03("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG04:
                        processo = new ProcessoRegistro04("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG05:
                        processo = new ProcessoRegistro05("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG06:
                        processo = new ProcessoRegistro06("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG07:
                        processo = new ProcessoRegistro07("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG08:
                        processo = new ProcessoRegistro08("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG09:
                        processo = new ProcessoRegistro09("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG11FS1:
                        processo = new ProcessoRegistro11FS1("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG15FS1:
                        processo = new ProcessoRegistro15FS1("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG12FS2:
                        processo = new ProcessoRegistro12FS2("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG13PENDOCORF:
                        processo = new ProcessoRegistro13PENDOCORF("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG14PENDOCOP:
                        processo = new ProcessoRegistro14PENDOCP("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG16:
                        processo = new ProcessoRegistro16("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG17:
                        processo = new ProcessoRegistro17("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG18:
                        processo = new ProcessoRegistro18("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG19:
                        processo = new ProcessoRegistro19("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG20:
                        processo = new ProcessoRegistro20("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG21:
                        processo = new ProcessoRegistro21("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG98:
                        processo = new ProcessoRegistro98("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    case REG99:
                        processo = new ProcessoRegistro99("minio://" + BUCKET_NAME, minioClient, BUCKET_NAME);
                        break;
                    default:
                        logger.warn("Tipo de registro desconhecido: " + tipoRegistro);
                        break;
                }

                if (processo != null) {
                    processo.processaLinha(line);
                }
            }

        } catch (Exception ex) {
            logger.error("Erro na linha: " + lineNumber + ", no arquivo " + fileName, ex);
        }
    }
}
O Registro01:

package br.gov.am.processamento.processadores;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.beanio.StreamFactory;
import org.beanio.Unmarshaller;
import org.beanio.builder.StreamBuilder;

import br.gov.am.processamento.model.Registro01;
import io.minio.GetObjectArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;

public class ProcessoRegistro01 extends ProcessoBasico implements TipoRegistro {
    private final MinioClient minioClient;  
    private final String bucketName;

    public ProcessoRegistro01(String filePath, MinioClient minioClient, String bucketName) {
        super(filePath);
        this.minioClient = minioClient;
        this.bucketName = bucketName;
    }

    StreamFactory factory = StreamFactory.newInstance();

    @Override
    public void processaLinha(String linha) {
        factory.define(new StreamBuilder("registro01").format("fixedlength").addRecord(Registro01.class));
        Unmarshaller unmarshaller = factory.createUnmarshaller("registro01");
        Registro01 registro01 = (Registro01) unmarshaller.unmarshal(linha);

        // Nome do arquivo no bucket MinIO
        String fileKey = "csv/registro01.csv";

        // Processar o arquivo no MinIO
        processaArquivo(fileKey, registro01.toString());
    }

    public void processaArquivo(String fileKey, String linha) {
        try {
            // Ler o conteúdo atual do arquivo no MinIO
            StringBuilder existingContent = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    minioClient.getObject(GetObjectArgs.builder()
                            .bucket(bucketName)
                            .object(fileKey)
                            .build()), StandardCharsets.ISO_8859_1))) {

                String existingLine;
                while ((existingLine = reader.readLine()) != null) {
                    existingContent.append(existingLine).append("\n");
                }
            } catch (Exception e) {
                // Se o arquivo não existir, apenas adiciona a nova linha
                System.out.println("Arquivo não encontrado no MinIO, será criado: " + fileKey);
            }

            // Adicionar a nova linha ao conteúdo existente
            existingContent.append(linha).append("\n");

            // Upload do arquivo atualizado para o MinIO
            try (ByteArrayInputStream inputStream = new ByteArrayInputStream(existingContent.toString().getBytes(StandardCharsets.ISO_8859_1))) {
                minioClient.putObject(
                        PutObjectArgs.builder()
                                .bucket(bucketName)
                                .object(fileKey)
                                .stream(inputStream, existingContent.length(), -1)
                                .build()
                );
            }

            System.out.println("Linha processada e enviada para o arquivo: " + fileKey);
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Erro ao processar arquivo no MinIO: " + e.getMessage());
        }
    }
}

Oi, Jhony! Como vai?

O problema pode estar relacionado à conexão persistente com o MinIO.

O MinIO pode estar fechando a conexão por timeout ou limite de conexões simultâneas. Para evitar esse problema, tente usar um único cliente MinioClient compartilhado e garanta o fechamento correto dos streams após a leitura dos arquivos.

Ajuste seu código assim:

import io.minio.MinioClient;
import io.minio.errors.MinioException;
import java.io.IOException;

public class MinioConnection {
    private static MinioClient minioClient;

    public static MinioClient getClient() {
        if (minioClient == null) {
            minioClient = MinioClient.builder()
                .endpoint("http://10.20.0.107:9000")
                .credentials("admin", "prodam123")
                .build();
        }
        return minioClient;
    }

    public static void closeClient() {
        try {
            if (minioClient != null) {
                minioClient = null; // Destrói a instância ao final
            }
        } catch (Exception e) {
            System.err.println("Erro ao fechar conexão: " + e.getMessage());
        }
    }
}

Agora, no seu código principal, utilize MinioConnection.getClient() para garantir uma única conexão ativa.

Além disso, verifique se há exceções de timeout ou conexões ativas não encerradas, pois isso pode impedir a finalização da rotina corretamente. Se o problema persistir, verifique o log do MinIO para mensagens como "Too many open connections" ou "Idle connection timeout".

Espero ter ajudado!