Solucionado (ver solução)
Solucionado
(ver solução)
3
respostas

Definições importantes

Olá, tem alguns pontos que ainda não ficaram muito claros pra mim.:

1 - Supondo que eu produza uma mensagem em um tópico e tenha dois consumidores de grupos distintos ouvindo este topico. A mensagem chegará para ambos? Se sim, qual a distinção e benefício de trabalhar com os grupos?

2 - O particionamento do tópico serve para auxiliar no rebalanceamento das mensagens. Neste sentido, se por algum motivo eu quiser que algum consumidor só escute determinada partição. É possível? Faz sentido em quais casos?

3 - Com relação ao offset, está clara a questão do earliest e latest. No entanto, caso eu queira posicionar o consumidor para ler a partir de um offset específico de uma partição específica, como posso fazer?

4 - Pela app em Java, fico claro pra mim a verificação da mensagem commitada, porém não entendi se via linha de comando no próprio kafka é possível verificar mensagens ainda não commitadas.

5 - Se por algum motivo eu perder uma mensagem, como posso saber qual offset e de qual partição essa mensagem foi perdida?

Desde já agradeço :)

3 respostas

Pessoal, alguém pode me auxiliar?

Olá, André! Perguntas bem interessantes! Vou tentar ajudar.

1 - Supondo que eu produza uma mensagem em um tópico e tenha dois consumidores de grupos distintos ouvindo este topico. A mensagem chegará para ambos? Se sim, qual a distinção e benefício de trabalhar com os grupos?

Sim, a mensagem chegaria para ambos. Para ilustrar o benefício, imagine uma loja online: depois de um pagamento confirmado, é necessário gerar uma nota fiscal e iniciar a entrega. Poderíamos ter um tópico "pagamentosConfirmados" com um consumer group "notaFiscal" para instâncias de um serviço de geração de notas fiscais e outro "entrega", para instâncias de um serviço de entregas.

Fiz uma live no Hipsters.Talks que falei sobre isso, usando a CLI: https://www.youtube.com/watch?v=aaCqcX30pzc

2 - O particionamento do tópico serve para auxiliar no rebalanceamento das mensagens. Neste sentido, se por algum motivo eu quiser que algum consumidor só escute determinada partição. É possível? Faz sentido em quais casos?

É possível usando um assign/seekToEnd.

Por exemplo:

int partition = 0;
TopicPartition topicPartition = new TopicPartition("vendas", partition);
consumer.assign(Collections.singleton(topicPartition));
consumer.seekToEnd(topicPartition);

Creio que faria sentido no cenário de sua próxima pergunta.

Referência: https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

3 - Com relação ao offset, está clara a questão do earliest e latest. No entanto, caso eu queira posicionar o consumidor para ler a partir de um offset específico de uma partição específica, como posso fazer?

Poderíamos fazer um assign/seek passando um offset específico, reconsumindo todas as mensagens a partir desse offset:

Por exemplo:

int partition = 0;
TopicPartition topicPartition = new TopicPartition("vendas", partition);
consumer.assign(Collections.singleton(topicPartition));
consumer.seek(topicPartition);

Isso é bem útil em alguns cenários:

  • se você corrigiu um bug e quer reprocessar N mensagens
  • se você quer retreinar um modelo de Machine Learning a partir de um determinado offset de uma partição específica

4 - Pela app em Java, fico claro pra mim a verificação da mensagem commitada, porém não entendi se via linha de comando no próprio kafka é possível verificar mensagens ainda não commitadas.

Pelo que sei, não tem como fazer isso pela linha de comando. Olhei no código do consumer e parece não estar entre as opções permitidas: https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

5 - Se por algum motivo eu perder uma mensagem, como posso saber qual offset e de qual partição essa mensagem foi perdida?

Essa realmente não sei.

No lado do consumer, é possível minimizar a possibilidade de falhas fazendo o commit do offset manualmente.

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
, "false");
consumer.commitSync();
solução!

Oi Alexandre, muito obrigado pelos esclarecimentos :)