Os Consumers FraudDetectorService e FraudDetectorService1 estão consumindo as mesmas mensagens, não está havendo uma divisão das mensagens para um e para outro.
/home/xxxxxxxxxxx/Imagens/Capturas de tela/Captura de tela de 2024-03-18 11-24-44.png
Os Consumers FraudDetectorService e FraudDetectorService1 estão consumindo as mesmas mensagens, não está havendo uma divisão das mensagens para um e para outro.
/home/xxxxxxxxxxx/Imagens/Capturas de tela/Captura de tela de 2024-03-18 11-24-44.png
public class FraudDetectorService {
public static void main(String[] args) {
var consumer = new KafkaConsumer<String, String>(properties());
consumer.subscribe(Collections.singletonList("ECOMMERCE_NEW_ORDER"));
while(true) {
var records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
System.out.println("Encontrei " + records.count() + " registros");
for (var record : records) {
System.out.println("------------------------------------------");
System.out.println("Processing new order, checking for fraud");
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.partition());
System.out.println(record.offset());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// ignoring
e.printStackTrace();
}
System.out.println("Order processed");
}
}
}
}
private static Properties properties() {
var properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FraudDetectorService.class.getSimpleName());
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, FraudDetectorService.class.getSimpleName() + "-" + UUID.randomUUID().toString());
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
return properties;
}
}
public class FraudDetectorService1 {
public static void main(String[] args) {
var consumer = new KafkaConsumer<String, String>(properties());
consumer.subscribe(Collections.singletonList("ECOMMERCE_NEW_ORDER"));
while(true) {
var records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
System.out.println("Encontrei " + records.count() + " registros");
for (var record : records) {
System.out.println("------------------------------------------");
System.out.println("Processing new order, checking for fraud");
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.partition());
System.out.println(record.offset());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// ignoring
e.printStackTrace();
}
System.out.println("Order processed");
}
}
}
}
private static Properties properties() {
var properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FraudDetectorService1.class.getSimpleName());
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, FraudDetectorService1.class.getSimpleName() + "-" + UUID.randomUUID().toString());
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
return properties;
}
}
public class NewOrderMain {
public static void main(String[] args) throws ExecutionException, InterruptedException {
var producer = new KafkaProducer<String, String>(properties());
for ( var i = 0; i < 100; i ++) {
var key = UUID.randomUUID().toString();
var value = key + ",67523,1234";
var record = new ProducerRecord<>("ECOMMERCE_NEW_ORDER", key, value);
Callback callback = (data, ex) -> {
if (ex != null) {
ex.printStackTrace();
return;
}
System.out.println("sucesso enviando " + data.topic() + ":::partition " + data.partition() + "/ offset " + data.offset() + "/ timestamp " + data.timestamp());
};
var email = "Thank you for your order! We are processing your order!";
var emailRecord = new ProducerRecord<>("ECOMMERCE_SEND_EMAIL", email, email);
producer.send(record, callback).get();
producer.send(emailRecord, callback).get();
}
}
private static Properties properties() {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
}
Oi, Ricardo! Tudo bem contigo?
No Kafka, para que diferentes consumers consumam mensagens de forma balanceada e exclusiva dentro de um mesmo tópico, eles devem pertencer ao mesmo consumer group. Quando consumers pertencem a grupos diferentes, cada um deles vai receber todas as mensagens do tópico, pois o Kafka entende que são aplicações diferentes que precisam processar todas as mensagens de forma independente.
Se a intenção é ter um balanceamento de carga, onde cada mensagem é processada por apenas um consumer, você deve garantir que ambos os consumers estejam configurados com o mesmo group.id
. Isso fará com que eles cooperem entre si para consumir as mensagens de forma balanceada.
Aqui está um exemplo prático de como configurar o group.id
em um consumer do Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "meu-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
No exemplo acima, todos os consumers que tiverem o "group.id"
configurado para o "meu-consumer-group"
vão cooperar entre si.
Se a dúvida persistir, estamos aqui.
Abraços e bons estudos!