1
resposta

Erro....FraudDetectorService. BEGIN_OBJECT - Serialização customizada....

Boa tarde a todos,

Estou tomando o seguinte erro:

Caused by: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $

Eu entendi que, esse erro ocorre quando há uma inconsistência no tipo esperado para desserialização dos dados no Kafka....mas eu não to conseguindo corrigir, podem me ajudar?

FraudDetectorService.java


import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Map;

public class FraudDetectorService {

    public static void main(String[] args) {
        var fraudService = new FraudDetectorService();
        try (var service = new KafkaService<>(FraudDetectorService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }


    private void parse(ConsumerRecord<String, Order> record) {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for fraud");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // ignoring
            e.printStackTrace();
        }
        System.out.println("Order processed");
    }

}

KafkaService.java


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;

class KafkaService<T> implements Closeable {
    private final KafkaConsumer<String, T> consumer;
    private final ConsumerFunction parse;

    KafkaService(String groupId, String topico, ConsumerFunction parse, Class<T> type, Map<String,String> properties) {
        this(parse, groupId, type, properties);
        consumer.subscribe(Collections.singletonList(topico));
    }

    KafkaService(String groupId, Pattern topico, ConsumerFunction parse, Class<T> type, Map<String,String> properties) {
        this(parse, groupId, type, properties);
        consumer.subscribe(topico);
    }

    private KafkaService(ConsumerFunction parse, String groupId, Class<T> type, Map<String, String> properties) {
        this.parse = parse;
        this.consumer = new KafkaConsumer<>(getProperties(type, groupId, properties));
    }

    void run() {
        while (true) {
            var records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                System.out.println("Encontrei " + records.count() + " registros");
                for (var record : records) {
                    parse.consume(record);
                }
            }
        }
    }

    private Properties getProperties(Class<T> type, String groupId, Map<String,String> propriedades) {
        var properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GsonDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        properties.setProperty(GsonDeserializer.TYPE_CONFIG, type.getName());
        properties.putAll(propriedades);
        return properties;
    }

    @Override
    public void close() {
        consumer.close();
    }

EmailService e LogService não está dando erro.

1 resposta

Oi

O erro específico que você mencionou, java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $, indica que o Kafka está esperando um objeto JSON, mas está recebendo uma string na serialização.

Vamos examinar o trecho de código onde ocorre a desserialização no KafkaService.java:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GsonDeserializer.class.getName());

Aqui, você está usando um GsonDeserializer para desserializar os valores. Parece que este é o ponto onde o problema está ocorrendo. Certifique-se de que a estrutura dos dados no tópico Kafka ECOMMERCE_NEW_ORDER está de acordo com a classe Order que você está tentando desserializar.

A classe Order deve ter uma estrutura correspondente à mensagem que está sendo enviada para o tópico. Se a mensagem contém uma string e o GsonDeserializer está configurado para esperar um objeto, isso pode causar o erro que você está enfrentando.

Verifique se a classe Order está corretamente mapeada para a estrutura da mensagem. Se a mensagem no tópico contém apenas uma string, talvez seja mais apropriado usar StringDeserializer em vez de GsonDeserializer. Por exemplo:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

Se a mensagem no tópico é de fato um objeto JSON e a classe Order representa corretamente sua estrutura, então precisamos garantir que a desserialização seja feita corretamente. Você pode fornecer mais detalhes sobre a estrutura da mensagem no tópico e a definição da classe Order para uma análise mais precisa.

Além disso, você pode adicionar logs de depuração para verificar o conteúdo das mensagens recebidas. Modifique o método parse em FraudDetectorService para incluir logs que exibam o conteúdo da mensagem antes da tentativa de desserialização:

private void parse(ConsumerRecord<String, Order> record) {
    System.out.println("------------------------------------------");
    System.out.println("Processing new order, checking for fraud");
    System.out.println("Record key: " + record.key());
    System.out.println("Record value: " + record.value());
    System.out.println("Record partition: " + record.partition());
    System.out.println("Record offset: " + record.offset());

    try {
        // Seu código de desserialização aqui
        // ...
    } catch (Exception e) {
        // Adicione logs de erro aqui
        e.printStackTrace();
    }

    System.out.println("Order processed");
}

Dessa forma, você pode verificar os logs para garantir que o conteúdo da mensagem seja o que você espera. Se possível, compartilhe uma amostra da mensagem no tópico para que possamos fornecer uma orientação mais específica.