Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-4837: Network issue causes ephemeral node unremoved after t… #2172

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ public long loadDataBase() throws IOException {
return zxid;
}

/**
 * Load the database from disk into memory and add the transactions to
 * the committedlog in memory, restoring from the most recent snapshot
 * whose zxid does not exceed the given checkpoint zxid.
 *
 * @param zxid the checkpoint zxid; snapshots newer than this are skipped
 * @return the last valid zxid restored from disk
 * @throws IOException if the snapshot or transaction log cannot be read
 */
public long loadDataBase(long zxid) throws IOException {
    long startTime = Time.currentElapsedTime();
    long newZxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener, zxid);
    initialized = true;
    long loadTime = Time.currentElapsedTime() - startTime;
    ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
    // Log the zxid that was actually restored (newZxid), not the checkpoint
    // argument: the restore may stop at an earlier snapshot/txn, so logging
    // the requested zxid as "highest zxid" would be misleading.
    LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
            loadTime, Long.toHexString(newZxid), dataTree.getTreeDigest());
    return newZxid;
}

/**
* Fast forward the database adding transactions from the committed log into memory.
* @return the last valid zxid.
Expand Down Expand Up @@ -602,7 +621,7 @@ public boolean truncateLog(long zxid) throws IOException {
return false;
}

loadDataBase();
loadDataBase(zxid);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,72 @@ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOExcep
return dt.lastProcessedZxid;
}

/**
 * Deserialize a data tree from the most recent valid snapshot whose zxid
 * does not exceed the given checkpoint zxid.
 *
 * @param dt the data tree to deserialize into
 * @param sessions map populated with the restored session ids and timeouts
 * @param zxid the checkpoint zxid; snapshot files with a higher zxid in
 *             their name are skipped
 * @return the zxid of the snapshot that was loaded, or -1L if no snapshot
 *         files exist at all
 * @throws IOException if snapshots exist but none at or below the
 *         checkpoint zxid could be read and verified
 */
public long deserialize(DataTree dt, Map<Long, Integer> sessions, long zxid) throws IOException {
    // we run through 100 snapshots (not all of them)
    // if we cannot get it running within 100 snapshots
    // we should give up
    List<File> snapList = findNValidSnapshots(100);
    if (snapList.size() == 0) {
        // no snapshot on disk at all: signal the caller to bootstrap an
        // empty database rather than failing
        return -1L;
    }
    File snap = null;
    long snapZxid = -1;
    boolean foundValid = false;
    // snapList is ordered newest-first; walk backwards in time until a
    // snapshot at or below the checkpoint deserializes cleanly
    for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
        snap = snapList.get(i);
        LOG.info("Reading snapshot {}", snap);
        // the snapshot's zxid is encoded in its file name
        snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);

        // if this snapshot has a higher zxid than the checkpoint zxid, skip and continue to the previous snapshot
        if (snapZxid > zxid) {
            continue;
        }

        try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
            InputArchive ia = BinaryInputArchive.getArchive(snapIS);
            deserialize(dt, sessions, ia);
            // verify the CRC seal covering the tree + sessions payload
            SnapStream.checkSealIntegrity(snapIS, ia);

            // Digest feature was added after the CRC to make it backward
            // compatible, the older code can still read snapshots which
            // includes digest.
            //
            // To check the intact, after adding digest we added another
            // CRC check.
            if (dt.deserializeZxidDigest(ia, snapZxid)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            // deserialize lastProcessedZxid and check inconsistency
            if (dt.deserializeLastProcessedZxid(ia)) {
                SnapStream.checkSealIntegrity(snapIS, ia);
            }

            foundValid = true;
            break;
        } catch (IOException e) {
            // a corrupt snapshot is not fatal here; fall through to the
            // next (older) candidate
            LOG.warn("problem reading snap file {}", snap, e);
        }
    }
    if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
    }
    dt.lastProcessedZxid = snapZxid;
    lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);

    // compare the digest if this is not a fuzzy snapshot, we want to compare
    // and find inconsistent asap.
    if (dt.getDigestFromLoadedSnapshot() != null) {
        dt.compareSnapshotDigests(dt.lastProcessedZxid);
    }
    return dt.lastProcessedZxid;
}

