In this blog post, I’d like to focus attention on how “automatic” and “manual” partition assignment can interfere with each other, and even break things. My advice is to use them the right way: avoid mixing them in the same scenario, or at least be fully aware of what you are doing.
The Consumer Group Experience
In Apache Kafka, the consumer group concept is a way of achieving two things:
- Having consumers as part of the same consumer group means providing the “competing consumers” pattern, in which the messages from topic partitions are spread across the members of the group. Each consumer receives messages from one or more partitions (“automatically” assigned to it), and the same messages won’t be received by the other consumers (assigned to different partitions). In this way, we can scale the number of consumers up to the number of partitions (having one consumer reading only one partition); beyond that point, a new consumer joining the group will stay idle, without being assigned any partition.
- Having consumers as part of different consumer groups means providing the “publish/subscribe” pattern, where the messages from topic partitions are sent to all the consumers across the different groups. Inside the same consumer group, the rules explained above still apply, but across different groups, the consumers will receive the same messages. This is useful when the messages inside a topic are of interest to different applications that will process them in different ways; we want every interested application to receive all the messages from the topic.
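As a concrete sketch (broker address and group names are placeholders of my choosing, not from the original), whether two consumers “compete” for partitions or each receives every message is decided purely by whether they share the same group.id:

```java
import java.util.Properties;

public class GroupChoice {
    // Builds a consumer configuration. Consumers sharing a group.id compete
    // for partitions; consumers in different groups each see every message.
    public static Properties consumerConfig(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", groupId);
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // C1 and C2 in the same group G1: competing consumers on the topic.
        // C3 in group G2: publish/subscribe, it receives every message too.
        Properties c1 = consumerConfig("G1");
        Properties c3 = consumerConfig("G2");
        System.out.println(c1.getProperty("group.id")); // G1
        System.out.println(c3.getProperty("group.id")); // G2
    }
}
```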
Another great advantage of consumer groups is the rebalancing feature. When a consumer joins a group, if enough partitions are still available (i.e. we haven’t reached the limit of one consumer per partition), a rebalancing starts and the partitions are reassigned to the current consumers plus the new one. In the same way, if a consumer leaves a group, the partitions are reassigned to the remaining consumers.
Everything I have described so far only holds when using the subscribe() method provided by the KafkaConsumer API. This method forces you to make the consumer part of a consumer group, by setting the group.id property, because it’s needed for re-balancing. In any case, it’s not up to the consumer to decide which partitions it wants to read from; in general, the first consumer to join the group computes the partition assignment as the other consumers join.
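A minimal subscribe() sketch (assuming the kafka-clients library, 2.x or later, on the classpath and a broker at a placeholder address); note that group.id must be set, while the actual partition assignment is out of the consumer’s hands:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SubscribeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "G1"); // required: subscribe() needs a consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "Automatic" assignment: the group decides which of the topic's
            // partitions this member gets, and rebalances as members come and go.
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```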
How Things Can Be Broken
Other than using the subscribe() method, there is another way for a consumer to read from topic partitions: the assign() method. In this case, the consumer is able to specify exactly the topic partitions it wants to read from.
This type of approach can be useful when you know exactly where some specific messages will be written (the partition) and you want to read directly from there. Of course, you lose the re-balancing feature in this case, which is the first big difference from the subscribe() method.
Another difference is that with “manual” assignment, you can avoid specifying a consumer group (i.e. the group.id property) for the consumer; it will just be empty. In any case, it’s better to specify one.
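A minimal assign() sketch for comparison (again assuming the kafka-clients library and a placeholder broker address):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // group.id is not required for assign(), but it's better to set one
        // so that committed offsets end up under a well-defined group.
        props.put("group.id", "G1");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // "Manual" assignment: we pick partitions P0 and P1 of "test" ourselves.
        // No group coordination, no rebalancing.
        consumer.assign(Arrays.asList(
                new TopicPartition("test", 0),
                new TopicPartition("test", 1)));
    }
}
```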
Most people use the subscribe() method, leveraging the “automatic” assignment and re-balancing features. Mixing the two methods within the same consumer group can break things, as we're about to see.
Imagine having a single “test” topic with only two partitions (P0 and P1) and a consumer C1 that subscribes to the topic as part of the consumer group G1. This consumer will be assigned both partitions, receiving messages from them. Now, let’s start a new consumer C2, configured to be part of the same consumer group G1, but using the assign method to ask for partitions P0 and P1 explicitly.
Now we have broken something! ...but what is it?
Both C1 and C2 will receive messages from the topic from both partitions P0 and P1, but they are part of the same consumer group G1! So we have “broken” what we said in the previous paragraph about “competing consumers” when they are part of the same consumer group. You experience a “publish/subscribe” pattern, but with consumers within the same consumer group.
What About Offsets Commits?
Generally, you should avoid a scenario like the one described above. Starting from version 0.8.2.0, the offsets committed by the consumers aren’t saved in ZooKeeper but in a partitioned and replicated internal topic named __consumer_offsets, which is hosted on the Kafka brokers in the cluster.
When a consumer commits some offsets (for different partitions), it sends a message to the __consumer_offsets topic on the broker. The message has the following structure:
- key = [group, topic, partition]
- value = offset
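For example (a sketch, assuming the kafka-clients library; the offset value 42 is just an illustration), a synchronous commit for partition 0 of “test” by a consumer in group G1 produces a record keyed by that [group, topic, partition] triple:

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitExample {
    // `consumer` is assumed to have been created with group.id = "G1".
    static void commitP0(KafkaConsumer<String, String> consumer) {
        // Stored in __consumer_offsets roughly as:
        //   key = [G1, "test", 0], value = 42
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition("test", 0),
                new OffsetAndMetadata(42L)));
    }
}
```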
Coming back to the previous scenario... what does it mean?
Having C1 and C2 as part of the same consumer group but being able to receive from the same partitions (both P0 and P1) would look something like the following:
- C1 commits offset X for partition P0 writing a message like this:
- key = [G1, “test”, P0], value = X
- C2 commits offset Y for partition P0 writing a message like this:
- key = [G1, “test”, P0], value = Y
C2 has overwritten the committed offset that consumer C1 wrote for the same partition P0, and maybe X was less than Y. If C1 crashes and restarts, it will start reading from Y and lose the messages between X and Y (remember Y > X).
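The overwrite is easy to see if we model the __consumer_offsets topic as a map keyed by [group, topic, partition] (a deliberate simplification: log compaction keeps only the most recent record per key, which behaves like a map put):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetOverwrite {
    // Model the compacted __consumer_offsets topic as a map: the latest
    // value written for a key is the only one that survives.
    public static long committedAfter(long x, long y) {
        Map<String, Long> offsets = new HashMap<>();
        String key = "G1/test/P0";   // [group, topic, partition]
        offsets.put(key, x);         // C1 commits offset X for P0
        offsets.put(key, y);         // C2 commits offset Y for the SAME key
        return offsets.get(key);     // a restarted C1 resumes from here
    }

    public static void main(String[] args) {
        // With X = 10 and Y = 50, C1 restarts at 50 and skips messages 10..49.
        System.out.println(committedAfter(10, 50)); // prints 50
    }
}
```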
Something like that can’t happen with consumers that only use the subscribe() way of being assigned to partitions: as part of the same consumer group, they’ll receive different partitions, so the key of the offset commit message will always be different.
Update: As confirmation that mixing subscribe and assign isn’t a good thing to do, after a discussion with one of my colleagues, Henryk Konsek, it turned out that if you try to call both methods on the same consumer, the client library throws the following exception:
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
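A sketch that triggers this (assuming the kafka-clients library; the check is client-side, so the second call fails before any broker connection is needed):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MixedModes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "G1");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test")); // "automatic" mode
        // The next call throws java.lang.IllegalStateException:
        // "Subscription to topics, partitions and pattern are mutually exclusive"
        consumer.assign(Collections.singletonList(new TopicPartition("test", 0)));
    }
}
```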
The consumer groups mechanism in Apache Kafka works really well. Leveraging it for scaling consumers and having “automatic” partitions assignment with rebalancing is a great plus. There are cases in which you would need to assign partitions “manually” but in those cases, pay attention to what could happen if you mix both solutions.