Solucionado (ver solução)
Solucionado
(ver solução)
4
respostas

Consumers FraudDetectorService e FraudDetectorService1 estão consumindo as mesmas mensagens

Os Consumers FraudDetectorService e FraudDetectorService1 estão consumindo as mesmas mensagens, não está havendo uma divisão das mensagens para um e para outro.

/home/xxxxxxxxxxx/Imagens/Capturas de tela/Captura de tela de 2024-03-18 11-24-44.pngInsira aqui a descrição dessa imagem para ajudar na acessibilidade

4 respostas

public class FraudDetectorService {

public static void main(String[] args) {
    var consumer = new KafkaConsumer<String, String>(properties());
    consumer.subscribe(Collections.singletonList("ECOMMERCE_NEW_ORDER"));
    while(true) {
        var records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()) {
            System.out.println("Encontrei " + records.count() + " registros");
            for (var record : records) {
                System.out.println("------------------------------------------");
                System.out.println("Processing new order, checking for fraud");
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.partition());
                System.out.println(record.offset());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // ignoring
                    e.printStackTrace();
                }
                System.out.println("Order processed");
            }
        }
    }
}

private static Properties properties() {
    var properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FraudDetectorService.class.getSimpleName());
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, FraudDetectorService.class.getSimpleName() + "-" + UUID.randomUUID().toString());
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    return properties;
}

}

public class FraudDetectorService1 {

public static void main(String[] args) {
    var consumer = new KafkaConsumer<String, String>(properties());
    consumer.subscribe(Collections.singletonList("ECOMMERCE_NEW_ORDER"));
    while(true) {
        var records = consumer.poll(Duration.ofMillis(100));
        if (!records.isEmpty()) {
            System.out.println("Encontrei " + records.count() + " registros");
            for (var record : records) {
                System.out.println("------------------------------------------");
                System.out.println("Processing new order, checking for fraud");
                System.out.println(record.key());
                System.out.println(record.value());
                System.out.println(record.partition());
                System.out.println(record.offset());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // ignoring
                    e.printStackTrace();
                }
                System.out.println("Order processed");
            }
        }
    }
}

private static Properties properties() {
    var properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FraudDetectorService1.class.getSimpleName());
    properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, FraudDetectorService1.class.getSimpleName() + "-" + UUID.randomUUID().toString());
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    return properties;
}

}

public class NewOrderMain {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    var producer = new KafkaProducer<String, String>(properties());
    
    for ( var i = 0; i < 100; i ++) {
    var key = UUID.randomUUID().toString();
    var value = key + ",67523,1234";
    var record = new ProducerRecord<>("ECOMMERCE_NEW_ORDER", key, value);
    Callback callback = (data, ex) -> {
        if (ex != null) {
            ex.printStackTrace();
            return;
        }
        System.out.println("sucesso enviando " + data.topic() + ":::partition " + data.partition() + "/ offset " + data.offset() + "/ timestamp " + data.timestamp());
    };
    var email = "Thank you for your order! We are processing your order!";
    var emailRecord = new ProducerRecord<>("ECOMMERCE_SEND_EMAIL", email, email);
    producer.send(record, callback).get();
    producer.send(emailRecord, callback).get();
    }
}

private static Properties properties() {
    var properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return properties;
}

}

solução!

Oi, Ricardo! Tudo bem contigo?

No Kafka, para que diferentes consumers consumam mensagens de forma balanceada e exclusiva dentro de um mesmo tópico, eles devem pertencer ao mesmo consumer group. Quando consumers pertencem a grupos diferentes, cada um deles vai receber todas as mensagens do tópico, pois o Kafka entende que são aplicações diferentes que precisam processar todas as mensagens de forma independente.

Se a intenção é ter um balanceamento de carga, onde cada mensagem é processada por apenas um consumer, você deve garantir que ambos os consumers estejam configurados com o mesmo group.id. Isso fará com que eles cooperem entre si para consumir as mensagens de forma balanceada.

Aqui está um exemplo prático de como configurar o group.id em um consumer do Kafka:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "meu-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

No exemplo acima, todos os consumers que tiverem o "group.id" configurado para o "meu-consumer-group" vão cooperar entre si.

Se a dúvida persistir, estamos aqui.

Abraços e bons estudos!

Caso este post tenha lhe ajudado, por favor, marcar como solucionado ✓.