HMaster类中与balance相关部分
1、初始化
2、balance函数
1、初始化
//balancer作为HMaster的一个成员变量
LoadBalancer balancer;
//ClusterStatusChore 这个会定时去执行balancer
private ClusterStatusChore clusterStatusChore;
//在HMaster的initializeZKBasedSystemTrackers中函数实例化一个balancer
//具体为LoadBalancerFactory.getLoadBalancer得到一个balancer实例(默认是得到StochasticLoadBalancer类型的balancer,如果有配置hbase.master.loadbalancer.class,则实例化一个这个配置对应的类实例)
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
//在HMster的finishActiveMasterInitialization函数中初始化balancer
//initialize load balancer
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
this.balancer.initialize();
//定时任务去调用执行balancer,如果没有配置hbase.balancer.statusPeriod则是间隔时间是60000,
//如果配置了则按照配置的hbase.balancer.statusPeriod时间。
this.clusterStatusChore = new ClusterStatusChore(this, balancer);
getChoreService().scheduleChore(clusterStatusChore);2、balance函数
//在HMaster中有一个balance函数
public boolean balance() throws IOException {
// if master not initialized, don't run balancer.
if (!this.initialized) {
LOG.debug("Master has not been initialized, don't run balancer.");
return false;
}
// Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime();
synchronized (this.balancer) {
// If balance not true, don't run balancer.
if (!this.loadBalancerTracker.isBalancerOn()) return false;
// Only allow one balance run at at time.
if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
Map<String, RegionState> regionsInTransition =
this.assignmentManager.getRegionStates().getRegionsInTransition();
LOG.debug("Not running balancer because " + regionsInTransition.size() +
" region(s) in transition: " + org.apache.commons.lang.StringUtils.
abbreviate(regionsInTransition.toString(), 256));
return false;
}
if (this.serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): " +
this.serverManager.getDeadServers());
return false;
}
if (this.cpHost != null) {
try {
//this.cpHost.preBalance()里面会触发BaseMasterObserver中的
//preBalance(ObserverContext<MasterCoprocessorEnvironment> ctx)
if (this.cpHost.preBalance()) {
LOG.debug("Coprocessor bypassing balancer request");
return false;
}
} catch (IOException ioe) {
LOG.error("Error invoking master coprocessor preBalance()", ioe);
return false;
}
}
//this.assignmentManager.getRegionStates().getAssignmentsByTable()会根据配置如果配置的是
//hbase.master.loadbalance.bytable为true则返回结构每个表会有一个mapentry(TableName就是表的名称)
//如果没有配置hbase.master.loadbalance.bytable或者为false则所有表会返回一个mapentry(表名为TableName.valueOf("ensemble"))
Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
this.assignmentManager.getRegionStates().getAssignmentsByTable();
List<RegionPlan> plans = new ArrayList<RegionPlan>();
//Give the balancer the current cluster state.
this.balancer.setClusterStatus(getClusterStatus());
for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
if (partialPlans != null) plans.addAll(partialPlans);
}
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
int rpCount = 0; // number of RegionPlans balanced so far
long totalRegPlanExecTime = 0;
if (plans != null && !plans.isEmpty()) {
for (RegionPlan plan: plans) {
LOG.info("balance " + plan);
long balStartTime = System.currentTimeMillis();
//TODO: bulk assign
this.assignmentManager.balance(plan);
totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
rpCount++;
if (rpCount < plans.size() &&
// if performing next balance exceeds cutoff time, exit the loop
(System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
//TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
maximumBalanceTime);
break;
}
}
}
if (this.cpHost != null) {
try {
this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
} catch (IOException ioe) {
// balancing already succeeded so don't change the result
LOG.error("Error invoking master coprocessor postBalance()", ioe);
}
}
}
// If LoadBalancer did not generate any plans, it means the cluster is already balanced.
// Return true indicating a success.
return true;
}版权声明:本文为gua___gua原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。