HBase 1.1.3 balance相关源码分析 一

HMaster类中与balance相关部分


1、初始化

//balancer作为HMaster的一个成员变量
LoadBalancer balancer;  
//ClusterStatusChore 这个会定时去执行balancer
private ClusterStatusChore clusterStatusChore;

//在HMaster的initializeZKBasedSystemTrackers函数中实例化一个balancer
//具体为LoadBalancerFactory.getLoadBalancer得到一个balancer实例(默认是得到StochasticLoadBalancer类型的balancer,如果有配置hbase.master.loadbalancer.class,则实例化一个这个配置对应的类实例)
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);

//在HMaster的finishActiveMasterInitialization函数中初始化balancer
//initialize load balancer
    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

//定时任务去调用执行balancer,如果没有配置hbase.balancer.statusPeriod,则间隔时间是60000毫秒;
//如果配置了,则按照配置的hbase.balancer.statusPeriod时间间隔执行。
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
	getChoreService().scheduleChore(clusterStatusChore);

2、balance函数
//在HMaster中有一个balance函数


  /**
   * Runs one round of load balancing: asks the balancer for region plans and then executes
   * them one at a time through the assignment manager.
   *
   * <p>The run is skipped, returning {@code false}, when any of the following holds:
   * <ul>
   *   <li>the master is not initialized;</li>
   *   <li>{@code loadBalancerTracker} reports that the balancer is not on;</li>
   *   <li>regions are currently in transition;</li>
   *   <li>dead servers are still being processed;</li>
   *   <li>the coprocessor {@code preBalance()} hook returns {@code true} or throws an
   *       {@code IOException}.</li>
   * </ul>
   *
   * <p>Everything after the initialization check and the cutoff-time lookup runs while holding
   * the monitor of {@code this.balancer}. Plan execution stops early once the estimated time
   * of the next plan would pass the cutoff derived from {@code getBalancerCutoffTime()}.
   *
   * @return {@code false} if the run was skipped for one of the reasons above; {@code true}
   *         otherwise, including when the balancer produced no plans
   * @throws IOException declared on the signature; failures of the coprocessor pre/post hooks
   *         are caught and logged inside this method rather than propagated
   */
  public boolean balance() throws IOException {
    // if master not initialized, don't run balancer.
    if (!this.initialized) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }
    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    synchronized (this.balancer) {
      // If balance not true, don't run balancer.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at a time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Map<String, RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        LOG.debug("Not running balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }


      if (this.cpHost != null) {
        try {
          // NOTE (from the original annotation; the callee is not visible here):
          // this.cpHost.preBalance() triggers
          // BaseMasterObserver.preBalance(ObserverContext<MasterCoprocessorEnvironment> ctx).
          // A true return value means a coprocessor asked to bypass this balance run.
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }
      // NOTE (from the original annotation; the callee is not visible here):
      // getAssignmentsByTable() shapes its result according to configuration. When
      // hbase.master.loadbalance.bytable is true, the map has one entry per table, keyed by
      // that table's name. When it is unset or false, all tables are returned under a single
      // entry keyed by TableName.valueOf("ensemble").
      Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();


      List<RegionPlan> plans = new ArrayList<RegionPlan>();
      //Give the balancer the current cluster state.
      this.balancer.setClusterStatus(getClusterStatus());
      // Ask the balancer for plans once per assignment map and merge them into one list.
      for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
        if (partialPlans != null) plans.addAll(partialPlans);
      }
      // Absolute wall-clock deadline for executing plans in this run.
      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      int rpCount = 0;  // number of RegionPlans balanced so far
      long totalRegPlanExecTime = 0;
      if (plans != null && !plans.isEmpty()) {
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          this.assignmentManager.balance(plan);
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          rpCount++;
          // The cost of the next plan is estimated as the average time per plan executed so
          // far (totalRegPlanExecTime / rpCount); rpCount is at least 1 at this point.
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      if (this.cpHost != null) {
        try {
          // If the loop stopped early, pass only the plans that were actually executed.
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }


  
  


  
  
  

版权声明:本文为gua___gua原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。