Solucionado (ver solução)
Solucionado
(ver solução)
4
respostas

Stacktrace como header customizado nas mensagens enviadas para DLQ

Existe alguma forma de ao enviar uma mensagem para DLQ ( após feitos os retries conforme a configuração recomendada no curso ) adicionar automaticamente o stacktrace do erro nos headers da mensagem ?

Cheguei a ver algo com o RepublishMessageRecoverer ( que olhando a implementação até define uns headers com o stack trace), mas não funcionou.

Cheguei até a não configurar retries no application.properties e usar um RabbitListenerErrorHandler para adicionar headers customizados e publicar a mensagem diretamente na fila de erros após x tentativas ( contabilizadas através de headers customizados ), só que isso parece meio torto também.

Sem o stack trace nos headers da mensagem fica bem difícil de identificar o motivo dos erros em um cenário onde temos milhares de erros.

4 respostas

Olá Rafael! Tudo bem?

Uma abordagem que pode ajudar é utilizar o RepublishMessageRecoverer em conjunto com um RetryTemplate e um RetryPolicy customizado. Dessa forma, você pode configurar o RepublishMessageRecoverer para adicionar o stacktrace nos headers antes de republicar a mensagem na DLQ.

Aqui está um exemplo prático de como você pode fazer isso:

import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer.HeaderNames;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.retry.RetryTemplate;
import org.springframework.amqp.rabbit.retry.SimpleRetryPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                               RabbitTemplate rabbitTemplate) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAdviceChain(RetryInterceptorBuilder.stateless()
                .retryPolicy(simpleRetryPolicy())
                .recoverer(republishMessageRecoverer(rabbitTemplate))
                .build());
        return factory;
    }

    @Bean
    public SimpleRetryPolicy simpleRetryPolicy() {
        return new SimpleRetryPolicy(3); // Número de tentativas de retry
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "exchange.dlq", "routing.key.dlq");
        recoverer.setHeaderNames(new HeaderNames("x-exception-stacktrace", "x-exception-message", "x-original-exchange", "x-original-routingKey"));
        return recoverer;
    }
}

Neste exemplo, o RepublishMessageRecoverer é configurado para adicionar o stacktrace do erro no header x-exception-stacktrace antes de republicar a mensagem na DLQ. Você pode ajustar os nomes dos headers conforme necessário.

Além disso, certifique-se de que sua configuração de filas e exchanges esteja correta para que as mensagens sejam encaminhadas para a DLQ conforme esperado, sem falar de ajustes com base na sua estrutura de desenvolvimento.

Espero ter ajudado e bons estudos!

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓.

Então,

O problema dessa abordagem é que fica 1 única DLQ e DLE fixas para todas as filas

No caso eu gostaria de 1 DLQ para cada fila padrão

Exemplo

fila padrão: nome: stock.event.auth.user.created exchange: stock (topic) routing key: event.auth.user.created

fila de erro: nome: stock.event.auth.user.created.error exchange: stock.dlq (direct)routing key: event.auth.user.created.error

fila padrão: nome: notifications.event.auth.user.# exchange: notifications (topic) routing key: event.auth.user.#

fila de erro: nome: notifications.event.auth.user.#.error exchange: notifications.dlq (direct)routing key: event.auth.user.#.error

Motivos dessa estrutura de filas:

  • facilitar a análise e o reprocessamento por fila de erro
  • aplicar politicas para descarte de mensagens por fila de erro (em caso de muitos erros em que a quantidade de mensagens possa comprometer o cluster)
  • isolamento dos erros por serviço ( ou módulo )

Se reparar as filas estão com a seguinte estrutura

nome da fila: "nome do serviço consumidor".event."nome do serviço produtor"."nome do agregado"."fato ocorrido"

Isso facilita manter um padrão na nomenclatura

Também tenho uma exchange intermediária ( events ) que roteia as mensagens entre as exchanges de cada serviço ( ou cada módulo se tratando de um monolito modular ).

Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Consegui dessa forma: application.properties

solução!

Consegui dessa forma:

//application.properties spring.rabbitmq.host=${BROKER_HOST:localhost} spring.rabbitmq.port=5672 spring.rabbitmq.username=${BROKER_USER:guest} spring.rabbitmq.password=${BROKER_PASSWORD:guest} spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=1s spring.rabbitmq.listener.simple.retry.multiplier=2 spring.rabbitmq.listener.simple.retry.max-attempts=3 spring.rabbitmq.listener.simple.default-requeue-rejected=false

//configuração do rabbit



import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessagingContext {
    public static final String AMQP_ERROR_HANDLER = "amqpErrorHandler";

    @Bean("eventsExchange")
    public TopicExchange exchange() {
        return new TopicExchange("events");
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public MessagingConfig identityMessagingConfig(AmqpAdmin admin, HandlersConfig config) {
        var messagingConfig = new MessagingConfig(admin);
        for (var module : config.config().modules()) {
            messagingConfig.withModule(module);
        }
        return messagingConfig;
    }

    @Bean(AMQP_ERROR_HANDLER)
    public RabbitListenerErrorHandler handler() {
        return (amqpMessage, message, e) -> {
            var errorIndex = amqpMessage.getMessageProperties().getHeaders().keySet().stream()
            .filter(h-> h.startsWith("x-exception[")).count();
            amqpMessage.getMessageProperties().getHeaders().put("x-exception["+errorIndex+"]", e.getStackTrace());
            amqpMessage.getMessageProperties().getHeaders().put("x-exception-message["+errorIndex+"]", e.getCause().getMessage());
            return amqpMessage;
        };
    }

    @Bean
    MessageRecoverer recoverer(AmqpTemplate amqpTemplate) {
        return (message, error) -> {
            var exchange = message.getMessageProperties().getReceivedExchange() + ".error";
            var queue = message.getMessageProperties().getReceivedRoutingKey() + ".error";
            amqpTemplate.send(exchange, queue, message);
        };
    }
}

Dai usou os retries da configuração, mas forneci uma implementação customizada da interface do MessageRecoverer. Assim consegui jogar a mensagem pra miha fila de erro.

Valeu pela ajuda Armano!