Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

Kafka não cria o tópico USER_GENERATE_READING_REPORT (Invalid Topic Exception)

O Serviço ReadingReportServiceMain não consegue associar às partições de USER_GENERATE_READING_REPORT e o mesmo tópico nunca é criado, com isso, a classe BatchSendMessageServiceMain ocorre em exceção ao tentar envidar um record para o tópico para cada usuário:

BatchSendMessageServiceMain Log:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]
    at br.com.afcl.ecommerce.services.KafkaService.lambda$run$0(KafkaService.java:71)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at br.com.afcl.ecommerce.services.KafkaService.run(KafkaService.java:67)
    at br.com.afcl.ecommerce.services.AbstractService.run(AbstractService.java:51)
    at br.com.afcl.ecommerce.BatchSendMessageServiceMain.main(BatchSendMessageServiceMain.java:9)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1341)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905)
    at br.com.afcl.ecommerce.dispatcher.KafkaDispatcher.send(KafkaDispatcher.java:51)
    at br.com.afcl.ecommerce.BatchSendMessageService.parse(BatchSendMessageService.java:43)
    at br.com.afcl.ecommerce.services.KafkaService.lambda$run$0(KafkaService.java:69)
    ... 4 more
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]

Process finished with exit code 1

ReadingReportServiceMain Log:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1653934417588
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Subscribed to pattern: 'USER_GENERATE_READING_REPORT'
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Cluster ID: OC0bTxEkQFqTobMOb61naA
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Discovered group coordinator andrefelipePC:9090 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Successfully joined group with generation Generation{generationId=17, memberId='c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Finished assignment for group at generation 17: {c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468=Assignment(partitions=[])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Successfully synced group in generation Generation{generationId=17, memberId='c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Notifying assignor about the new Assignment(partitions=[])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Adding newly assigned partitions: 

Kafka Topics:

Kafka rodando com Cluster de 3 brokers (9090, 9091, 9092) e 3 partições replicadas.

1 resposta
solução!

Descobri um fato:

Em BatchSendMessageService eu enviei um record para o tópico USER_GENERATE_READING_REPORT escrito diretamente, ao invés de pegar o valor do ConsumerRecord e funcionou tanto a criação do tópico quanto o consumo do mesmo...

Antes: Insira aqui a descrição dessa imagem para ajudar na acessibilidade

Depois: Insira aqui a descrição dessa imagem para ajudar na acessibilidadeSe alguém souber o que pode ocorrer na diferença entre pegar o valor do ConsumerRecord e escrever diretamente, será muito interessante.