Role-instance-level operations on CDH services via the API

Preface

In day-to-day CDH cluster administration, most operations are performed by logging in to Cloudera Manager. That usually works fine, but if a fault occurs at a moment when the administrator cannot conveniently reach Cloudera Manager, the outage can persist for some time and, in severe cases, affect the business. In fact, Cloudera Manager already exposes a fairly rich API, and administrators can use it to operate CDH services in several ways. The rest of this article shows how to call cm_api to start, stop and reconfigure CDH service instances.
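To give a feel for what this means in practice, here is a minimal sketch of driving the same API directly over HTTP with Python's requests library, without the web UI. The host, credentials, cluster/service names and role name are placeholders, and the endpoint path follows the Cloudera Manager REST API convention (POST .../roleCommands/restart with an {"items": [...]} body):

import requests

# Placeholder connection details -- adjust to your own environment.
CM_URL = "http://172.10.100.20:7180/api/v19"
AUTH = ("admin", "admin")

# Restart one DataNode role instance of the hdfs service in cluster1.
# The exact role name (e.g. "hdfs-DATANODE-xxxx") can be listed via GET .../services/hdfs/roles.
resp = requests.post(
    "%s/clusters/cluster1/services/hdfs/roleCommands/restart" % CM_URL,
    auth=AUTH,
    json={"items": ["hdfs-DATANODE-xxxx"]})
print(resp.json())

The script below does the same thing through the cm_api client library instead of hand-built HTTP calls.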

Implementation

The script is written in Python and uses the official cm_api Python client. The cm_api structure is outlined below:
Figure 1. cm_api structure
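Since the structure diagram is not reproduced here, the following minimal sketch shows the object hierarchy the script walks through in cm_api: an ApiResource handle yields clusters, a cluster yields its services, and a service yields its role instances (connection details and names are placeholders):

from cm_api.api_client import ApiResource

# Placeholder connection details -- adjust to your own environment.
api = ApiResource("172.10.100.20", username="admin", password="admin", version=19)

cluster = api.get_cluster("cluster1")      # ApiCluster
service = cluster.get_service("hdfs")      # ApiService
for role in service.get_all_roles():       # ApiRole instances
    # each role knows its type (e.g. DATANODE) and the host it runs on
    print("%s %s %s" % (role.type, role.name, role.hostRef.hostId))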
The Cloudera Manager login information needed by the script is kept in clouderaconfig.ini:

[CM]
cm.host=172.10.100.20
cm.port=7180
admin.user=admin
admin.password=admin
cluster.name=cluster1

The main program, cm_operator_role.py:

#!/usr/bin/env python
# encoding: utf-8

import argparse
import ConfigParser
import datetime
import cfg_role as _CFG
from time import sleep
from cm_api.api_client import ApiResource
from cm_api.endpoints import services
from cm_api.endpoints import roles

# Prep for reading config props from external file
CONFIG = ConfigParser.ConfigParser()
CONFIG.read("clouderaconfig.ini")

CM_HOST = CONFIG.get("CM", "cm.host")
CM_PORT = CONFIG.get("CM", "cm.port")
CM_USER = CONFIG.get("CM", "admin.user")
CM_PASSWORD = CONFIG.get("CM", "admin.password")
CLUSTER_NAME = CONFIG.get("CM", "cluster.name")

# get a handle on the instance of CM that we have running
api = ApiResource(CM_HOST, server_port=int(CM_PORT), username=CM_USER, password=CM_PASSWORD, version=19)


class InitService:
    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    def init_cluster(self):
        cnt = 0
        retry = 5
        interval = 5
        while cnt < retry:
            cnt += 1
            try:
                cluster = api.get_cluster(self.cluster)
                return cluster
            except Exception:
                print('Connecting to cluster [%s] failed, will retry in %s seconds' % (self.cluster, interval))
                sleep(interval)
        # all retries failed, bail out instead of silently returning None
        print('Connecting to cluster [%s] failed after %s retries, giving up.' % (self.cluster, retry))
        exit(1)

    def init_service(self):
        cluster = self.init_cluster()
        return cluster.get_service(self.service)


class GetClusterInfo:
    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    @staticmethod
    def getClustersInfo():
        print("Clusters Name: ")

        return [c.name for c in api.get_all_clusters()]

    def getServicesInfo(self):
        cluster = api.get_cluster(self.cluster)
        print("Services Name: ")

        return [s.name for s in cluster.get_all_services()]

    def getServicesTypeInfo(self):
        _service = services.get_service(api, cluster_name=self.cluster, name=self.service)

        return _service.get_role_types()


class OperatorRole:
    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    @staticmethod
    def get_hostId(hostName):
        hosts = api.get_all_hosts()
        host_dict = {h.hostname: h.hostId for h in hosts}
        if hostName in host_dict.keys():
            return host_dict.get(hostName)
        else:
            print("%s not in cluster." % hostName)
            exit(1)

    def check_srv_type(self, typeName):
        _service = services.get_service(api, cluster_name=self.cluster, name=self.service)
        if str(typeName).upper() in list(_service.get_role_types()):
            return True
        else:
            print("Input typeName Error, Please Retry Input.")
            exit(1)

    def get_role_name(self, typeName, hostName):
        hostId = self.get_hostId(hostName)
        service = InitService(self.cluster, self.service).init_service()

        # return the service and roleName, roleName like: DataNodeb9a0e8e1adc548f9bed251f6f186bf61
        if self.check_srv_type(typeName):
            for role in service.get_all_roles():
                if role.hostRef.hostId == hostId and role.type == str(typeName).upper():
                    return service, role.name

    def operator_role(self, typeName, hostName, cmd):
        service, role = self.get_role_name(typeName, hostName)
        role_name = {"items": [role]}

        # execute the given command on the role instance
        print("%s %s Instance On: %s " % (cmd, typeName, [role]))
        service._role_cmd(cmd=cmd, roles=role_name)


class OperatorRoleConfig:
    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    def getRole(self, typeName, hostName):
        service, role = OperatorRole(self.cluster, self.service).get_role_name(typeName, hostName)
        _role = roles.get_role(resource_root=api, cluster_name=self.cluster, service_name=self.service, name=role)

        return _role

    def getRoleConfig(self, typeName, hostName):
        role = self.getRole(typeName, hostName)
        cur_config = role.get_config()

        print("Current %s Role Config:" % typeName)
        print("==> " + str(cur_config))

        return cur_config

    def updateRoleConfig(self, typeName, hostName):
        role = self.getRole(typeName, hostName)

        cur_role_cfg = self.getRoleConfig(typeName, hostName)
        add_role_cfg = _CFG.ROLE_CFG

        # compute the difference between the target config and the current config
        dif_cfg = dict(set(add_role_cfg.items()) - set(cur_role_cfg.items()))

        # push only the changed items to the role instance
        role.update_config(dif_cfg)

        print("Update %s Role Ronfig:" % typeName)
        print("==> " + str(dif_cfg))

        # restart the instance so the updated config takes effect
        OperatorRole(self.cluster, self.service).operator_role(typeName, hostName, 'restart')


def init_args():
    parse = argparse.ArgumentParser(description='Script arguments')
    parse.add_argument("-getCluster", help='GET CDH cluster names.', action="store_true")
    parse.add_argument("-getService", help='GET service names, requires <-clusterName>.', action="store_true")
    parse.add_argument("-getServiceType", help='GET service role type names, requires <-clusterName> <-serviceName>.', action="store_true")
    parse.add_argument("-getRoleConfig", help='GET service role config, requires <-clusterName> <-serviceName> <-typeName> <-hostName>.', action="store_true")
    parse.add_argument("-clusterName", '-cn', type=str, default=None, help='CDH cluster name to operate on. eg: cluster1')
    parse.add_argument("-serviceName", '-sn', type=str, default=None, help='service name to operate on. eg: kafka')
    parse.add_argument("-typeName", '-tn', type=str, default=None, help='service role type name to operate on. eg: KAFKA_BROKER')
    parse.add_argument("-hostName", '-hn', type=str, default=None, help='host name of the service instance to operate on. eg: cdh1')
    parse.add_argument("-cmd", '-c', type=str, default=None, help='command to run against the service role, requires <-clusterName> <-serviceName> <-typeName> <-hostName>. cmd eg: [start | stop | restart]')
    parse.add_argument("-updateRoleConfig", '-uc', help='push the config items in cfg_role.py to the service instance and restart it, requires <-clusterName> <-serviceName> <-typeName> <-hostName>.', action="store_true")
    args = parse.parse_args()
    return args


if __name__ == '__main__':
    # print(datetime.datetime.now())

    args = init_args()

    if args.getCluster:
        print("==> " + str(GetClusterInfo(args.clusterName, args.serviceName).getClustersInfo()))

    if all([args.getService, args.clusterName]):
        print("==> " + str(GetClusterInfo(args.clusterName, args.serviceName).getServicesInfo()))

    if all([args.getServiceType, args.clusterName, args.serviceName]):
        print("==> " + str(GetClusterInfo(args.clusterName, args.serviceName).getServicesTypeInfo()))

    if all([args.clusterName, args.serviceName, args.typeName, args.hostName, args.cmd]):
        OperatorRole(args.clusterName, args.serviceName).operator_role(args.typeName, args.hostName, args.cmd)

    if all([args.clusterName, args.serviceName, args.typeName, args.hostName, args.getRoleConfig]):
        OperatorRoleConfig(args.clusterName, args.serviceName).getRoleConfig(args.typeName, args.hostName)

    if all([args.clusterName, args.serviceName, args.typeName, args.hostName, args.updateRoleConfig]):
        OperatorRoleConfig(args.clusterName, args.serviceName).updateRoleConfig(args.typeName, args.hostName)

    # print(datetime.datetime.now())
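For reference, a few example invocations, assuming the script is saved as cm_operator_role.py next to clouderaconfig.ini and cfg_role.py (cluster, service, role type and host names are placeholders):

python cm_operator_role.py -getCluster
python cm_operator_role.py -getService -clusterName cluster1
python cm_operator_role.py -getServiceType -clusterName cluster1 -serviceName hdfs
python cm_operator_role.py -getRoleConfig -clusterName cluster1 -serviceName hdfs -typeName DATANODE -hostName cdh1
python cm_operator_role.py -clusterName cluster1 -serviceName hdfs -typeName DATANODE -hostName cdh1 -cmd restart
python cm_operator_role.py -updateRoleConfig -clusterName cluster1 -serviceName hdfs -typeName DATANODE -hostName cdh1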

If the configuration of a service instance needs to be changed, the parameters to change are written in cfg_role.py. The program compares them with the instance's current configuration and applies an overwriting update (a small worked example of the diff logic follows the listing), so make sure the parameters are correct before running. cfg_role.py:

#!/usr/bin/env python
# encoding: utf-8
ROLE_CFG = {"dfs_datanode_data_dir_perm": "755",
            "dfs_data_dir_list": "/data2/dn"
}
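To make the overwriting update concrete, the following small sketch replays the set-difference logic used in updateRoleConfig with made-up current and target values:

# current config as returned by role.get_config() (illustrative values)
cur_role_cfg = {"dfs_datanode_data_dir_perm": "700",
                "dfs_data_dir_list": "/data1/dn,/data2/dn"}

# target config from cfg_role.py
add_role_cfg = {"dfs_datanode_data_dir_perm": "755",
                "dfs_data_dir_list": "/data2/dn"}

# only the key/value pairs that differ from the current config are kept
dif_cfg = dict(set(add_role_cfg.items()) - set(cur_role_cfg.items()))
print(dif_cfg)
# both pairs differ here, so both would be pushed to the role and take effect after the restart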

Use cases

Where can this actually be used? For example: suppose there is a cluster of over a hundred nodes, each DataNode mounts 11 mechanical disks, and for cost reasons the machines are not brand new, so the failure rate is assumed to be above 20% and disks fail every day. Based on the monitoring data, as soon as a failed disk is detected this script can be invoked automatically to change the HDFS DataNode configuration, remove the faulty disk and restart the role so the change takes effect, with no manual involvement from administrators. Beyond that, cluster performance metrics can be quantified and analysed to decide whether instance parameters should be tuned to remove bottlenecks or hidden risks.
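As a rough illustration of that flow, a monitoring hook could reuse the classes above to drop a failed mount point from dfs_data_dir_list and restart the affected DataNode; the module import, host name and failed mount below are assumptions for the sketch:

from cm_operator_role import OperatorRole, OperatorRoleConfig

# hypothetical inputs coming from the disk-monitoring system
cluster, service, type_name, host = "cluster1", "hdfs", "DATANODE", "cdh1"
failed_mount = "/data3/dn"

cfg = OperatorRoleConfig(cluster, service)

# read the current dfs_data_dir_list of this DataNode and drop the failed disk
cur_cfg = cfg.getRoleConfig(type_name, host)
dirs = [d for d in cur_cfg.get("dfs_data_dir_list", "").split(",") if d and d != failed_mount]

# push the reduced list back to the role instance and restart it so the change takes effect
role = cfg.getRole(type_name, host)
role.update_config({"dfs_data_dir_list": ",".join(dirs)})
OperatorRole(cluster, service).operator_role(type_name, host, "restart")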
