Solucionado (ver solução)
Solucionado
(ver solução)
2
respostas

[Bug] Bug: Kafka Rebalance como resolver

org.apache.kafka.streams.errors.MissingSourceTopicException: One or more source topics were missing during rebalance

Alguém pode m eajudar?

@Configuration
@EnableKafkaStreams
public class ConsumerKafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamConfig() {
        Map props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG , "astin04.poc-kafka-streams-demo-6");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, PixSerdes.class);
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public ProducerFactory<String, PixDTO> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, PixDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    @Bean
    public ConsumerFactory<String, PixDTO> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put( JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000000);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PixDTO>
        kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, PixDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
@Service
public class PixAggregator {
    @Autowired
    public void aggregator(StreamsBuilder streamsBuilder) {

        KStream<String, PixDTO> messageStream = streamsBuilder
                .stream("astin04.poc-pix-topic", Consumed.with(Serdes.String(), PixSerdes.serdes()))
                .peek((key, value) -> System.out.println("Pix recebido (Streams) (filter): " + value.getChaveOrigem()))
                .filter((key, value) -> value.getValor() > 1000)
                .peek((key, value) -> System.out.println("Pix: " + key + " será verificado para possível frause"));

        messageStream.print(Printed.toSysOut());
        messageStream.to("astin04.poc-pix-verificacao-fraude", Produced.with(Serdes.String(), PixSerdes.serdes()));

        KTable<String, Double> aggregateStream = streamsBuilder
                .stream("astin04.poc-pix-topic-2", Consumed.with(Serdes.String(), PixSerdes.serdes()))
                .peek((key, value) -> System.out.println("Pix recebido (Streams) (Aggregatos): " + value.getChaveOrigem()))
                .filter((key, value) -> value.getValor() != null)
                .groupBy((key, value) -> value.getChaveOrigem())
                .aggregate(
                        () -> 0.0,
                        (key, value, aggregate) -> (aggregate + value.getValor()),
                        Materialized.with(Serdes.String(), Serdes.Double())
                );


        aggregateStream.toStream().print(Printed.toSysOut());
    }
}
2 respostas
solução!

Olá!

Esse erro geralmente ocorre quando o Kafka Streams não encontra todos os tópicos de origem configurados durante o processo de rebalanceamento. Aqui estão algumas sugestões para resolver esse problema:

  1. Garanta que os tópicos de origem existem: Certifique-se de que os tópicos "astin04.poc-pix-topic" e "astin04.poc-pix-topic-2" existam no seu cluster Kafka. Você pode verificar isso usando ferramentas como o console do Kafka ou usando comandos da linha de comando.

  2. Ajuste o tempo de espera do rebalanceamento: O tempo de espera pode estar muito curto para que todos os consumidores obtenham as informações do tópico durante o rebalanceamento. Tente aumentar o valor de max.poll.interval.ms no ConsumerConfig do seu ConsumerFactory para um valor maior, por exemplo, 20000000 pode ser muito pequeno.

  3. Configurações consistentes entre produtores e consumidores: Certifique-se de que as configurações de serialização e desserialização (Serdes) são consistentes entre os produtores e consumidores para os tópicos relevantes.

  4. Debugging durante o rebalanceamento: Adicione logs no seu código para rastrear o comportamento durante o rebalanceamento. Isso pode ajudar a identificar onde o processo está falhando.

  5. Verifique os logs do Kafka: Verifique os logs do Kafka para obter informações adicionais sobre o motivo pelo qual o tópico está faltando durante o rebalanceamento.

  6. Certifique-se de que o Kafka está em execução: Às vezes, problemas simples, como o Kafka não estar em execução ou não ser acessível, podem levar a esse tipo de problema.

Além disso, sugiro usar tipos genéricos ao criar o KafkaStreamsConfiguration para evitar warnings relacionados a tipos. Substitua Map por Map<String, Object>:

public KafkaStreamsConfiguration kafkaStreamConfig() {
    Map<String, Object> props = new HashMap<>();
    // ... outras configurações
    return new KafkaStreamsConfiguration(props);
}

Essas são algumas sugestões gerais. Se você puder fornecer mais informações sobre os logs de erro específicos ou sobre o ambiente em que o código está sendo executado, posso oferecer orientações mais precisas.

Obrigado pela ajuda. Deu certo!