The ReadingReportServiceMain service is never assigned any partitions of USER_GENERATE_READING_REPORT, and the topic itself is never created. As a result, BatchSendMessageServiceMain throws an exception when it tries to send one record to the topic for each user:
BatchSendMessageServiceMain Log:
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]
at br.com.afcl.ecommerce.services.KafkaService.lambda$run$0(KafkaService.java:71)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at br.com.afcl.ecommerce.services.KafkaService.run(KafkaService.java:67)
at br.com.afcl.ecommerce.services.AbstractService.run(AbstractService.java:51)
at br.com.afcl.ecommerce.BatchSendMessageServiceMain.main(BatchSendMessageServiceMain.java:9)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1341)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1005)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:905)
at br.com.afcl.ecommerce.dispatcher.KafkaDispatcher.send(KafkaDispatcher.java:51)
at br.com.afcl.ecommerce.BatchSendMessageService.parse(BatchSendMessageService.java:43)
at br.com.afcl.ecommerce.services.KafkaService.lambda$run$0(KafkaService.java:69)
... 4 more
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["USER_GENERATE_READING_REPORT"]
Process finished with exit code 1
ReadingReportServiceMain Log:
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 37edeed0777bacb3
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1653934417588
[main] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Subscribed to pattern: 'USER_GENERATE_READING_REPORT'
[main] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Cluster ID: OC0bTxEkQFqTobMOb61naA
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Discovered group coordinator andrefelipePC:9090 (id: 2147483647 rack: null)
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Request joining group due to: need to re-join with the given member-id
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] (Re-)joining group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Successfully joined group with generation Generation{generationId=17, memberId='c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Finished assignment for group at generation 17: {c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468=Assignment(partitions=[])}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Successfully synced group in generation Generation{generationId=17, memberId='c1f36cdf-2cac-48f9-8133-80972fa40846-bbbb5cb8-cec8-47e3-8d52-769471c0c468', protocol='range'}
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Notifying assignor about the new Assignment(partitions=[])
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=c1f36cdf-2cac-48f9-8133-80972fa40846, groupId=ReadingReportService] Adding newly assigned partitions:
Kafka Topics:
Kafka is running as a cluster of 3 brokers (9090, 9091, 9092) with 3 replicated partitions.
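
Possible lead: in the BatchSendMessageServiceMain log above, the exception prints the topic as ["USER_GENERATE_READING_REPORT"], with double quotes inside the brackets, while the consumer subscribes to the pattern without quotes. Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-', with a maximum length of 249 characters. If those quotes are actually part of the name being sent (for example because the topic string passed through a JSON serializer somewhere, which is only a guess), the producer would reject the name and the consumer's pattern would never match it. A minimal sketch to check a name against those rules, with no Kafka dependency; the class and method names here are hypothetical and not part of the project:

```java
import java.util.regex.Pattern;

public class TopicNameCheck {
    // Legal characters for Kafka topic names: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    // Maximum topic name length accepted by Kafka.
    private static final int MAX_LENGTH = 249;

    // Hypothetical helper: returns true only if the name satisfies the rules above.
    public static boolean isLegalTopicName(String name) {
        return name != null
                && !name.isEmpty()
                && name.length() <= MAX_LENGTH
                && !name.equals(".")
                && !name.equals("..")
                && LEGAL_CHARS.matcher(name).matches();
    }

    public static void main(String[] args) {
        String plain = "USER_GENERATE_READING_REPORT";
        // Assumption for illustration: the same name wrapped in literal double quotes.
        String quoted = "\"USER_GENERATE_READING_REPORT\"";
        System.out.println(plain + " legal? " + isLegalTopicName(plain));
        System.out.println(quoted + " legal? " + isLegalTopicName(quoted));
    }
}
```

Logging the exact topic string (and its length) right before KafkaDispatcher.send is called would confirm or rule out this hypothesis.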