Boa noite!
Estou assistindo as aulas para pegar o conceito e estou replicando utilizando o docker para dar andamento nas atividades, e peguei esse warn ao subir o consumer :
WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'topico' was supplied but isn't a known config
passei um tempinho quebrando a cabeça aqui e vi que quando eu utilizo no GROUP_ID_CONFIG o metodo getSimpleName() ele retorna o nome da classe
group.id=ConsumerEvent
Troquei o value do group.id para getName() e ele trouxe o caminho do arquivo e conseguiu ler os dados do topico :
group.id=br.com.integrador.consumer.ConsumerEvent
e também consegui ler passando uma string com o nome "default" e também deu tudo certinho
group.id=default
public class ConsumerEvent {
private final KafkaConsumer<String, String> consumer;
public ConsumerEvent() throws IOException {
this.consumer = createConsumerEvent();
}
private KafkaConsumer<String, String> createConsumerEvent() throws IOException {
if (this.consumer != null) {
return this.consumer;
}
return new KafkaConsumer<String, String>(getProperties());
}
private Properties getProperties() throws IOException {
Properties properties = new Properties();
properties.load(new FileInputStream("src/main/resources/consumer.properties"));
return properties;
}
public void exec() {
List<String> topicos = new ArrayList<>();
topicos.add("INTEGRADOR-WEB");
this.consumer.subscribe(topicos);
System.out.println("Inicializando [Consumer] ...");
while (true) {
var records = this.consumer.poll(Duration.ofMillis(100));
for (var rec : records) {
System.out.printf("Chave : %s | Valor: %s | offset : %d \n", rec.key(), rec.value(), rec.offset());
}
}
}
}
Subida do serviço:
public class RunConsumerEvent { public static void main(String[] args) throws IOException { var consumer = new ConsumerEvent(); consumer.exec(); } }
Também to utilizando o KAFDROP pra conseguir ver as mensagens subindo , caso alguém queira aprender também.