前言
在日常的CDH集群管理工作中,大部分管理人员都是通过登录Cloudera Manager进行操作,通常这种操作方式并无大问题,但若是某个时刻出现异常,而管理人员又不方便登录Cloudera Manager,那故障就会持续一段时间,严重时会影响业务。实际上,Cloudera Manager本身已提供相对丰富的API,管理人员可根据API对CDH服务进行不同方式的操作,下文将呈现如何调用cm_api对CDH服务实例进行启停、配置更改操作。
实现
脚本程序以python语言编写,调用的cm_api也是官方提供的python api。cm_api结构参考如下:
脚本中需要ClouderaManager登录信息,保存在clouderaconfig.ini:
[CM]
cm.host=172.10.100.20
cm.port=7180
admin.user=admin
admin.password=admin
cluster.name=cluster1
程序主体cm_operator_role.py:
#!/usr/bin/env python
# encoding: utf-8
import argparse
import ConfigParser
import datetime
import cfg_role as _CFG
from time import sleep
from cm_api.api_client import ApiResource
from cm_api.endpoints import services
from cm_api.endpoints import roles
# Load the Cloudera Manager connection settings from the [CM] section of
# clouderaconfig.ini (the path is relative to the current working directory).
CONFIG = ConfigParser.ConfigParser()
CONFIG.read("clouderaconfig.ini")
CM_HOST = CONFIG.get("CM", "cm.host")
CM_PORT = CONFIG.get("CM", "cm.port")
CM_USER = CONFIG.get("CM", "admin.user")
CM_PASSWORD = CONFIG.get("CM", "admin.password")
CLUSTER_NAME = CONFIG.get("CM", "cluster.name")

# Module-wide handle on the running Cloudera Manager instance; every class
# below talks to CM through it. The configured cm.port is passed explicitly
# so that a non-default port in clouderaconfig.ini is actually honoured.
api = ApiResource(
    CM_HOST,
    server_port=CM_PORT,
    username=CM_USER,
    password=CM_PASSWORD,
    version=19,
)
class InitService:
    """Resolve cluster and service handles through the module-level ``api``.

    Args:
        cluster: Cluster name handed to ``api.get_cluster``.
        service: Service name handed to ``cluster.get_service``.
    """

    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    def init_cluster(self, retry=5, interval=5):
        """Return the cluster handle, retrying when the lookup fails.

        Args:
            retry: Maximum number of lookup attempts.
            interval: Seconds to sleep between two consecutive attempts.

        Returns:
            The object returned by ``api.get_cluster(self.cluster)``.

        Raises:
            RuntimeError: If every attempt failed. Previously the method fell
                off the end and returned ``None``, which made ``init_service``
                fail with an unrelated ``AttributeError``.
        """
        last_error = None
        for attempt in range(1, retry + 1):
            try:
                return api.get_cluster(self.cluster)
            except Exception as err:
                last_error = err
                if attempt == retry:
                    # No further attempt follows, so do not announce a retry
                    # or waste another sleep interval.
                    break
                print('Connect cluster:[%s] failed, will retry after %s seconds' % (self.cluster, interval))
                sleep(interval)
        raise RuntimeError(
            'Connect cluster:[%s] failed after %s attempts: %s'
            % (self.cluster, retry, last_error)
        )

    def init_service(self):
        """Return the handle of ``self.service`` inside ``self.cluster``."""
        cluster = self.init_cluster()
        return cluster.get_service(self.service)
class GetClusterInfo:
    """Read-only queries about clusters, services and role types.

    All lookups go through the module-level ``api`` handle.
    """

    def __init__(self, cluster, service):
        self.cluster, self.service = cluster, service

    @staticmethod
    def getClustersInfo():
        """Print a heading and return the names of all clusters known to ``api``."""
        print("Clusters Name: ")
        cluster_names = []
        for entry in api.get_all_clusters():
            cluster_names.append(entry.name)
        return cluster_names

    def getServicesInfo(self):
        """Print a heading and return the service names of ``self.cluster``."""
        handle = api.get_cluster(self.cluster)
        print("Services Name: ")
        service_names = []
        for entry in handle.get_all_services():
            service_names.append(entry.name)
        return service_names

    def getServicesTypeInfo(self):
        """Return the role types reported by ``self.service`` in ``self.cluster``."""
        return services.get_service(
            api, cluster_name=self.cluster, name=self.service
        ).get_role_types()
