Boa tarde a todos,
Estou tomando o seguinte erro:
Caused by: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $
Eu entendi que esse erro ocorre quando há uma inconsistência no tipo esperado para a desserialização dos dados no Kafka, mas não estou conseguindo corrigir. Podem me ajudar?
FraudDetectorService.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Map;
/**
 * Console application that consumes records from the {@code ECOMMERCE_NEW_ORDER}
 * topic through a {@code KafkaService} and prints each one.
 */
public class FraudDetectorService {

    /**
     * Creates a {@code KafkaService} whose group id is this class's simple name,
     * subscribes it to {@code ECOMMERCE_NEW_ORDER}, and runs it. The service is
     * opened in try-with-resources so it is closed when {@code run()} exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        var fraudService = new FraudDetectorService();
        try (var service = new KafkaService<>(FraudDetectorService.class.getSimpleName(),
                "ECOMMERCE_NEW_ORDER",
                fraudService::parse,
                Order.class,
                Map.of())) {
            service.run();
        }
    }

    /**
     * Prints the key, value, partition and offset of one record, waits five
     * seconds, and then reports the order as processed.
     *
     * @param record the consumed record whose value is an {@code Order}
     */
    private void parse(ConsumerRecord<String, Order> record) {
        System.out.println("------------------------------------------");
        System.out.println("Processing new order, checking for fraud");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            // NOTE(review): presumably a stand-in for the real fraud check — confirm.
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the call stack can
            // observe the interruption instead of having it silently discarded.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("Order processed");
    }
}
KafkaService.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
class KafkaService<T> implements Closeable {
private final KafkaConsumer<String, T> consumer;
private final ConsumerFunction parse;
KafkaService(String groupId, String topico, ConsumerFunction parse, Class<T> type, Map<String,String> properties) {
this(parse, groupId, type, properties);
consumer.subscribe(Collections.singletonList(topico));
}
/**
 * Builds the service and subscribes its consumer using a topic pattern.
 *
 * @param groupId    consumer group id
 * @param topico     pattern passed to the consumer's {@code subscribe}
 * @param parse      callback invoked for each consumed record
 * @param type       class handed to the value deserializer configuration
 * @param properties extra consumer properties applied on top of the defaults
 */
KafkaService(String groupId, Pattern topico, ConsumerFunction parse, Class<T> type, Map<String,String> properties) {
    this(parse, groupId, type, properties);
    this.consumer.subscribe(topico);
}
private KafkaService(ConsumerFunction parse, String groupId, Class<T> type, Map<String, String> properties) {
this.parse = parse;
this.consumer = new KafkaConsumer<>(getProperties(type, groupId, properties));
}
/**
 * Polls the consumer in an endless loop with a 100 ms timeout. For every
 * non-empty batch it prints the record count and passes each record to the
 * {@code parse} callback. The loop has no exit condition of its own.
 */
void run() {
    for (;;) {
        var batch = consumer.poll(Duration.ofMillis(100));
        if (batch.isEmpty()) {
            // Nothing arrived within the timeout; poll again.
            continue;
        }
        System.out.println("Encontrei " + batch.count() + " registros");
        for (var entry : batch) {
            parse.consume(entry);
        }
    }
}
/**
 * Assembles the consumer configuration: fixed bootstrap server, String key
 * deserializer, {@code GsonDeserializer} for values, the given group id, a
 * random client id and the target type name. Entries from
 * {@code propriedades} are copied in last, so they replace any default that
 * shares the same key.
 *
 * @param type         class whose name is stored under {@code GsonDeserializer.TYPE_CONFIG}
 * @param groupId      consumer group id
 * @param propriedades caller-supplied overrides / additions
 * @return the populated configuration
 */
private Properties getProperties(Class<T> type, String groupId, Map<String,String> propriedades) {
    final String keyDeserializer = StringDeserializer.class.getName();
    final String valueDeserializer = GsonDeserializer.class.getName();
    final String clientId = UUID.randomUUID().toString();

    var config = new Properties();
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    config.setProperty(GsonDeserializer.TYPE_CONFIG, type.getName());

    // Caller-provided entries go in last so they win over the defaults above.
    config.putAll(propriedades);
    return config;
}
/** Closes the underlying consumer. */
@Override
public void close() {
    this.consumer.close();
}
EmailService e LogService não estão dando erro.