KAFKA-14190: Update Zk TopicId from locally stored cache in controller #13111
Conversation
@jolshan, since you are our topic Id expert, please take a look at this change when you get a chance. I would request some urgency (if possible) since I would prefer to have this bug fix included in 3.4, due to the growing number of users facing this problem with each passing day.
This obviously needs to also be reviewed by people with more knowledge of the codebase, but the current changes make sense to me.
@@ -1698,7 +1708,7 @@ class KafkaController(val config: KafkaConfig,
  private def processPartitionModifications(topic: String): Unit = {
    def restorePartitionReplicaAssignment(
      topic: String,
-     newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]
+     newPartitionReplicaAssignment: Set[(TopicPartition, ReplicaAssignment)]
Why did we make this a set?
We changed the Zk call from zkClient.getFullReplicaAssignmentForTopics to zkClient.getReplicaAssignmentAndTopicIdForTopics, since the former did not provide the topic Id. As a result of this change we needed to use flatMap at line 1730, which results in a Set.
We can work on improving the readability as you suggested once we have a consensus on whether we even want to make this change at all.
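For context, here is a minimal, self-contained sketch of why the flatMap produces a Set of tuples rather than a Map. The types below (TopicPartition, ReplicaAssignment, TopicIdReplicaAssignment) are simplified placeholders, not the actual Kafka classes, and the data is made up for illustration:

```scala
// Hedged sketch with simplified placeholder types; in the Kafka codebase the
// topic Id is a Uuid, not a String, and these classes have more fields.
case class TopicPartition(topic: String, partition: Int)
case class ReplicaAssignment(replicas: Seq[Int])
case class TopicIdReplicaAssignment(topic: String,
                                    topicId: Option[String],
                                    assignment: Map[TopicPartition, ReplicaAssignment])

object FlatMapToSetExample extends App {
  // Pretend this came from a call like getReplicaAssignmentAndTopicIdForTopics,
  // which (unlike getFullReplicaAssignmentForTopics) also carries the topic Id.
  val perTopic: Set[TopicIdReplicaAssignment] = Set(
    TopicIdReplicaAssignment("foo", Some("id-1"),
      Map(TopicPartition("foo", 0) -> ReplicaAssignment(Seq(1, 2)),
          TopicPartition("foo", 1) -> ReplicaAssignment(Seq(2, 3))))
  )

  // flatMap over a Set keeps the Set shape, so the result is a
  // Set[(TopicPartition, ReplicaAssignment)] rather than a Map.
  val flattened: Set[(TopicPartition, ReplicaAssignment)] =
    perTopic.flatMap(_.assignment)

  // If the original Map signature is preferred for readability,
  // a single .toMap restores it.
  val asMap: Map[TopicPartition, ReplicaAssignment] = flattened.toMap

  println(flattened)
  println(asMap)
}
```

If keeping the original Map parameter type turns out to be preferable, the trailing .toMap shown above would be one cheap way to do it.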
  }
  onNewPartitionCreation(partitionsToBeAdded.keySet)
  processTopicIds(partitions)
  partitionsToBeAdded.foreach(tuple => controllerContext.updatePartitionFullReplicaAssignment(tuple._1, tuple._2))
I think switching this to "tuple" makes it harder to read and less clear what the variables are.
  }.toSet

  val setDataRequests = updatedAssignments.map { case TopicIdReplicaAssignment(topic, topicIdOpt, assignments) =>
    SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(topicIdOpt, assignments), ZkVersion.MatchAnyVersion)
  }.toSeq
nit: spacing
  // Otherwise, maintain what we have in the topicZNode
  val updatedTopicIdAssignments = if (config.usesTopicId) {
-   val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
-   withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
+   val (withTopicIds, withoutTopicIds, withLocalTopicIds) = topicIdAssignments.foldLeft((Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment], Set.empty[TopicIdReplicaAssignment])) {
readability is a bit tricky here. I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.
Since we go through all the partitions again to check the topic ID and add to the replica assignment I wonder if we could have kept the partition and then a separate part for the ones with local IDs. 🤔
I'm also wondering if there was an operation you could have done besides making singleton sets and unioning them.
I chose this approach because it was the only one where we are able to 3-way partition the topicIdAssignments in a single iteration. All other approaches require multiple iterations.
We can work on improving this and favouring readability once we have a consensus on whether we will proceed with this change or not.
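To illustrate the trade-off being discussed, here is a minimal sketch of a single-pass 3-way partition using foldLeft. The types are simplified placeholders, not the controller code, and the hasLocalId flag stands in for the controller-context lookup:

```scala
// Hedged sketch: 3-way partition of a collection in one pass with foldLeft.
case class TopicIdReplicaAssignment(topic: String,
                                    topicId: Option[String],
                                    hasLocalId: Boolean)

object ThreeWayPartitionExample extends App {
  val topicIdAssignments: Set[TopicIdReplicaAssignment] = Set(
    TopicIdReplicaAssignment("a", Some("id-a"), hasLocalId = false),
    TopicIdReplicaAssignment("b", None, hasLocalId = true),
    TopicIdReplicaAssignment("c", None, hasLocalId = false)
  )

  // Each element is routed to exactly one of the three accumulators, so the
  // whole collection is traversed only once.
  val (withTopicIds, withoutTopicIds, withLocalTopicIds) =
    topicIdAssignments.foldLeft((Set.empty[TopicIdReplicaAssignment],
                                 Set.empty[TopicIdReplicaAssignment],
                                 Set.empty[TopicIdReplicaAssignment])) {
      case ((withIds, withoutIds, withLocalIds), assignment) =>
        if (assignment.topicId.isDefined)
          (withIds + assignment, withoutIds, withLocalIds)
        else if (assignment.hasLocalId)
          (withIds, withoutIds, withLocalIds + assignment)
        else
          (withIds, withoutIds + assignment, withLocalIds)
    }

  println(s"withTopicIds=$withTopicIds")
  println(s"withoutTopicIds=$withoutTopicIds")
  println(s"withLocalTopicIds=$withLocalTopicIds")
}
```

Adding each element with + (rather than building a singleton Set and unioning it in) would be one way to address the readability concern raised above while keeping the single pass.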
I'm a bit concerned about cases like this. I'm also wondering if there are any cases where we actually assign the wrong ID to a topic. I think as it stands now, we have isolated the issue to older clients, but once we start making Kafka changes (and releasing them in new releases) we may open up the problem further. I'm also not sure if it's worth adding this complexity to the code path to handle this edge case. Finally, I'm wondering about the implications for upgrades and downgrades.
It's too late for 3.4 unless it's a regression or a low risk critical fix to something new in 3.4. Have we tried encouraging users to move away from the command line switches deprecated a long time ago?
I hate to say this, but I wonder if this should be WONTFIX. The rule for admin tools that used the --zookeeper flag was that they were never guaranteed to be compatible across versions. Let me give an example of the general kind of problem you could have here. If you created a prefix ACL using an admin tool that had the --zookeeper flag, but your Kafka version didn't support prefix ACLs, what would happen? I'm not completely sure, but nothing good.

In this specific case you have old software overwriting whatever the new software has put in the znode. So it's not just topic IDs; in general, anything new that we add will get overwritten. We can't really support this -- we never promised to keep that znode the same forever. Stuff like this is why the --zookeeper flags were removed. We do support cross-version compatibility when using the --bootstrap-server option.

I wonder if we could somehow move the ZK paths so that the old tools would fail rather than doing the wrong thing. Another option is to set ZK-level ACLs to prevent these tools from going behind Kafka's back. We did talk about this, but due to the accelerated timeline for 3.4 we never implemented it. Obviously, in a case where we're upgrading from ZK to KRaft, we would want to avoid users doing this.
What you say makes sense, Colin. I do think it's a bit tricky to make such a big code change to support folks using older and deprecated tools. I also understand the point of view of the pain this causes, though. (It's caused me quite a bit of pain!) I am interested to see if there are any other options here.
Thanks for the comments, folks. I would like to break the conversation into multiple FAQs, and hopefully that will address the questions and points you have raised above.

What is the motivation? Is it to make the latest versions compatible with pre-2.8 clients (for the scope of this bug), or is it to protect the server when older clients are used?

It's the latter. Currently, the bug manifests as availability loss for the impacted topic, since the topic stops replication. This is recoverable by deleting the metadata file; the broker will recreate it from Zk. However, when KIP-405 (Tiered Storage) is merged in, it will begin to impact data integrity. This is because the metadata for a segment uses the topicId as a key. When the same segment for the same topic is uploaded with different topic Ids, it leads to an unrecoverable situation.
I would be happy to discuss a different solution than what has been proposed in the PR which can protect the server against the above two cases.
Can the users migrate to the newer versions?
What are the alternative ways to protect the state of the server against this bug?
Any other suggestions?
This is not quite right. You need multiple things:
Yes, you are right. I already mentioned it in the description of this PR. I will update my above comment to be more specific.
I am not super sure (still checking), but I believe https://github.com/yahoo/CMAK is a popular third-party tool that directly accesses ZooKeeper with older clients.
I guess these are the two relevant issues:
Not sure how easy it would be, but contributing a fix to the project above would be really helpful.
@divijvaidya My other concern here is that even though this fixes the issue in the case where the controller stays the same, it doesn't cover controller re-election. This means we would still have to share and support the recovery methods. If this is a big issue for tiered storage, then we could still be in trouble.
Will this code still be around by the time tiered storage is completed?
Also curious whether we can upload a segment with the wrong ID if the LeaderAndIsr request is blocked (and the broker thus can't become a leader or follower).
I don't know, but my point is that this code change is simple and safe enough to add to the current code as of today. It will prevent the non-TS topic mismatch bugs, and when TS lands upstream, the impact on it will be mitigated.
To be very precise here, this fix won't work if the controller context does not have the old topic Id. That will only happen when a controller failover took place exactly in the window between the admin tool overwriting Zk and the controller updating it. Note that a controller failover at any other time will work fine (since the controller will recreate the controller context from Zk, which would have been updated with the old topic Id earlier). And yes, I agree this is not a 100% fix, but it's a start. Since it's a safe fix and doesn't have side effects, we should push it out.
Great question! The topic Id mismatch check during handling of an LISR request is based on matching the local topic Id on the broker with the one sent in the LISR. However, it's very much possible to not have any topic Id locally. As an example, let's say a partition reassignment leads to partition placement on a broker where the log hasn't been created yet. In such cases, LISR won't throw a topic mismatch error and won't be blocked; instead, the broker will start operating with the new topic Id. Now we will have some followers working with the old topic Id (where LISR was blocked) and some with the new topic Id. If leadership fails over to one with the new topic Id, it will start uploading segments to tiered storage with the new topic Id, and thus, for the same topic partition, we will have segments with the old topic Id as well as the new topic Id.
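For readers following along, here is a minimal sketch of the check being described. The type and function names are placeholders (the actual logic lives in the broker's LeaderAndIsr handling and uses Kafka's Uuid class); the point is only that the mismatch error can be raised when a topic Id is already stored locally:

```scala
object TopicIdCheckSketch extends App {
  // Placeholder for Kafka's Uuid type.
  case class Uuid(value: String)

  // Returns Left(error) when a locally stored Id conflicts with the Id carried
  // by the LeaderAndIsr request, otherwise returns the Id the broker will use.
  def checkTopicId(localTopicId: Option[Uuid], requestTopicId: Uuid): Either[String, Uuid] =
    localTopicId match {
      case Some(local) if local != requestTopicId =>
        Left(s"inconsistent topic Id: local=$local, request=$requestTopicId") // request is rejected
      case _ =>
        Right(requestTopicId) // no local Id (or it matches): adopt the Id from the request
    }

  // A freshly reassigned replica with no local log (and hence no local Id)
  // simply accepts whatever Id the request carries -- even a "wrong" one.
  assert(checkTopicId(None, Uuid("new-id")).isRight)

  // A replica that already recorded the old Id rejects the request instead.
  assert(checkTopicId(Some(Uuid("old-id")), Uuid("new-id")).isLeft)
}
```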
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
We're now removing ZooKeeper support, so closing.
Change
Controller should update Zk with locally cached TopicId (when available) instead of assigning a new one when Zk doesn't have a TopicId.
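As a rough illustration of the intended behavior (placeholder types and method names, not the actual KafkaController code; Kafka uses its own Uuid and TopicIdReplicaAssignment classes), the idea is to fall back to the controller's cached Id before ever generating a fresh one:

```scala
import java.util.UUID

// Hedged sketch: these stand-in types only exist to show the decision
// this PR changes, not the real controller data structures.
case class TopicIdReplicaAssignment(topic: String, topicId: Option[UUID])

class CachedTopicIds(cache: Map[String, UUID]) {
  def get(topic: String): Option[UUID] = cache.get(topic)
}

object TopicIdFixSketch {
  // Before the fix: a topic whose znode carried no Id always received a newly
  // generated one. After the fix: reuse the locally cached Id when it exists.
  def resolveTopicId(assignment: TopicIdReplicaAssignment,
                     cachedIds: CachedTopicIds): TopicIdReplicaAssignment =
    assignment.topicId match {
      case Some(_) => assignment // Zk already has an Id: keep it
      case None =>
        // Prefer the controller's cached Id; only mint a new one as a last resort.
        val id = cachedIds.get(assignment.topic).getOrElse(UUID.randomUUID())
        assignment.copy(topicId = Some(id))
    }
}
```

Under these assumptions, calling resolveTopicId on an assignment with topicId = None returns the cached Id for that topic when one exists, rather than a random new one.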
Motivation for this change
This problem was highlighted in KAFKA-14190 and since then, multiple users have complained about the problem HERE (mailing list), HERE (mailing list) and HERE (ASF slack channel).
Description of the problem
In certain situations, it is possible that the TopicId stored locally on a broker for a topic differs from the TopicId stored for that topic in Zk. Currently, such a situation arises when users use a <2.8 client to alterPartitions for a topic on >=2.8 (including latest 3.4) brokers AND they use the --zookeeper flag from the client. Note that --zookeeper has been marked deprecated for a long time and has been replaced by --bootstrap-server, which doesn't face this problem.

The result of the topic Id discrepancy is availability loss for the topic until the user performs the mitigation steps listed in KAFKA-14190.
The exact sequence of steps is:
1. A pre-2.8 client using the --zookeeper flag overwrites the topic znode, removing the TopicId stored there.
2. A TopicChange event is created.
3. During handling of this event, the controller notices that there is no TopicId, generates a new one and updates Zk.
Testing
I have added a test with this change which asserts that the TopicId for a topic is immutable, i.e. once assigned, it does not change. This test fails before this change and passes after this change.
All integration tests and unit tests have been successful for me locally.
Side effects of this fix
There are no additional side effects of this change. No additional calls to Zk. We are only updating the TopicId from a locally cached value instead of assigning a new one.
Caveats
This code change does not fix the problem completely. It assumes that the controller has the TopicId locally so that it can update Zk, but in situations such as controller failover, that may not be true. More specifically, we will still end up with two different topic Ids when a controller failover takes place between the time the Zk TopicId was overwritten/removed and the time the controller could update the TopicId with the local value.
However, this code change should fix the majority of the scenarios impacted by this bug, and a separate PR will be filed to fix the minority scenario of a controller failover happening during that exact window.
Release
Due to the simple nature of the fix and the number of users who are impacted, I would request considering this for 3.4.0 and backporting it to as many previous versions as we can.