I'm having a problem testing the EmailService class from the Kafka course (producers, consumers and streams): when I run EmailService, it cannot deserialize the message. This is the class:

import java.util.HashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class EmailService {
    public static void main(String[] args) {
        var emailService = new EmailService();
        try (var service = new KafkaService<>(EmailService.class.getSimpleName(),
                "ECOMMERCE_SEND_EMAIL",
                emailService::parse,
                String.class,
                new HashMap<>())) {
            service.run();
        }
    }

    private void parse(ConsumerRecord<String, String> record) {
        System.out.println("------------------------------------------");
        System.out.println("Send email");
        System.out.println(record.key());
        System.out.println(record.value());
        System.out.println(record.partition());
        System.out.println(record.offset());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // ignoring
            e.printStackTrace();
        }
        System.out.println("Email sent");
    }
}

This is the error:

Exception in thread "main" org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition ECOMMERCE_SEND_EMAIL-0 at offset 30. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:331)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:283)
    at org.apache.kafka.clients.consumer.internals.FetchCollector.fetchRecords(FetchCollector.java:168)
    at org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:134)
    at org.apache.kafka.clients.consumer.internals.Fetcher.collectFetch(Fetcher.java:145)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.pollForFetches(LegacyKafkaConsumer.java:693)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:617)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.poll(LegacyKafkaConsumer.java:590)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
    at br.com.aclecio.KafkaService.run(KafkaService.java:38)
    at br.com.aclecio.EmailService.main(EmailService.java:16)
Caused by: com.google.gson.JsonSyntaxException: java.lang.IllegalStateException: Expected a string but was BEGIN_OBJECT at line 1 column 2 path $
    at com.google.gson.Gson.fromJson(Gson.java:975)
    at com.google.gson.Gson.fromJson(Gson.java:928)
    at com.google.gson.Gson.fromJson(Gson.java:877)
    at com.google.gson.Gson.fromJson(Gson.java:848)
    at br.com.aclecio.GsonDeserializer.deserialize(GsonDeserializer.java:28)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:321)
    ... 10 more
Caused by: java.lang.IllegalStateException: Expected a string but was BEGIN_OBJECT at line 1 column 2 path $
    at com.google.gson.stream.JsonReader.nextString(JsonReader.java:824)
    at com.google.gson.internal.bind.TypeAdapters$15.read(TypeAdapters.java:380)
    at com.google.gson.internal.bind.TypeAdapters$15.read(TypeAdapters.java:368)
    at com.google.gson.Gson.fromJson(Gson.java:963)
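
A note on reading the innermost cause: "Expected a string but was BEGIN_OBJECT at line 1 column 2 path $" is consistent with the value stored at offset 30 starting with `{` (a JSON object) while the deserializer was asked for `String.class`. The sketch below is not Gson's implementation; it is a small stdlib-only illustration of that distinction. The class name `FirstJsonToken`, the method `firstToken`, and both sample payloads are hypothetical, and the returned labels merely mirror the names of Gson's `JsonToken` constants.

```java
// Hypothetical helper: reports which kind of JSON value a payload starts with.
// It only inspects the first non-whitespace character; it does not parse JSON.
final class FirstJsonToken {

    static String firstToken(String json) {
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (Character.isWhitespace(c)) {
                continue; // skip leading whitespace
            }
            if (c == '{') {
                return "BEGIN_OBJECT"; // payload is a JSON object
            }
            if (c == '[') {
                return "BEGIN_ARRAY";  // payload is a JSON array
            }
            if (c == '"') {
                return "STRING";       // payload is a JSON string
            }
            return "OTHER";            // number, boolean, null, or invalid input
        }
        return "END_DOCUMENT";         // empty or whitespace-only payload
    }

    public static void main(String[] args) {
        // Assumed example of an object-shaped message (not taken from the topic).
        System.out.println(firstToken("{\"subject\":\"hi\",\"body\":\"text\"}")); // prints BEGIN_OBJECT
        // Assumed example of a message that would fit String.class.
        System.out.println(firstToken("\"Thank you for your order\""));           // prints STRING
    }
}
```

If the record at offset 30 turns out to be object-shaped, that would suggest the topic holds messages written with a different payload type than the `String.class` this consumer requests; whether that is actually the case here cannot be confirmed from the stack trace alone.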