class OperatorRole:
    """Locate a role instance on a host and run a command against it.

    Args:
        cluster: Name of the cluster that owns the service.
        service: Name of the service whose role instances are operated on.
    """

    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    @staticmethod
    def get_hostId(hostName):
        """Return the hostId of ``hostName``; exit with status 1 if unknown."""
        host_ids = {h.hostname: h.hostId for h in api.get_all_hosts()}
        if hostName not in host_ids:
            print("%s not in cluster." % hostName)
            # SystemExit is what exit() raises, but it does not depend on the
            # ``site`` module having installed the interactive helper.
            raise SystemExit(1)
        return host_ids[hostName]

    def check_srv_type(self, typeName):
        """Return True if the service has role type ``typeName``; else exit(1).

        The comparison is done on the upper-cased type name.
        """
        _service = services.get_service(api, cluster_name=self.cluster, name=self.service)
        if str(typeName).upper() in _service.get_role_types():
            return True
        print("Input typeName Error, Please Retry Input.")
        raise SystemExit(1)

    def get_role_name(self, typeName, hostName):
        """Return ``(service, role_name)`` for the matching role instance.

        The role name looks like ``DataNodeb9a0e8e1adc548f9bed251f6f186bf61``.

        Exits with status 1 when the host is unknown, the role type is not
        valid for the service, or no role of that type runs on the host.
        The last case used to return ``None`` implicitly, which made callers
        crash with a ``TypeError`` while unpacking the result.
        """
        hostId = self.get_hostId(hostName)
        service = InitService(self.cluster, self.service).init_service()
        self.check_srv_type(typeName)  # exits on an invalid type
        wanted_type = str(typeName).upper()
        for role in service.get_all_roles():
            if role.hostRef.hostId == hostId and role.type == wanted_type:
                return service, role.name
        print("No %s role instance found on host %s." % (wanted_type, hostName))
        raise SystemExit(1)

    def operator_role(self, typeName, hostName, cmd):
        """Run ``cmd`` against the ``typeName`` role instance on ``hostName``.

        Args:
            typeName: Role type name (case-insensitive).
            hostName: Host name the role instance runs on.
            cmd: Role command name; the CLI help lists start, stop, restart.
        """
        service, role = self.get_role_name(typeName, hostName)
        role_name = {"items": [role]}
        # Execute the command on the role instance.
        print("%s %s Instance On: %s " % (cmd, typeName, [role]))
        # NOTE(review): _role_cmd is a private cm_api helper; kept because it
        # accepts an arbitrary command name.
        service._role_cmd(cmd=cmd, roles=role_name)
class OperatorRoleConfig:
    """Inspect and update the configuration of a single role instance.

    Args:
        cluster: Name of the cluster that owns the service.
        service: Name of the service that owns the role instance.
    """

    def __init__(self, cluster, service):
        self.cluster = cluster
        self.service = service

    def getRole(self, typeName, hostName):
        """Return the role object of type ``typeName`` running on ``hostName``."""
        service, role = OperatorRole(self.cluster, self.service).get_role_name(typeName, hostName)
        return roles.get_role(
            resource_root=api,
            cluster_name=self.cluster,
            service_name=self.service,
            name=role,
        )

    @staticmethod
    def _show_config(role, typeName):
        """Fetch, print and return the current configuration of ``role``."""
        cur_config = role.get_config()
        print("Current %s Role Config:" % typeName)
        print("==> " + str(cur_config))
        return cur_config

    def getRoleConfig(self, typeName, hostName):
        """Print and return the current configuration of the role instance."""
        return self._show_config(self.getRole(typeName, hostName), typeName)

    def updateRoleConfig(self, typeName, hostName):
        """Apply ``_CFG.ROLE_CFG`` to the role instance and restart it.

        Only entries whose value differs from the current configuration are
        sent. When nothing differs, the role is neither updated nor restarted.
        """
        # Resolve the role once and reuse it for both reading and updating.
        role = self.getRole(typeName, hostName)
        cur_role_cfg = self._show_config(role, typeName)
        add_role_cfg = _CFG.ROLE_CFG
        # Entries that are new or changed; a key-wise comparison does not
        # require the values to be hashable.
        dif_cfg = {
            key: value
            for key, value in add_role_cfg.items()
            if key not in cur_role_cfg or cur_role_cfg[key] != value
        }
        if not dif_cfg:
            # Avoid a pointless restart of a running role instance.
            print("%s Role Config already up to date, nothing to update." % typeName)
            return
        # Update the configuration of the role instance.
        role.update_config(dif_cfg)
        print("Update %s Role Ronfig:" % typeName)
        print("==> " + str(dif_cfg))
        # Restart the instance so the updated configuration takes effect.
        OperatorRole(self.cluster, self.service).operator_role(typeName, hostName, 'restart')
