/**
 * Handles a watch event delivered for the given ZooKeeper client.
 *
 * <p>This is an excerpt: the parts replaced by "..." are not shown. The
 * visible portion handles events whose type is {@code None} by switching on
 * the state carried by the event, and shows only the {@code Disconnected}
 * case.
 *
 * @param zk the ZooKeeper client associated with the event (not used in the
 *     visible portion)
 * @param event the watch event to process
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
...
if (eventType == Event.EventType.None) {
switch (event.getState()) {
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// Record the disconnect first, then move the elector into neutral mode.
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
...
}
}
}
/**
 * Switches this elector to {@code State.NEUTRAL} and forwards the
 * notification to the application client. When the elector is already
 * neutral the call is a no-op, so the client is not notified twice.
 */
private void enterNeutralMode() {
  if (state == State.NEUTRAL) {
    // Already neutral: nothing to change and nobody to notify.
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Entering neutral mode for " + this);
  }

  state = State.NEUTRAL;
  appClient.enterNeutralMode();
}
/**
 * Reacts to losing contact with ZooKeeper.
 *
 * <p>A warning is logged on every call. If no disconnect timer exists yet,
 * one is created and a single task is scheduled to fire after
 * {@code zkSessionTimeout}; that task calls {@code becomeStandby()} unless
 * the timer field has been set back to {@code null} in the meantime. Both the
 * scheduling and the task body run while holding {@code zkDisconnectLock}.
 */
public void enterNeutralMode() {
  LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
      + zkSessionTimeout + " ms if connection is not reestablished.");

  synchronized (zkDisconnectLock) {
    if (zkDisconnectTimer != null) {
      // A countdown is already pending; do not start a second one.
      return;
    }

    // First disconnect notification: start the countdown to standby.
    zkDisconnectTimer = new Timer("Zookeeper disconnect timer");

    TimerTask standbyOnTimeout = new TimerTask() {
      @Override
      public void run() {
        synchronized (zkDisconnectLock) {
          if (zkDisconnectTimer == null) {
            // The timer field was cleared before we fired: treat as cancelled.
            return;
          }
          becomeStandby();
        }
      }
    };

    zkDisconnectTimer.schedule(standbyOnTimeout, zkSessionTimeout);
  }
}
/**
 * Handles a watch event delivered for the given ZooKeeper client.
 *
 * <p>This is an excerpt: the parts replaced by "..." are not shown. The
 * visible portion handles events whose type is {@code None} by switching on
 * the state carried by the event, and shows only the {@code Expired} case.
 *
 * @param zk the ZooKeeper client associated with the event (not used in the
 *     visible portion)
 * @param event the watch event to process
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
...
if (eventType == Event.EventType.None) {
switch (event.getState()) {
case Expired:
LOG.info("Session expired. Entering neutral mode and rejoining...");
// Go neutral first, then rejoin the election, passing 0 as the sleep time.
enterNeutralMode();
reJoinElection(0);
break;
...
}
}
}
/**
 * Drops the current ZooKeeper connection and, if application data is
 * available, joins the election again.
 *
 * <p>The whole sequence runs while holding
 * {@code sessionReestablishLockForTests}, which is always released on exit.
 *
 * @param sleepTime value passed to {@code sleepFor} after the connection is
 *     terminated and before rejoining (presumably milliseconds; confirm
 *     against {@code sleepFor})
 */
private void reJoinElection(int sleepTime) {
  LOG.info("Trying to re-establish ZK session");

  sessionReestablishLockForTests.lock();
  try {
    terminateConnection();
    sleepFor(sleepTime);

    // The election must not be joined before the service has been reported
    // as HEALTHY by ZKFC monitoring, i.e. while no app data is set.
    if (appData == null) {
      LOG.info("Not joining election since service has not yet been " +
          "reported as healthy.");
      return;
    }

    joinElectionInternal();
  } finally {
    sessionReestablishLockForTests.unlock();
  }
}
/**
 * Enters the election: makes sure a ZooKeeper client exists, resets the
 * create-retry counter, marks this elector as wanting to participate, and
 * kicks off asynchronous creation of the lock node.
 *
 * <p>If there is no client and the session cannot be re-established, a fatal
 * error is reported and the election is not joined.
 *
 * @throws IllegalStateException if no app data has been set
 */
