Hi! I'm having trouble with a Java project that publishes a message to Kafka. I can publish successfully from the Kafka CLI, but through the Java client library the send fails with a timeout. The project apparently cannot connect to the Kafka server.
Here is the error message:
org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1427)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1069)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
    at org.example.Main.main(Main.java:17)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.
Here is the code:
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var producer = new KafkaProducer<String, String>(properties());
        var value = "2342,3423,2342";
        // The same string is used as both the record key and the record value.
        var record = new ProducerRecord<>("ECOMMERCE_NEW_ORDER", value, value);
        // send() is asynchronous; get() blocks until the broker acknowledges or the send fails.
        producer.send(record, (data, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("successfully sent " + data.topic() + ":::partition " + data.partition() + "/ offset " + data.offset());
        }).get();
    }

    private static Properties properties() {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000");
        return properties;
    }
}
The most recent Kafka logs:
[2024-02-13 17:53:34,576] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-42 in 150 milliseconds for epoch 0, of which 150 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,577] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-12 in 150 milliseconds for epoch 0, of which 150 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,578] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-21 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,579] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-36 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,580] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-6 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,581] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-43 in 147 milliseconds for epoch 0, of which 147 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,582] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-13 in 147 milliseconds for epoch 0, of which 147 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,583] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 146 milliseconds for epoch 0, of which 146 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
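
To separate a plain network problem from a metadata problem, a minimal sketch like the one below could check whether anything accepts TCP connections at the bootstrap address used by the producer. It relies only on the JDK. The class name BrokerReachability and the method canConnect are hypothetical names chosen for illustration. A successful connection only shows that the port is open; it does not prove that the broker advertises a listener the client can reach, or that the topic exists.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the Kafka client library.
class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or connect timeout.
            return false;
        }
    }

    public static void main(String[] args) {
        // Same address as BOOTSTRAP_SERVERS_CONFIG in the producer code above.
        String host = "127.0.0.1";
        int port = 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this reports the port as reachable while the producer still times out, the network path to the bootstrap address is probably fine, and the broker's advertised listener settings or the topic itself would be the next things to check.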