Solucionado (ver solução)
Solucionado
(ver solução)
2
respostas

[Bug] Erro One or more source topics were missing during rebalance

Boa tarde,

Estou com o seuinte erro quando executo a aplicação pix-aggregator:

org.apache.kafka.streams.errors.MissingSourceTopicException: One or more source topics were missing during rebalance
    at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:58) ~[kafka-streams-3.3.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:329) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:478) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGr

Minhas classes de configuração:

package com.alura.pix.config;

import com.alura.pix.dto.PixDTO;
import com.alura.pix.serdes.PixSerdes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;


import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;


@Configuration
@EnableKafkaStreams
public class ProducerKafkaConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        Map props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG , "kafka-streams-demo-6");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, PixSerdes.class);
        return new KafkaStreamsConfiguration(props);
    }


    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, PixDTO> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, PixDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(
                JsonDeserializer.TRUSTED_PACKAGES,
                "*");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "50000000");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Por favor, alguma idéia do que pode estar acontecendo?

Obrigado!

2 respostas

Olá,

Encontrei o causador do problema. Ao descargar o repositorio e seguir as aulas o código da classe PixConsumer da aula 3 parte 1 está assim:

KTable<String, Double> aggregateStream = streamsBuilder
                .stream("pix-topic-2", Consumed.with(Serdes.String(), PixSerdes.serdes()))
                .peek((key, value) -> System.out.println("Pix recebido: " + value.getChaveOrigem()))
                .filter((key, value) -> value.getValor() != null)
                .groupBy((key, value) -> value.getChaveOrigem())
                .aggregate(
                        () -> 0.0,
                        (key, value, aggregate) -> (aggregate + value.getValor()),
                        Materialized.with(Serdes.String(), Serdes.Double())
                );
aggregateStream.toStream().print(Printed.toSysOut());

Isso estava causando o erro. Não sei o porque porém já estou desbloqueado para continuar com o curso.

Sugestão para Alura, o curso até o momento tem um conteúdo de qualidade porém neste curso em específico disponibilizar o código pronto de todas as aulas na minha opinião é muito ruim. Por que? Porque o melhor é que o aluno acompanhe o instrutor e faça junto conforme os módulos de cada aula. Ao final de cada módulo disponibilizar o download do código até o aquele momento. Evitamos código de outra aula que ainda não foi explicado e não se sabe os efeitos colaterais si modificado.

solução!

Olá, Lauro.

Tudo bem?

Muito obrigado por voltar aqui e compartilhar a solução do problema. Parabéns por identificar e resolver o problema no código da classe PixConsumer! Sua habilidade em debugar e corrigir questões é fundamental para o aprendizado e o progresso no curso.

A sua sugestão ´muito legal e válida. Disponibilizar o código apenas ao final de cada módulo, permitindo que os alunos acompanhem o instrutor passo a passo, é uma abordagem excelente. Isso não só facilita a compreensão do conteúdo, mas também evita confusões com códigos de aulas subsequentes que ainda não foram abordadas. Sua observação é valiosa e pode contribuir para a melhoria contínua do curso.

Agradeço por compartilhar sua experiência e sugestão. Continue aproveitando o curso e, caso surjam mais desafios, manda aqui no fórum. Valeu.

Atenciosamente, Renan Lima.