1
resposta

Estou tomando ClassCastException no Order

Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to br.com.alura.ecommerce.Order at br.com.alura.ecommerce.FraudDetectorService.parse(FraudDetectorService.java:43) at br.com.alura.ecommerce.KafkaService.run(KafkaService.java:42) at br.com.alura.ecommerce.FraudDetectorService.main(FraudDetectorService.java:22)

package br.com.alura.ecommerce;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class FraudDetectorService {

    public static void main(String[] args) {

        FraudDetectorService fraudeService = new FraudDetectorService();
        try(KafkaService service = new KafkaService(
                FraudDetectorService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudeService::parse,
                Order.class,
                new HashMap<>())) {
            service.run();
        }
    }

    private final KafkaDispatcher<Order> orderDispatcher = new KafkaDispatcher<>();

    private void parse(ConsumerRecord<String, Order> record) throws ExecutionException, InterruptedException{
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for fraud");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // ignoring
            e.printStackTrace();
        }


        Order order = record.value();


        if(isFraud(order)) {
            // pretending that the fraud happens when the amount is >= 4500
            System.out.println("Order is a fraud!!!!!" + record.value());
            orderDispatcher.send("ECOMMERCE_ORDER_REJECTED", order.getUserId(), record.value());
        } else {
            System.out.println("Approved: " + record.value());
            orderDispatcher.send("ECOMMERCE_ORDER_APPROVED", order.getUserId(), record.value());
        }

        System.out.println("Order processed");
    }

    private boolean isFraud(Order order) {
        return order.getAmount().compareTo(new BigDecimal("4500")) >= 0;
    }


    private static Properties properties() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FraudDetectorService.class.getSimpleName());
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        return properties;
    }
}
1 resposta

Nao sei se te ajuda ainda, mas o problema é que vc nao sobreescreveu o VALUE_DESERIALIZER_CLASS_CONFIG.

Faz assim

try(KafkaService service = new KafkaService(
                FraudDetectorService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudeService::parse,
                Order.class,
                Map.of(VALUE_DESERIALIZER_CLASS_CONFIG, GsonDeserializer.class.name))

Ou algo do tipo, eu nao tenho o codigo no java, mas se quiser eu tenho isso em kotlin.

https://github.com/eduardompinto/kafka-alura/blob/main/common-kafka/src/main/kotlin/eduardompinto/alura/kafka/ecommerce/KafkaService.kt