Raft协议
Raft是分布式系统中的一种共识算法,用于在集群中选举Leader管理集群。Raft协议中有以下角色:
Leader(领导者):集群中的领导者,负责管理集群。
Candidate(候选者):具有竞选Leader资格的角色,如果集群需要选举Leader,节点需要先转为候选者角色才可以发起竞选。
Follower(跟随者 ):Leader的跟随者,接收和处理来自Leader的消息,与Leader之间保持通信,如果通信超时或者其他原因导致节点与Leader之间通信失败,节点会认为集群中没有Leader,就会转为候选者发起竞选,推荐自己成为Leader。
Raft协议中还有一个Term(任期)的概念,任期是随着选举的举行而变化,一般是单调进行递增,比如说集群中当前的任期为1,此时某个节点发现集群中没有Leader,开始发起竞选,此时任期编号就会增加为2,表示进行了新一轮的选举。一般会为Term较大的那个节点进行投票,当某个节点收到的投票数达到了Quorum,一般是 集群中的节点数/2 + 1
,将会被选举为Leader。
Elasticsearch选主
Elasticsearch在7.0版本以前采用Bully算法进行选主,7.0以后使用了Raft协议,但没有完全按照Raft协议来实现,而是做了一些调整,ES选主流程如下:
在ES启动节点的时候,会调用Coordinator的 startInitialJoin
方法开启选举:
// Node
public class Node implements Closeable {
public Node start() throws NodeValidationException {
// ...
// 启动集群选举
coordinator.startInitialJoin();
// ...
}
}
// Coordinator
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
public void startInitialJoin() {
synchronized (mutex) {
// 先转为候选者
becomeCandidate("startInitialJoin");
}
// 启动选举任务
clusterBootstrapService.scheduleUnconfiguredBootstrap();
}
}
成为候选节点
becomeCandidate
方法主要做一些Leader选举的前置工作:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
void becomeCandidate(String method) {
// 判断是否持有锁
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug("{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])", method,
getCurrentTerm(), mode, lastKnownLeader);
// 如果不是CANDIDATE
if (mode != Mode.CANDIDATE) {
final Mode prevMode = mode;
// 设置为CANDIDATE
mode = Mode.CANDIDATE;
cancelActivePublication("become candidate: " + method);
//...
// 如果之前是Leader
if (prevMode == Mode.LEADER) {
// 清除Master相关信息
cleanMasterService();
}
// ...
}
// 更新PreVoteCollector里面记录的leader节点和Term信息,这里还没有选举出leader,所以传入的是null
preVoteCollector.update(getPreVoteResponse(), null);
}
private PreVoteResponse getPreVoteResponse() {
// 创建PreVoteResponse,记录当前Term、上一次接受的Term和上一次接受的版本
return new PreVoteResponse(
getCurrentTerm(),
coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedState().version()
);
}
}
PreVoteCollector
的二元组如下,DiscoveryNode为leader节点, PreVoteResponse
记录了Term相关信息,其他节点发起选举时,返回给发起者的投票结果就是 PreVoteResponse
:
public class PreVoteCollector {
// 二元组
private volatile Tuple state;
public void update(final PreVoteResponse preVoteResponse, @Nullable final DiscoveryNode leader) {
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
// 初始化状态信息
state = new Tuple<>(leader, preVoteResponse);
}
}
Leader选举
在 scheduleUnconfiguredBootstrap
方法中,对节点是否有Master角色权限进行了判断,如果没有Master角色权限,直接返回终止选举,否则启动选举任务,获取集群中发现的节点,调用 startBootstrap
开始启动:
public class ClusterBootstrapService {
scheduleUnconfiguredBootstrap() {
if (unconfiguredBootstrapTimeout == null) {
return;
}
// Master角色权限校验
if (transportService.getLocalNode().isMasterNode() == false) {
return;
}
logger.info(
"no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] "
+ "unless existing master is discovered",
unconfiguredBootstrapTimeout
);
// 执行启动任务
transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
// 获取集群中发现的节点
final Set discoveredNodes = getDiscoveredNodes();
logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
// 启动
startBootstrap(discoveredNodes, emptyList());
}
// ...
});
}
}
在 startBootstrap
方法中,首先判断探测到的集群节点discoveryNodes是否有Master角色权限,然后调用 doBootstrap
进行启动。
在 doBootstrap
方法中,创建了 VotingConfiguration
,然后调用 votingConfigurationConsumer
触发选举,并进行了异常捕捉,如果出现异常进行重试:
public class ClusterBootstrapService {
private void startBootstrap(Set discoveryNodes, List unsatisfiedRequirements) {
// 判断发现的节点是否有Master角色权限
assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;
if (bootstrappingPermitted.compareAndSet(true, false)) {
// 启动
doBootstrap(
// 创建VotingConfiguration
new VotingConfiguration(
Stream.concat(
discoveryNodes.stream().map(DiscoveryNode::getId),
unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s)
).collect(Collectors.toSet())
)
);
}
}
private void doBootstrap(VotingConfiguration votingConfiguration) {
assert transportService.getLocalNode().isMasterNode();
try {
// 触发投票
votingConfigurationConsumer.accept(votingConfiguration);
} catch (Exception e) {
logger.warn(() -> "exception when bootstrapping with " + votingConfiguration + ", rescheduling", e);
// 如果出现异常,进行重试
transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC, new Runnable() {
@Override
public void run() {
doBootstrap(votingConfiguration);
}
// ...
});
}
}
}
votingConfigurationConsumer
是一个函数式编程接口,它接收一个表达式,在 Coordinator
的构造函数中可以看到对 ClusterBootstrapService
进行实例化时,传入的是 setInitialConfiguration
方法,所以 votingConfigurationConsumer.accept(votingConfiguration)
会执行 Coordinator
的 setInitialConfiguration
方法:
public class ClusterBootstrapService {
// votingConfigurationConsumer
private final Consumer votingConfigurationConsumer;
public ClusterBootstrapService(
Settings settings,
TransportService transportService,
Supplier> discoveredNodesSupplier,
BooleanSupplier isBootstrappedSupplier,
Consumer votingConfigurationConsumer
) {
//...
// 设置votingConfigurationConsumer
this.votingConfigurationConsumer = votingConfigurationConsumer;
}
}
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
public Coordinator(
// ...
) {
// ...
// 初始化ClusterBootstrapService
this.clusterBootstrapService = new ClusterBootstrapService(
settings,
transportService,
this::getFoundPeers,
this::isInitialConfigurationSet,
this::setInitialConfiguration // 传入setInitialConfiguration方法
);
// ...
}
}
setInitialConfiguration
方法的处理逻辑如下:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
synchronized (mutex) {
// 获取集群状态
final ClusterState currentState = getStateForMasterService();
// 判断是否初始化过
if (isInitialConfigurationSet()) {
logger.debug("initial configuration already set, ignoring {}", votingConfiguration);
return false;
}
// 校验Master角色权限
if (getLocalNode().isMasterNode() == false) {
logger.debug("skip setting initial configuration as local node is not a master-eligible node");
throw new CoordinationStateRejectedException(
"this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"
);
}
// 如果节点ID中不包含当前节点的ID
if (votingConfiguration.getNodeIds().contains(getLocalNode().getId()) == false) {
logger.debug("skip setting initial configuration as local node is not part of initial configuration");
throw new CoordinationStateRejectedException("local node is not part of initial configuration");
}
// ...
// 判断节点个数是否达到Quorum
if (votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).toList()) == false) {
// ...
throw new CoordinationStateRejectedException(
"not enough nodes discovered to form a quorum in the initial configuration "
+ "[knownNodes="
+ knownNodes
+ ", "
+ votingConfiguration
+ "]"
);
}
// ...
// 更新
preVoteCollector.update(getPreVoteResponse(), null);
// 开始选举
startElectionScheduler();
return true;
}
}
}
startElectionScheduler
方法用于启动选举任务,任务是异步执行的:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;
// 校验Master角色权限
if (getLocalNode().isMasterNode() == false) {
return;
}
final TimeValue gracePeriod = TimeValue.ZERO;
// 启动选举任务
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
// 如果是CANDIDATE节点
if (mode == Mode.CANDIDATE) {
// 获取之前的集群状态
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
if (localNodeMayWinElection(lastAcceptedState) == false) {
logger.trace("skip prevoting as local node may not win election: {}", lastAcceptedState.coordinationMetadata());
return;
}
// 获取集群状态信息
final StatusInfo statusInfo = nodeHealthService.getHealth();
// 如果处于UNHEALTHY状态
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("skip prevoting as local node is unhealthy: [{}]", statusInfo.getInfo());
return;
}
if (prevotingRound != null) {
prevotingRound.close();
}
// 发起投票
prevotingRound = preVoteCollector.start(lastAcceptedState, getDiscoveredNodes());
}
}
}
// ...
});
}
}
PreVoteCollector
的start方法中,创建了 PreVotingRound
,然后调用 PreVotingRound
的start的方法发起投票:
public class PreVoteCollector {
public Releasable start(final ClusterState clusterState, final Iterable broadcastNodes) {
// 创建PreVotingRound
PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm());
// 发起投票
preVotingRound.start(broadcastNodes);
return preVotingRound;
}
}
PreVotingRound
是 PreVoteCollector
的内部类,在start方法中,会遍历探测到的集群节点,然后进行遍历,向每一个节点发送 PRE_VOTE
投票请求,投票请求响应信息处理是在 handlePreVoteResponse
方法中处理的:
public class PreVoteCollector {
private class PreVotingRound implements Releasable {
PreVotingRound(final ClusterState clusterState, final long currentTerm) {
// 集群状态
this.clusterState = clusterState;
// 构建投票请求
preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm);
}
void start(final Iterable broadcastNodes) {
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
// 遍历发现的节点,当前节点向每一个节点发送投票请求
broadcastNodes.forEach(
// 发送PRE_VOTE请求
n -> transportService.sendRequest(
n,
REQUEST_PRE_VOTE_ACTION_NAME,
preVoteRequest,
new TransportResponseHandler() {
// ...
@Override
public void handleResponse(PreVoteResponse response) {
// 处理返回的响应
handlePreVoteResponse(response, n);
}
// ...
}
)
);
}
}
}
在 PreVoteCollector
的构造函数中可以看到,注册了 REQUEST_PRE_VOTE_ACTION_NAME
请求处理器,对 PRE_VOTE
请求的处理是调用 handlePreVoteRequest
方法进行的,处理完毕后调用 sendResponse
返回响应信息:
public class PreVoteCollector {
// 选举任务
private final Runnable startElection;
// 更新最大Term
private final LongConsumer updateMaxTermSeen;
PreVoteCollector(
final TransportService transportService,
final Runnable startElection,
final LongConsumer updateMaxTermSeen,
final ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService
) {
this.transportService = transportService;
this.startElection = startElection;
this.updateMaxTermSeen = updateMaxTermSeen;
this.electionStrategy = electionStrategy;
this.nodeHealthService = nodeHealthService;
// 注册PRE_VOTE请求处理器
transportService.registerRequestHandler(
REQUEST_PRE_VOTE_ACTION_NAME,
Names.CLUSTER_COORDINATION,
false,
false,
PreVoteRequest::new,
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)) // 调用handlePreVoteRequest处理请求
);
}
}
在 handlePreVoteRequest
之前,首先看 Coordinator
的构造函数对 PreVoteCollector
实例化时传入的参数,主要关注 startElection
和 updateMaxTermSeen
,它们都是函数式编程接口,从实例化的代码中可以看到分别对应 Coordinator
的传入的 startElection
和 updateMaxTermSeen
方法,在后面会用到这两个方法:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
public Coordinator(
// ...
){
// ...
this.preVoteCollector = new PreVoteCollector(
transportService,
this::startElection, // 传入startElection方法,启动选举
this::updateMaxTermSeen, // 传入updateMaxTermSeen,更新收到的最大Term
electionStrategy,
nodeHealthService
);
// ...
}
}
handlePreVoteRequest
方法处理逻辑如下:
public class PreVoteCollector {
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
// 比较Term,更新maxTermSeen
updateMaxTermSeen.accept(request.getCurrentTerm());
Tuple state = this.state;
assert state != null : "received pre-vote request before fully initialised";
// 获取当前节点记录的集群Leader节点
final DiscoveryNode leader = state.v1();
// 获取当前节点的Term信息
final PreVoteResponse response = state.v2();
// 获取健康状态
final StatusInfo statusInfo = nodeHealthService.getHealth();
// 如果当前节点的状态处于UNHEALTHY
if (statusInfo.getStatus() == UNHEALTHY) {
String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
logger.debug(message);
throw new NodeHealthCheckFailureException(message);
}
// 如果leader为空,表示还没有Leader节点,返回响应同意发起投票的节点成为leader
if (leader == null) {
return response;
}
// 如果leader不为空,但是与发起请求的节点是同一个节点,同样支持发起请求的节点成为leader
if (leader.equals(request.getSourceNode())) {
return response;
}
// 其他情况,表示已经存在leader,拒绝投票请求
throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}
}
上面说过 updateMaxTermSeen
指向 Coordinator
的 updateMaxTermSeen
方法,处理逻辑如下:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void updateMaxTermSeen(final long term) {
synchronized (mutex) {
// 当前节点收到过的最大的Term与请求中的term,如果请求中的Term较大,maxTermSeen的值将被更新为请求中的Term的值
maxTermSeen = Math.max(maxTermSeen, term);
// 获取当前节点的term
final long currentTerm = getCurrentTerm();
// 如果当前节点是Leader并且maxTermSeen大于当前节点的Term,请求中的Term较大,这里maxTermSeen的值就是请求中的Term,所以也是在比较请求中的Term是否大于当前节点的Term
if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
if (publicationInProgress()) {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
} else {
try {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
// 确保Term是最新
ensureTermAtLeast(getLocalNode(), maxTermSeen);
// 发起选举
startElection();
} catch (Exception e) {
logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
becomeCandidate("updateMaxTermSeen");
}
}
}
}
}
}
在 ensureTermAtLeast
方法中,判断当前节点的Term是否小于请求中的Term:
- 如果是则创建StartJoinRequest然后调用
joinLeaderInTerm
方法,joinLeaderInTerm
方法会返回一个JOIN信息;
在集群选举Leader的时候,某个节点成为Leader之前,会向其他节点发送StartJoin请求,这里进行模拟发送,当前节点向自己发送一个StartJoinRequest进行处理,更新当前节点的Term信息,后面会详细讲解StartJoin请求的处理。
- 如果不是,返回一个空的JOIN信息;
在 joinLeaderInTerm
方法中,会调用 handleStartJoin
处理StartJoin请求, 它会更新当前节点Term信息为最新,之后判断当前节点是否是CANDIDATE,如果不是需要将节点转为 CANDIDATE
:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
// 判断当前节点Term是否小于请求中的Term
if (getCurrentTerm() < targetTerm) {
// 调用joinLeaderInTerm
return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm)));
}
return Optional.empty();
}
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
synchronized (mutex) {
logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
lastJoin = Optional.of(join);
peerFinder.setCurrentTerm(getCurrentTerm());
// 如果不是CANDIDATE转为CANDIDATE
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm");
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
preVoteCollector.update(getPreVoteResponse(), null);
}
return join;
}
}
}
发起者收到集群节点返回的 PRE_VOTE
请求响应时,在 handlePreVoteResponse
方法中进行处理:
public class PreVoteCollector {
private class PreVotingRound implements Releasable {
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
if (isClosed.get()) {
logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
return;
}
// 处理最大Term
updateMaxTermSeen.accept(response.getCurrentTerm());
// 如果响应中的Term大于当前节点的Term, 或者Term相等但是版本号大于当前节点的版本号
if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
return;
}
// 记录得到的投票
preVotesReceived.put(sender, response);
// ...
// 判断是否得到了大多数投票
if (electionStrategy.isElectionQuorum(
clusterState.nodes().getLocalNode(),
localPreVoteResponse.getCurrentTerm(),
localPreVoteResponse.getLastAcceptedTerm(),
localPreVoteResponse.getLastAcceptedVersion(),
clusterState.getLastCommittedConfiguration(),
clusterState.getLastAcceptedConfiguration(),
voteCollection
) == false) {
logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
return;
}
// ...
// 开始选举
startElection.run();
}
}
}
在成为Leader前,需要向集群中的节点发送 StartJoin
请求,邀请节点加入集群:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void startElection() {
synchronized (mutex) {
// 是否是CANDIDATE
if (mode == Mode.CANDIDATE) {
if (localNodeMayWinElection(getLastAcceptedState()) == false) {
logger.trace("skip election as local node may not win it: {}", getLastAcceptedState().coordinationMetadata());
return;
}
// 创建StartJoin请求,这里可以看到在请求中的Term,设置为最大Term + 1
final StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
logger.debug("starting election with {}", startJoinRequest);
// 调用sendStartJoinRequest发送StartJoin请求
getDiscoveredNodes().forEach(node -> joinHelper.sendStartJoinRequest(startJoinRequest, node));
}
}
}
}
StartJoin
请求表示邀请节点加入集群信息,接收者收到请求后会向发起者发送JOIN请求表示进行加入,所以发起者对StartJoin的响应不需要做什么处理,等待接收者发送JOIN请求即可:
public class JoinHelper {
void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
assert startJoinRequest.getSourceNode().isMasterNode()
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
// 发送START_JOIN请求
transportService.sendRequest(destination, START_JOIN_ACTION_NAME, startJoinRequest, new TransportResponseHandler.Empty() {
@Override
public void handleResponse(TransportResponse.Empty response) {
// 什么也不处理
logger.debug("successful response to {} from {}", startJoinRequest, destination);
}
@Override
public void handleException(TransportException exp) {
logger.debug(new ParameterizedMessage("failure in response to {} from {}", startJoinRequest, destination), exp);
}
});
}
}
JoinHelper的构造函数中,注册了 START_JOIN请
求处理器,在收到 START_JOIN
请求时,会调用 joinLeaderInTerm
处理,然后调用 sendJoinRequest
向发送者发送JOIN请求:
public class JoinHelper {
JoinHelper(
// ...
) {
// 注册START_JOIN_ACTION_NAME请求处理
transportService.registerRequestHandler(
START_JOIN_ACTION_NAME,
Names.CLUSTER_COORDINATION,
false,
false,
StartJoinRequest::new,
(request, channel, task) -> {
final DiscoveryNode destination = request.getSourceNode();
// 发送join请求
sendJoinRequest(destination, currentTermSupplier.getAsLong(), Optional.of(joinLeaderInTerm.apply(request))); // 调用joinLeaderInTerm处理
channel.sendResponse(Empty.INSTANCE);
}
);
}
}
joinLeaderInTerm
方法用于处理StartJoin请求, 返回一个Join对象并发送给发起者,发起者会根据返回的Join信息计算得到的票数,以此决定是否成为Leader, joinLeaderInTerm
处理逻辑如下:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
synchronized (mutex) {
logger.debug("joinLeaderInTerm: for [{}] with term {}", startJoinRequest.getSourceNode(), startJoinRequest.getTerm());
// 处理StartJoin请求
final Join join = coordinationState.get().handleStartJoin(startJoinRequest);
lastJoin = Optional.of(join);
peerFinder.setCurrentTerm(getCurrentTerm());
// 如果节点不是CANDIDATE,转为CANDIDATE
if (mode != Mode.CANDIDATE) {
becomeCandidate("joinLeaderInTerm");
} else {
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
preVoteCollector.update(getPreVoteResponse(), null);
}
return join;
}
}
}
在handleStartJoin方法中从请求中获取Term信息并更新到当前节点的CurrentTerm中:
handleStartJoin
方法中只要请求中的Term大于当前节点的Term,都会继续往下进行,最后返回一个Join对象,这意味着当前节点同意为发起者进行投票,也就是说Elasticsearch允许一个节点多次进行投票,并没有按照Raft协议中的规定每个任期内只能给一个节点投票。
public class CoordinationState {
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
// 如果StartJoin请求中的Term小于或者等于当前节点的Term,抛出异常
if (startJoinRequest.getTerm()
StartJoin
请求处理完毕后调用 sendJoinRequest
向发起者发送JOIN请求,表示加入集群:
public class JoinHelper {
public void sendJoinRequest(DiscoveryNode destination, long term, Optional optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final StatusInfo statusInfo = nodeHealthService.getHealth();
// 如果处于UNHEALTHY状态不进行发送
if (statusInfo.getStatus() == UNHEALTHY) {
logger.debug("dropping join request to [{}]: [{}]", destination, statusInfo.getInfo());
return;
}
// 构建JOIN请求体
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), term, optionalJoin);
// ...
if (pendingOutgoingJoins.putIfAbsent(dedupKey, pendingJoinInfo) == null) {
logger.debug("attempting to join {} with {}", destination, joinRequest);
pendingJoinInfo.message = PENDING_JOIN_CONNECTING;
// 连接节点
transportService.connectToNode(destination, new ActionListener<>() {
@Override
public void onResponse(Releasable connectionReference) {
// ...
clusterApplier.onNewClusterState(
"joining " + destination.descriptionWithoutAttributes(),
() -> null,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
// ....
pendingJoinInfo.message = PENDING_JOIN_WAITING_RESPONSE;
// 发送JOIN请求
transportService.sendRequest(
destination,
JOIN_ACTION_NAME,
joinRequest,
TransportRequestOptions.of(null, TransportRequestOptions.Type.PING),
new TransportResponseHandler.Empty() {
@Override
public void handleResponse(TransportResponse.Empty response) {
pendingJoinInfo.message = PENDING_JOIN_WAITING_STATE;
pendingOutgoingJoins.remove(dedupKey);
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
}
// ...
}
);
}
// ...
}
);
}
// ...
});
} else {
logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest);
}
}
}
JoinHelper
的构造函数中,注册了JOIN请求处理器,是通过joinHandler来处理请求的,它同样是函数式编程接口,在Coordinator对JoinHelper进行实例化的时候,可以看到传入的是 handleJoinRequest
方法:
public class JoinHelper {
JoinHelper(
// ...
BiConsumer> joinHandler,
// ...
) {
// ...
transportService.registerRequestHandler(
JOIN_ACTION_NAME,
Names.CLUSTER_COORDINATION,
false,
false,
JoinRequest::new,
(request, channel, task) -> joinHandler.accept(
request,
new ChannelActionListener(channel, JOIN_ACTION_NAME, request).map(ignored -> Empty.INSTANCE)
)
);
// ...
}
}
// Coordinator
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
public Coordinator(
// ...
) {
// ...
this.joinHelper = new JoinHelper(
allocationService,
masterService,
clusterApplier,
transportService,
this::getCurrentTerm,
this::handleJoinRequest, // handleJoinRequest方法
// ...
);
// ...
}
}
Coordinator的 handleJoinRequest
方法中,会对发送JOIN的节点进行连接,进行JOIN请求验证:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void handleJoinRequest(JoinRequest joinRequest, ActionListener joinListener) {
// ...
// 连接节点
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
@Override
public void onResponse(Releasable response) {
boolean retainConnection = false;
try {
// 对JOIN请求进行验证
validateJoinRequest(
joinRequest,
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) // 处理请求
);
retainConnection = true;
} catch (Exception e) {
joinListener.onFailure(e);
} finally {
if (retainConnection == false) {
Releasables.close(response);
}
}
}
// ...
});
}
}
processJoinRequest
处理逻辑如下:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void processJoinRequest(JoinRequest joinRequest, ActionListener joinListener) {
assert Transports.assertNotTransportThread("blocking on coordinator mutex and maybe doing IO to increase term");
// 获取JOIN信息
final Optional optionalJoin = joinRequest.getOptionalJoin();
try {
synchronized (mutex) {
// 更新最大Term
updateMaxTermSeen(joinRequest.getTerm());
// 获取集群协调状态
final CoordinationState coordState = coordinationState.get();
// 获取上一次的状态,是否成功选举为Leader
final boolean prevElectionWon = coordState.electionWon();
// 处理JOIN
optionalJoin.ifPresent(this::handleJoin);
joinAccumulator.handleJoinRequest(joinRequest.getSourceNode(), joinListener);
// 如果之前未成为Leader并且当前选举Leader成功
if (prevElectionWon == false && coordState.electionWon()) {
// 成为Leader
becomeLeader();
}
}
} catch (Exception e) {
joinListener.onFailure(e);
}
}
}
接下来看下 handleJoin
的处理过程:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
// 获取CoordinationState
private final SetOnce coordinationState = new SetOnce<>();
private void handleJoin(Join join) {
synchronized (mutex) {
// 确保Term最新,如果不是最新,会返回一个JOIN对象,调用handleJoin进行处理,这里可以理解为节点给自己投了一票
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);
// 如果已经被选举为Leader
if (coordinationState.get().electionWon()) {
// 调用对异常进行捕捉的handleJoin方法
final boolean isNewJoinFromMasterEligibleNode = handleJoinIgnoringExceptions(join);
final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
if (isNewJoinFromMasterEligibleNode && establishedAsMaster && publicationInProgress() == false) {
scheduleReconfigurationIfNeeded();
}
} else { // 如果还未为成为Leader
// CoordinationState的handleJoin处理请求
coordinationState.get().handleJoin(join);
}
}
}
private boolean handleJoinIgnoringExceptions(Join join) {
try {
// CoordinationState的handleJoin处理请求
return coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug(() -> "failed to add " + join + " - ignoring", e);
return false;
}
}
}
在CoordinationState的handleJoin中,首先会对Term和版本信息进行一系列的校验,如果校验通过,记录收到的JOIN请求个数,表示当前已经成功收到的投票数,然后调用isElectionQuorum判断是否获得了大多数的投票,也就是获得的投票数达到了Quorum,并将值更新到 electionWon
中:
public class CoordinationState {
public boolean handleJoin(Join join) {
assert join.targetMatches(localNode) : "handling join " + join + " for the wrong node " + localNode;
// 如果收到的JOIN请求Term与当前节点的Term不一致抛出异常
if (join.getTerm() != getCurrentTerm()) {
logger.debug("handleJoin: ignored join due to term mismatch (expected: [{}], actual: [{}])", getCurrentTerm(), join.getTerm());
throw new CoordinationStateRejectedException(
"incoming term " + join.getTerm() + " does not match current term " + getCurrentTerm()
);
}
// ...
// 获取上一次的Term
final long lastAcceptedTerm = getLastAcceptedTerm();
// 如果请求中的上一次接受的Term大于当前节点的lastAcceptedTerm,抛出异常
if (join.getLastAcceptedTerm() > lastAcceptedTerm) {
logger.debug( "handleJoin: ignored join as joiner has a better last accepted term (expected: getLastAcceptedVersion()) {
logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected:
当节点收到了大多数投票后,就会调用 becomeLeader
转为Leader,这里会将节点由CANDIDATE转为LEADER角色,然后调用preVoteCollector的update更新Term和Leader节点信息:
public class Coordinator extends AbstractLifecycleComponent implements ClusterStatePublisher {
private void becomeLeader() {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
// 是否是CANDIDATE
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
// 是否有Master角色权限
assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";
logger.debug("handleJoinRequest: coordinator becoming LEADER in term {} (was {}, lastKnownLeader was [{}])", getCurrentTerm(), mode,lastKnownLeader);
// 转为Leader
mode = Mode.LEADER;
joinAccumulator.close(mode);
// 设置为LeaderJoinAccumulator
joinAccumulator = joinHelper.new LeaderJoinAccumulator();
lastKnownLeader = Optional.of(getLocalNode());
peerFinder.deactivate(getLocalNode());
clusterFormationFailureHelper.stop();
closePrevotingAndElectionScheduler();
// 更新Leader信息和Term信息
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
assert leaderChecker.leader() == null : leaderChecker.leader();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
}
}
Elasticsearch版本:8.3
Original: https://www.cnblogs.com/shanml/p/16684887.html
Author: shanml
Title: 【Elasticsearch】ES选主流程分析
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/586271/
转载文章受原作者版权保护。转载请注明原作者出处!