/**
* deserialize the datatree from an inputarchive
* @param dt the datatree to be serialized into
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,87 @@ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener l
return finalizer.run();
}

/**
 * this function restores the server
 * database after reading from the
 * snapshots and transaction logs
 * until the checkpoint zxid.
 *
 * @param dt the datatree to be restored
 * @param sessions the sessions to be restored
 * @param listener the playback listener to run on the
 * database restoration
 * @param zxid the checkpoint zxid; snapshots newer than this are not used
 * @return the highest zxid restored
 * @throws IOException if neither a valid snapshot nor a trusted empty
 *         database can be established
 */
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener, long zxid)
        throws IOException {
    long snapLoadingStartTime = Time.currentElapsedTime();
    // returns -1L when no snapshot files exist at all (empty database case)
    long deserializeResult = snapLog.deserialize(dt, sessions, zxid);
    ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
    FileTxnLog txnLog = new FileTxnLog(dataDir);
    boolean trustEmptyDB;
    // a one-shot "initialize" marker file lets an operator allow an empty
    // database to participate in voting; consumed (deleted) on first use
    File initFile = new File(dataDir.getParent(), "initialize");
    if (Files.deleteIfExists(initFile.toPath())) {
        LOG.info("Initialize file found, an empty database will not block voting participation");
        trustEmptyDB = true;
    } else {
        trustEmptyDB = autoCreateDB;
    }

    // deferred so the empty-snapshot escape hatch below can also replay the
    // txn log before returning
    RestoreFinalizer finalizer = () -> {
        long highestZxid = fastForwardFromEdits(dt, sessions, listener);
        // The snapshotZxidDigest will reset after replaying the txn of the
        // zxid in the snapshotZxidDigest, if it's not reset to null after
        // restoring, it means either there are not enough txns to cover that
        // zxid or that txn is missing
        DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
        if (snapshotZxidDigest != null) {
            LOG.warn(
                    "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                            + "which might lead to inconsistent state",
                    Long.toHexString(highestZxid),
                    Long.toHexString(snapshotZxidDigest.getZxid()));
        }
        return highestZxid;
    };

    if (-1L == deserializeResult) {
        /*
         * this means that we couldn't find any snapshot, so we need to
         * initialize an empty database (reported in ZOOKEEPER-2325)
         */
        if (txnLog.getLastLoggedZxid() != -1) {
            // ZOOKEEPER-3056: provides an escape hatch for users upgrading
            // from old versions of zookeeper (3.4.x, pre 3.5.3).
            if (!trustEmptySnapshot) {
                throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
            } else {
                LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                return finalizer.run();
            }
        }

        if (trustEmptyDB) {
            /*
             * TODO: (br33d) we should either put a ConcurrentHashMap on restore()
             * or use Map on save()
             */
            save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);

            /* return a zxid of 0, since we know the database is empty */
            return 0L;
        } else {
            /* return a zxid of -1, since we are possibly missing data */
            LOG.warn("Unexpected empty data tree, setting zxid to -1");
            dt.lastProcessedZxid = -1L;
            return -1L;
        }
    }

    return finalizer.run();
}

/**
* This function will fast forward the server database to have the latest
* transactions in it. This is the same as restore, but only reads from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ public interface SnapShot {
*/
long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException;

/**
 * deserialize a data tree from the last valid snapshot before the checkpoint zxid and return the last zxid that was deserialized
 *
 * @param dt the datatree to be deserialized into
 * @param sessions the sessions to be deserialized into
 * @param zxid the checkpoint zxid; snapshots with a higher zxid are skipped
 * @return the last zxid that was deserialized from the snapshot
 * @throws IOException
 */
long deserialize(DataTree dt, Map<Long, Integer> sessions, long zxid) throws IOException;

/**
* persist the datatree and the sessions into a persistence storage
* @param dt the datatree to be serialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,98 @@ public class EphemeralNodeDeletionTest extends QuorumPeerTestBase {
private static int SERVER_COUNT = 3;
private MainThread[] mt = new MainThread[SERVER_COUNT];

/**
 * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-4837.
 */
