Solucionado (ver solução)
Solucionado
(ver solução)
3
respostas

Duvidas sobre consumo de filas com javascript

Atualmente tenho um código que le de uma fila do rabbitmq, enriquece os dados e depois envia pra outra fila. Esse processo funciona normalmente, porém preciso resolver algumas coisas:

1 - quando inicio a leitura da fila o channel fica aberto mesmo após terminar de consumir a fila, eu preciso fechar o canal e a conexão assim que a fila esteja vazia. Atualmente temos uma configuração de time out pra fechar a conexão com o rabbitmq, porém dependendo da quantidade de mensagens que esta na fila, o timeout vem altes de terminar o consumo.

2 - eu preciso de alguma forma controlar a quantidade de mensagens que eu leio por segundo na fila, ou seja, a cada x segundos ler x mensagems e esperar.

Algumas dessas necessidades é possivel?

versão do node = v16.18.1 versão do npm = 8.19.2

3 respostas

Oii, tudo bem?

Levando em conta as informações que você deu, te darei dicas: 1 - Para fechar o canal e a conexão assim que a fila estiver vazia, você pode usar o método channel.checkQueue(queueName). Este método retorna um objeto com a propriedade messageCount que indica o número de mensagens na fila. Quando esse número for 0, você pode fechar o canal e a conexão.

2 - Para controlar a quantidade de mensagens que você lê por segundo na fila, você pode usar o método channel.prefetch(count), onde count é o número de mensagens que você deseja consumir por vez. Para ler uma mensagem a cada x segundos, você pode usar a função setTimeout do JavaScript.

Por favor, lembre que essas são apenas sugestões e podem não funcionar perfeitamente para o seu caso específico.

Um abraço.

Bom dia Lorena, como vai?

Tem algum exemplo de código?

solução!

Sim! Vou dar exemplos de base, mas você precisa fazer modificações de acordo com sua necessidade:

  1. Fechar o canal e a conexão quando a fila estiver vazia:
const amqp = require('amqplib');

async function consumeQueue() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queueName = 'sua_fila';

    await channel.assertQueue(queueName);

    const consumeMessages = async () => {
        const { messageCount } = await channel.checkQueue(queueName);
        if (messageCount === 0) {
            console.log('Fila está vazia, fechando canal e conexão...');
            await channel.close();
            await connection.close();
            return;
        }

        // Processar as mensagens aqui
        console.log('Processando mensagens...');
        // Após processar as mensagens, chamar novamente a função
        consumeMessages();
    };

    consumeMessages();
}

consumeQueue().catch(console.error);
  1. Controlar a quantidade de mensagens lidas por segundo:
const amqp = require('amqplib');

async function consumeQueueWithRateLimit() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queueName = 'sua_fila';

    await channel.assertQueue(queueName);

    const prefetchCount = 1; // Defina o número de mensagens que deseja consumir por vez
    channel.prefetch(prefetchCount);

    const consumeMessages = async () => {
        // Consumir as mensagens da fila
        await channel.consume(queueName, async (message) => {
            if (message !== null) {
                // Processar a mensagem aqui
                console.log('Processando mensagem:', message.content.toString());
                // Simular processamento com setTimeout
                await new Promise(resolve => setTimeout(resolve, 1000)); // Espera 1 segundo

                // Acknowledge para remover a mensagem da fila
                channel.ack(message);
            }
        });
    };

    consumeMessages().catch(console.error);
}

consumeQueueWithRateLimit().catch(console.error);

Esse exemplos devem te ajudar a implementar as funcionalidades. Lembra de configurar corretamente o tempo de espera e o número de mensagens a serem consumidas por vez. ;)

Um abraço.