private void joinElectionInternal() {
  Preconditions.checkState(appData != null,
      "trying to join election without any app data");

  // reEstablishSession() is attempted only when there is no client yet.
  final boolean clientMissing = (zkClient == null);
  if (clientMissing && !reEstablishSession()) {
    fatalError("Failed to reEstablish connection with ZooKeeper");
    return;
  }

  createRetryCount = 0;
  wantToBeInElection = true;
  createLockNodeAsync();
}
对于standby的RM,其完整的日志如下所示:
// 会话超时, 在超时时间内未接收到服务端的任何数据
2022-09-01 19:10:25,230 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000
// 异常捕获
2022-09-01 19:10:25,230 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 6668ms for sessionid 0x10054aa9d110000, closing socket connection and attempting reconnect
// RM的回调处理
2022-09-01 19:10:25,331 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session disconnected. Entering neutral mode...
// 触发定时器线程
2022-09-01 19:10:25,331 WARN org.apache.hadoop.yarn.server.resourcemanager.ActiveStandbyElectorBasedElectorService: Lost contact with Zookeeper. Transitioning to standby in 10000 ms if connection is not reestablished.
// ZK客户端的发送线程尝试重连
2022-09-01 19:10:26,905 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181. Will not attempt to authenticate using SASL (unknown error)
// 定时器线程触发状态切换, 但当前已经是standby状态
2022-09-01 19:10:35,334 INFO org.apache.hadoop.yarn.server.resourcemanager.ResourceManager: Already in standby state
// 重连成功
2022-09-01 19:13:51,101 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 会话过期, 向事件回调线程队列插入会话过期的事件
2022-09-01 19:13:51,104 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired
// 回调处理, 并触发重新选举
2022-09-01 19:13:51,104 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session expired. Entering neutral mode and rejoining...
// 发送线程捕获异常
2022-09-01 19:13:51,105 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x10054aa9d110000 has expired, closing socket connection
// 重新建立连接并进行选举
2022-09-01 19:13:51,105 INFO org.apache.hadoop.ha.ActiveStandbyElector: Trying to re-establish ZK session
2022-09-01 19:13:51,109 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, initiating session
// 成功建立连接(注意会话ID不同)
2022-09-01 19:13:51,122 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server zk-0-hncscwc.network-hncscwc/172.168.1.1:2181, sessionid = 0x10054aa9d110006, negotiated timeout = 10000
// 连接成功建立的回调
2022-09-01 19:13:51,123 INFO org.apache.hadoop.ha.ActiveStandbyElector: Session connected.
/**
 * Reports an unrecoverable elector error: logs the message at error level,
 * calls {@code reset()}, and then notifies the application client.
 *
 * @param errorMessage description of the failure; it is both logged and
 *     passed on to the client
 */
private void fatalError(String errorMessage) {
LOG.error(errorMessage);
// Reset before notifying, so the client callback runs after reset() returns.
reset();
appClient.notifyFatalError(errorMessage);
}
/**
 * Forwards a fatal elector error to the ResourceManager by handing an
 * {@code EMBEDDED_ELECTOR_FAILED} {@link RMFatalEvent} to the event handler
 * of the RM context's dispatcher.
 *
 * @param errorMessage description of the failure, carried in the event
 */
public void notifyFatalError(String errorMessage) {
  RMFatalEvent electorFailure =
      new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage);
  rm.getRMContext().getDispatcher().getEventHandler().handle(electorFailure);
}
/**
 * Event handler for {@link RMFatalEvent}s.
 *
 * <p>This is an excerpt: the remainder of {@code handle} is replaced by "..."
 * and is not shown.
 */
private class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> {
/**
 * Logs the received event and, when HA is enabled for the current
 * configuration, starts the transition to standby on a new thread.
 *
 * @param event the fatal event that was dispatched
 */
@Override
public void handle(RMFatalEvent event) {
LOG.error("Received " + event);
if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("Transitioning the resource manager to standby.");
// The transition is started on its own thread rather than run inline here.
handleTransitionToStandByInNewThread();
}
...
}
}
/**
 * Starts a new thread named "StandByTransitionThread" that executes the
 * active services' {@code standByTransitionRunnable}. The call returns as
 * soon as the thread has been started.
 */
private void handleTransitionToStandByInNewThread() {
  Thread transitionWorker = new Thread(
      activeServices.standByTransitionRunnable, "StandByTransitionThread");
  transitionWorker.start();
}
/**
 * Runnable that transitions the RM to standby and then asks the leader
 * elector, if one is configured, to rejoin the election.
 *
 * <p>The body executes at most once per instance, even when several threads
 * run the same instance concurrently.
 *
 * <p>NOTE(review): the run-once flag is never reset inside this class, so a
 * given instance cannot trigger a second transition; presumably a fresh
 * instance is created elsewhere when needed. Confirm against the owner of
 * this runnable.
 */
private class StandByTransitionRunnable implements Runnable {
  // Flipped from false to true by the first caller of run(); every later
  // caller sees true and returns immediately.
  private final AtomicBoolean hasAlreadyRun = new AtomicBoolean(false);

  @Override
  public void run() {
    // Only the thread that wins the false -> true flip proceeds.
    if (!hasAlreadyRun.compareAndSet(false, true)) {
      return;
    }

    if (!rmContext.isHAEnabled()) {
      return;
    }

    try {
      // Transition to standby and reinitialize the active services.
      LOG.info("Transitioning RM to Standby mode");
      transitionToStandby(true);

      EmbeddedElector leaderElector = rmContext.getLeaderElectorService();
      if (leaderElector != null) {
        leaderElector.rejoinElection();
      }
    } catch (Exception e) {
      // A failed transition is not recoverable here: log and terminate.
      LOG.fatal("Failed to transition RM to Standby mode.", e);
      ExitUtil.terminate(1, e);
    }
  }
}