@Test
@Timeout(value = 300)
public void testEphemeralNodeDeletionZK4837() throws Exception {
    final int[] clientPorts = new int[SERVER_COUNT];

    // build the quorum configuration section, one server line per peer
    StringBuilder quorumCfg = new StringBuilder();
    for (int i = 0; i < SERVER_COUNT; i++) {
        clientPorts[i] = PortAssignment.unique();
        quorumCfg.append("server.").append(i)
                .append("=127.0.0.1:").append(PortAssignment.unique())
                .append(":").append(PortAssignment.unique())
                .append(":participant;127.0.0.1:").append(clientPorts[i])
                .append("\n");
    }
    String currentQuorumCfgSection = quorumCfg.toString();

    // start all the servers
    for (int i = 0; i < SERVER_COUNT; i++) {
        mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
            @Override
            public TestQPMain getTestQPMain() {
                return new MockTestQPMain();
            }
        };
        Thread.sleep(1000);
        mt[i].start();
    }

    // ensure all servers started
    for (int i = 0; i < SERVER_COUNT; i++) {
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                "waiting for server " + i + " being up");
    }

    // Check that node 1 is the initial leader
    assertEquals(1, mt[1].getQuorumPeer().getLeaderId());

    CountdownWatcher watcher = new CountdownWatcher();
    ZooKeeper client = new ZooKeeper("127.0.0.1:" + clientPorts[0], ClientBase.CONNECTION_TIMEOUT, watcher);
    watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

    Stat firstEphemeralNode = new Stat();

    // 1: create ephemeral node
    String nodePath = "/e1";
    client.create(nodePath, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, firstEphemeralNode);

    // 2: Inject network error on every peer
    for (int i = 0; i < SERVER_COUNT; i++) {
        ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(true);
    }

    // 3: Quit
    client.close();

    // 4: Wait until node 1 and node 2 have removed the ephemeral node
    boolean sessionsGone = false;
    while (!sessionsGone) {
        try {
            Thread.sleep(250);
        } catch (InterruptedException e) {
            // ignore
        }

        if (mt[1].getQuorumPeer().getZkDb().getSessionCount() == 0
                && mt[2].getQuorumPeer().getZkDb().getSessionCount() == 0) {
            // Double check after a short grace period before trusting it
            Thread.sleep(1000);
            sessionsGone = mt[1].getQuorumPeer().getZkDb().getSessionCount() == 0
                    && mt[2].getQuorumPeer().getZkDb().getSessionCount() == 0;
        }
    }

    // 5: Remove network error
    for (int i = 0; i < SERVER_COUNT; i++) {
        ((CustomQuorumPeer) mt[i].getQuorumPeer()).setInjectError(false);
    }

    // Node must have been deleted from 1 and 2
    assertNodeNotExist(nodePath, 2);
    assertNodeNotExist(nodePath, 1);

    // If buggy, node is not deleted from 0
    assertNodeNotExist(nodePath, 0);
}

/**
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2355.
* ZooKeeper ephemeral node is never deleted if follower fail while reading
Expand Down Expand Up @@ -156,6 +248,18 @@ public TestQPMain getTestQPMain() {
followerZK.close();
}

/**
 * Asserts that the given znode path does not exist on the server at the
 * given index of {@code mt}.
 *
 * @param nodePath the znode path that must be absent
 * @param idx index into {@code mt} of the server to query
 * @throws Exception if the client cannot connect or the query fails
 */
void assertNodeNotExist(String nodePath, int idx) throws Exception {
    QuorumPeer qp = mt[idx].getQuorumPeer();

    CountdownWatcher watch = new CountdownWatcher();
    ZooKeeper zk = new ZooKeeper("127.0.0.1:" + qp.getClientPort(), ClientBase.CONNECTION_TIMEOUT, watch);
    Stat exists;
    try {
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
        exists = zk.exists(nodePath, false);
    } finally {
        // always release the client handle, even if connect/exists throws
        zk.close();
    }
    assertNull(exists, "Node must have been deleted from the node " + idx);
}

@AfterEach
public void tearDown() {
// stop all severs
Expand Down
Loading