def init_args(argv=None):
    """Parse the command-line options of the script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default),
            argparse reads ``sys.argv[1:]``, exactly as before.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.
    """
    parse = argparse.ArgumentParser(description='脚本参数')

    # Query switches.
    parse.add_argument("-getCluster", help='GET CDH cluster name.', action="store_true")
    parse.add_argument("-getService", help='GET service name, 必须带有<-clusterName>参数', action="store_true")
    parse.add_argument("-getServiceType", help='GET service type name, 必须带有<-clusterName> <-serviceName>参数.', action="store_true")
    parse.add_argument("-getRoleConfig", help='GET service role config, 必须带有<-clusterName> <-serviceName> <-typeName> <-hostName>参数.', action="store_true")

    # Target selection.
    parse.add_argument("-clusterName", '-cn', type=str, default=None, help='operator CDH cluster name. eg: cluster1')
    parse.add_argument("-serviceName", '-sn', type=str, default=None, help='operator service name. eg: kafka')
    parse.add_argument("-typeName", '-tn', type=str, default=None, help='operator service type b=name. eg: KAFKA_BROKER')
    parse.add_argument("-hostName", '-hn', type=str, default=None, help='operator service instance on host name. eg: cdh1')

    # Actions on the selected role instance.
    parse.add_argument("-cmd", '-c', type=str, default=None, help='对service role执行操作, 必须带有<-clusterName> <-serviceName> <-typeName> <-hostName> 参数才能执行, cmd eg: [start | stop | restart]')
    parse.add_argument("-updateRoleConfig", '-uc', help='根据cfg_role.py参数项刷到服务实例配置中, 并重启, 必须带上<-clusterName> <-serviceName> <-typeName> <-hostName> 参数才能执行', action="store_true")

    return parse.parse_args(argv)
if __name__ == '__main__':
    # Dispatch on the parsed options. Every action whose required options
    # are all present is run, in the order below.
    args = init_args()
    cluster_name = args.clusterName
    service_name = args.serviceName
    # Options needed by every action that targets one role instance.
    role_target = [cluster_name, service_name, args.typeName, args.hostName]
    handled = False

    if args.getCluster:
        print("==> " + str(GetClusterInfo.getClustersInfo()))
        handled = True
    if args.getService and cluster_name:
        print("==> " + str(GetClusterInfo(cluster_name, service_name).getServicesInfo()))
        handled = True
    if args.getServiceType and cluster_name and service_name:
        print("==> " + str(GetClusterInfo(cluster_name, service_name).getServicesTypeInfo()))
        handled = True
    if all(role_target) and args.cmd:
        OperatorRole(cluster_name, service_name).operator_role(args.typeName, args.hostName, args.cmd)
        handled = True
    if all(role_target) and args.getRoleConfig:
        OperatorRoleConfig(cluster_name, service_name).getRoleConfig(args.typeName, args.hostName)
        handled = True
    if all(role_target) and args.updateRoleConfig:
        OperatorRoleConfig(cluster_name, service_name).updateRoleConfig(args.typeName, args.hostName)
        handled = True

    if not handled:
        # An incomplete option combination used to be a silent no-op with
        # exit status 0, which hides mistakes from calling automation.
        print("No action was performed: missing or incomplete arguments, run with -h for usage.")
        raise SystemExit(2)
若是需要对服务实例的配置做更改,需变更的参数在cfg_role.py中填写,程序会跟当前的服务实例参数做比对,进行覆盖式更新,所以传参前需确认好变更参数无误,cfg_role.py:
#!/usr/bin/env python
# coding: utf-8
# Role configuration entries (name -> value) that cm_operator_role.py reads
# as cfg_role.ROLE_CFG when updating a role instance.
ROLE_CFG = dict(
    dfs_datanode_data_dir_perm="755",
    dfs_data_dir_list="/data2/dn",
)
使用场景
具体能用在什么地方呢?比如:当前有一个上百台的集群,每个DataNode节点挂载11块机械硬盘,且出于成本考虑没有使用全新的机器,假设故障率在20%以上,每天都有盘故障。这时候就可以根据监控情况,一旦检测到盘损坏,自动调用此脚本程序对CDH的HDFS DataNode服务配置进行更改,去除异常机械硬盘,并重启生效,整个过程无需管理人员手动参与。除此之外,可对集群的性能进行指标量化并进行分析,评估是否对服务实例进行参数调整,解除运行瓶颈或隐患。
版权声明:本文为u010543388原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。