org.apache.kafka.streams.errors.MissingSourceTopicException: One or more source topics were missing during rebalance
Alguém pode me ajudar?
/**
 * Spring configuration that wires the Kafka Streams settings together with the
 * plain producer/consumer infrastructure used for {@code PixDTO} messages.
 */
@Configuration
@EnableKafkaStreams
public class ConsumerKafkaConfig {

    /** Broker list; falls back to {@code localhost:9092} when the property is not set. */
    @Value(value = "${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapAddress;

    /**
     * Builds the default Kafka Streams configuration bean.
     *
     * @return streams configuration with the application id, bootstrap servers,
     *         a String default key serde and {@code PixSerdes} as default value serde
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamConfig() {
        // Parameterized map (was a raw Map) so the compiler checks keys and values.
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "astin04.poc-kafka-streams-demo-6");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, PixSerdes.class);
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Creates the producer factory for String keys and JSON-serialized {@code PixDTO} values.
     *
     * @return producer factory bound to the configured bootstrap servers
     */
    @Bean
    public ProducerFactory<String, PixDTO> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Exposes a {@code KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return template for sending {@code PixDTO} messages
     */
    @Bean
    public KafkaTemplate<String, PixDTO> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates the consumer factory for String keys and JSON-deserialized {@code PixDTO} values.
     *
     * @return consumer factory bound to the configured bootstrap servers
     */
    @Bean
    public ConsumerFactory<String, PixDTO> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // NOTE(review): "*" trusts every package for JSON deserialization; consider
        // restricting this to the package that holds PixDTO outside of a proof of concept.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        // NOTE(review): 20000000 ms is roughly 5.5 hours between polls - confirm this is intended.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000000);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Creates the listener container factory that uses {@link #consumerFactory()}.
     *
     * @return container factory for {@code PixDTO} listeners
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PixDTO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PixDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
/**
 * Registers the Kafka Streams topology for Pix messages: a high-value filter that
 * forwards records to a fraud-verification topic, and a running sum per origin key.
 */
@Service
public class PixAggregator {

    /**
     * Adds two pipelines to the supplied builder.
     *
     * <p>The first reads {@code astin04.poc-pix-topic}, keeps records whose amount is
     * greater than 1000 and writes them to {@code astin04.poc-pix-verificacao-fraude}.
     * The second reads {@code astin04.poc-pix-topic-2}, regroups records by origin key
     * and keeps a running sum of their amounts.
     *
     * <p>Both source topics must already exist on the broker when the application
     * starts; Kafka Streams reports a {@code MissingSourceTopicException} during
     * rebalance when a source topic is missing.
     *
     * @param streamsBuilder builder that receives the topology
     */
    @Autowired
    public void aggregator(StreamsBuilder streamsBuilder) {
        KStream<String, PixDTO> messageStream = streamsBuilder
                .stream("astin04.poc-pix-topic", Consumed.with(Serdes.String(), PixSerdes.serdes()))
                .peek((key, value) -> System.out.println("Pix recebido (Streams) (filter): " + value.getChaveOrigem()))
                // getValor() is a nullable boxed value (the second pipeline null-checks it),
                // so guard before unboxing to avoid a NullPointerException in the stream thread.
                .filter((key, value) -> value.getValor() != null && value.getValor() > 1000)
                .peek((key, value) -> System.out.println("Pix: " + key + " será verificado para possível frause"));

        messageStream.print(Printed.toSysOut());
        messageStream.to("astin04.poc-pix-verificacao-fraude", Produced.with(Serdes.String(), PixSerdes.serdes()));

        KTable<String, Double> aggregateStream = streamsBuilder
                .stream("astin04.poc-pix-topic-2", Consumed.with(Serdes.String(), PixSerdes.serdes()))
                .peek((key, value) -> System.out.println("Pix recebido (Streams) (Aggregatos): " + value.getChaveOrigem()))
                .filter((key, value) -> value.getValor() != null)
                // Re-key by origin so the sum is computed per origin key.
                .groupBy((key, value) -> value.getChaveOrigem())
                .aggregate(
                        () -> 0.0,
                        (key, value, aggregate) -> (aggregate + value.getValor()),
                        Materialized.with(Serdes.String(), Serdes.Double())
                );

        aggregateStream.toStream().print(Printed.toSysOut());
    }
}