Solucionado (ver solução)
Solucionado
(ver solução)
2
respostas

Timeout ao publicar no tópico

Olá! Estou com problemas no projeto Java que faz a publicação de uma mensagem no kafka. Consegui realizar uma publicação pela CLI do kafka mas pela biblioteca Java tem acontecido a falha de timeout. Aparentemente o projeto não está conseguindo conectar-se ao servidor kafka.

Segue a mensagem de erro:

org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1427)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1069)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
    at org.example.Main.main(Main.java:17)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms.

Segue o código:

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var producer = new KafkaProducer<String, String>(properties());

        var value = "2342,3423,2342";
        var record = new ProducerRecord<>("ECOMMERCE_NEW_ORDER", value, value);
        producer.send(record, (data, ex) -> {
            if(ex != null){
                ex.printStackTrace();
                return;
            }
            System.out.println("sucesso enviando " + data.topic() + ":::partition " + data.partition() + "/ offset "+ data.offset());
        }).get();
    }

    private static Properties properties(){
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//        properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "3000");
        return properties;
    }
}

Os últimos logs do kafka:

[2024-02-13 17:53:34,576] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-42 in 150 milliseconds for epoch 0, of which 150 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,577] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-12 in 150 milliseconds for epoch 0, of which 150 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,578] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-21 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,579] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-36 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,580] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-6 in 149 milliseconds for epoch 0, of which 149 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,581] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-43 in 147 milliseconds for epoch 0, of which 147 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,582] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-13 in 147 milliseconds for epoch 0, of which 147 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2024-02-13 17:53:34,583] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 146 milliseconds for epoch 0, of which 146 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
2 respostas
solução!

Olá, Diogo.

Tudo bem?

Pelo que você descreveu e pelo código que compartilhou, parece que o problema está na conexão entre o seu produtor Java e o servidor Kafka. A mensagem de erro "Topic ECOMMERCE_NEW_ORDER not present in metadata after 60000 ms" sugere que o produtor não está conseguindo encontrar o tópico "ECOMMERCE_NEW_ORDER" no servidor Kafka dentro do tempo limite especificado.

Existem algumas possíveis causas para este problema. Aqui estão algumas coisas que você precisa verificar:

  1. Verifique se o tópico "ECOMMERCE_NEW_ORDER" realmente existe no servidor Kafka. Você pode fazer isso usando a CLI do Kafka com o comando kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092. Se o tópico não existir, você precisará criá-lo.

  2. Verifique se o servidor Kafka está realmente rodando no endereço e porta especificados. No seu código, você especificou que o servidor Kafka está rodando em "127.0.0.1:9092". Se o servidor Kafka estiver rodando em um endereço ou porta diferente, você precisará atualizar essa configuração.

  3. Verifique se o seu produtor Java tem permissão para se conectar ao servidor Kafka. Se o servidor Kafka estiver protegido por autenticação ou se houver algum firewall bloqueando a conexão, você precisará resolver esses problemas de permissão.

  4. Verifique se o tempo limite de conexão está adequado. O tempo limite padrão é de 60 segundos (60000 ms), mas dependendo do seu ambiente, pode ser necessário aumentar esse valor.

Espero que estas sugestões possam ajudar a identificar e resolver o problema de início. Lembre-se que é difícil garantir uma solução sem ter acesso ao seu ambiente de desenvolvimento, mas estas são as causas mais comuns para o erro que você está enfrentando.

Espero ter ajudado, qualquer coisa manda aqui de novo, se o problema persistir ou se tiver algum outro erro. Bons estudos.

Olá! Obrigado pelos esclarecimentos.

Estou utilizando WSL2 para executar o kafka, por algum motivo a aplicação sendo executada pelo console do IntelliJ não estava se comunicando com o Kafka executando via WSL. Então executei a aplicação java também pelo WSL utilizando o arquivo jar gerado pelo maven. A partir daí estou conseguindo produzir as mensagens para o Kafka. Vou deixar o link do projeto que estou trabalhando: https://github.com/diogosfpinto/ecommerce-kafka

Obrigado!