At first I did not appreciate what the MetaStore does. I looked at the production configuration, which is as follows:

Now let me explain how the system creates the meta client.
=========================================== First, a few pieces of code.
public void createDatabase(Database db, boolean ifNotExist) throws AlreadyExistsException, HiveException {
  try {
    getMSC().createDatabase(db);
  } catch (AlreadyExistsException e) {
    if (!ifNotExist) {
      throw e;
    }
  } catch (Exception e) {
    throw new HiveException(e);
  }
}
=========
private IMetaStoreClient getMSC() throws MetaException {
  if (metaStoreClient == null) {
    metaStoreClient = createMetaStoreClient();
  }
  return metaStoreClient;
}
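getMSC() is a plain lazy initializer: the client is built on first use and reused afterwards. A minimal sketch of the same shape, assuming a hypothetical LazyClientHolder class and a Supplier standing in for createMetaStoreClient() (neither name comes from Hive):

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the object that owns the metaStoreClient field.
final class LazyClientHolder<T> {
    private final Supplier<T> factory; // plays the role of createMetaStoreClient()
    private T client;                  // plays the role of the metaStoreClient field

    LazyClientHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Same shape as getMSC(): create on first call, then return the cached instance.
    // Like the method shown in the post, this is not synchronized.
    T get() {
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    public static void main(String[] args) {
        int[] created = {0};
        LazyClientHolder<String> holder = new LazyClientHolder<>(() -> {
            created[0]++;
            return "client";
        });
        holder.get();
        holder.get();
        System.out.println("factory calls: " + created[0]);
    }
}
```

Because nothing here is synchronized, two threads calling get() at the same time could each run the factory once; whether that matters depends on how the holder is shared.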
=========
private IMetaStoreClient createMetaStoreClient() throws MetaException {
  HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
    public HiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Table tbl) throws MetaException {
      try {
        if (tbl == null) {
          return null;
        }
        HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf,
            tbl.getParameters().get(META_TABLE_STORAGE));
        if (storageHandler == null) {
          return null;
        }
        return storageHandler.getMetaHook();
      } catch (HiveException ex) {
        LOG.error(StringUtils.stringifyException(ex));
        throw new MetaException("Failed to load storage handler: " + ex.getMessage());
      }
    }
  };
  return new HiveMetaStoreClient(conf, hookLoader);
}
=========
public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader)
    throws MetaException {
  this.hookLoader = hookLoader;
  if (conf == null) {
    conf = new HiveConf(HiveMetaStoreClient.class);
  }
  this.conf = conf;
  localMetaStore = conf.getBoolVar(ConfVars.METASTORE_MODE);
  if (localMetaStore) {
    // instantiate the metastore server handler directly instead of connecting
    // through the network
    client = new HiveMetaStore.HMSHandler("hive client", conf);
    isConnected = true;
    return;
  }
  // get the number retries
  retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTRETRIES);
  retryDelaySeconds = conf.getIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);
  // user wants file store based configuration
  if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
    String metastoreUrisString[] = conf.getVar(
        HiveConf.ConfVars.METASTOREURIS).split(",");
    metastoreUris = new URI[metastoreUrisString.length];
    try {
      int i = 0;
      for (String s : metastoreUrisString) {
        URI tmpUri = new URI(s);
        if (tmpUri.getScheme() == null) {
          throw new IllegalArgumentException("URI: " + s
              + " does not have a scheme");
        }
        metastoreUris[i++] = tmpUri;
      }
    } catch (IllegalArgumentException e) {
      throw (e);
    } catch (Exception e) {
      MetaStoreUtils.logAndThrowMetaException(e);
    }
  } else if (conf.getVar(HiveConf.ConfVars.METASTOREDIRECTORY) != null) {
    metastoreUris = new URI[1];
    try {
      metastoreUris[0] = new URI(conf
          .getVar(HiveConf.ConfVars.METASTOREDIRECTORY));
    } catch (URISyntaxException e) {
      MetaStoreUtils.logAndThrowMetaException(e);
    }
  } else {
    LOG.error("NOT getting uris from conf");
    throw new MetaException("MetaStoreURIs not found in conf file");
  }
  // finally open the store
  open();
}
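The branching in this constructor boils down to a three-way choice. The sketch below restates it with a plain Map standing in for HiveConf. The class, the enum, and the use of IllegalStateException in place of MetaException are all hypothetical, and the directory key is recalled from HiveConf rather than shown in this post, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical summary of the constructor's branching; not Hive code.
final class MetastoreEndpointChooser {
    enum Mode { EMBEDDED, REMOTE_URIS, DIRECTORY }

    // Assumption: key behind ConfVars.METASTOREDIRECTORY (not shown in the post).
    static final String DIRECTORY_KEY = "hive.metastore.metadb.dir";

    static Mode choose(Map<String, String> conf) {
        // hive.metastore.local defaults to true, per the METASTORE_MODE definition.
        boolean local = Boolean.parseBoolean(conf.getOrDefault("hive.metastore.local", "true"));
        if (local) {
            return Mode.EMBEDDED;      // HMSHandler is instantiated in-process
        }
        if (conf.get("hive.metastore.uris") != null) {
            return Mode.REMOTE_URIS;   // comma-separated list of URIs
        }
        if (conf.get(DIRECTORY_KEY) != null) {
            return Mode.DIRECTORY;     // single URI taken from the directory setting
        }
        // Stand-in for the MetaException thrown by the real constructor.
        throw new IllegalStateException("MetaStoreURIs not found in conf file");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("hive.metastore.local", "false");
        conf.put("hive.metastore.uris", "thrift://meta1.example.com:9083"); // hypothetical host
        System.out.println(choose(conf));
    }
}
```

The order matters: when the local flag is true the URI settings are never read, which is why a remote setup has to set hive.metastore.local to false in this version of the code.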
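The HiveMetaStoreClient constructor deserves careful analysis: it ties into several configuration parameters, and understanding it helps make sense of the production deployment settings.
Start with this part: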
localMetaStore = conf.getBoolVar(ConfVars.METASTORE_MODE);
if (localMetaStore) {
  // instantiate the metastore server handler directly instead of
  // connecting
  // through the network
  client = new HiveMetaStore.HMSHandler("hive client", conf);
  isConnected = true;
  return;
}
PS: ConfVars.METASTORE_MODE is defined as METASTORE_MODE("hive.metastore.local", true), so it defaults to true.
My setting for this is:


Execution continues:
// get the number retries
retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTRETRIES);
retryDelaySeconds = conf.getIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);
These two are straightforward:
// Number of times to retry a connection to a Thrift metastore
// server
METASTORETHRIFTRETRIES("hive.metastore.connect.retries", 5),
// Number of seconds the client should wait between connection
// attempts
METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", 1),
The defaults are 5 connection attempts with 1 second between them, so I will not go into more detail.
======================================================================================
Next, look at
if (conf.getVar(HiveConf.ConfVars.METASTOREURIS) != null) {
PS: METASTOREURIS("hive.metastore.uris", "")
My setting is:

OK, back to the code. There is this line:
String metastoreUrisString[] = conf.getVar(HiveConf.ConfVars.METASTOREURIS).split(",");
So now you know why the multiple Thrift servers in the production configuration are separated by commas.
-------------------------------------------------------------------------------------------------------------------------------------------
Next the URIs are put into an array:
try {
  int i = 0;
  for (String s : metastoreUrisString) {
    URI tmpUri = new URI(s);
    if (tmpUri.getScheme() == null) {
      throw new IllegalArgumentException("URI: " + s + " does not have a scheme");
    }
    metastoreUris[i++] = tmpUri;
  }
} catch (IllegalArgumentException e) {
  throw (e);
} catch (Exception e) {
  MetaStoreUtils.logAndThrowMetaException(e);
}
So do not forget to write an explicit scheme in every URI.
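The split-and-validate step can be tried in isolation with nothing but java.net.URI. This is a minimal sketch: the class name, the example host names, and the use of IllegalStateException in place of MetaException are assumptions made for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper mirroring the comma split and scheme check from the constructor.
final class MetastoreUriParser {
    static URI[] parse(String commaSeparated) {
        String[] parts = commaSeparated.split(",");
        URI[] uris = new URI[parts.length];
        int i = 0;
        for (String s : parts) {
            URI tmp;
            try {
                tmp = new URI(s);
            } catch (URISyntaxException e) {
                // Stand-in for MetaStoreUtils.logAndThrowMetaException(e).
                throw new IllegalStateException("Bad metastore URI: " + s, e);
            }
            if (tmp.getScheme() == null) {
                throw new IllegalArgumentException("URI: " + s + " does not have a scheme");
            }
            uris[i++] = tmp;
        }
        return uris;
    }

    public static void main(String[] args) {
        // Hypothetical hosts; the value has the shape of a hive.metastore.uris setting.
        URI[] uris = parse("thrift://meta1.example.com:9083,thrift://meta2.example.com:9083");
        for (URI u : uris) {
            System.out.println(u.getHost() + " " + u.getPort());
        }
    }
}
```

One caveat with this check: java.net.URI treats anything before the first colon as a scheme when it looks like one, so an entry written as host:port without a scheme is not necessarily rejected here. Writing the scheme explicitly avoids that ambiguity.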
=== Continuing
// finally open the store
open();
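This presumably connects to the meta server. Luckily I had read the Thrift source before, so this is familiar ground :) The Thrift source is easier to follow than Netty's!
It just occurred to me that Hive came out of Facebook and so did Thrift, so it is only natural for Hive to use Thrift.
============================================================= OK, let's keep tracing into open...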
private void open() throws MetaException {
  for (URI store : metastoreUris) { // try each URI in turn
    LOG.info("Trying to connect to metastore with URI " + store);
    try {
      openStore(store);
    } catch (MetaException e) {
      LOG.error("Unable to connect to metastore with URI " + store, e);
    }
    if (isConnected) {
      break;
    }
  }
  if (!isConnected) {
    throw new MetaException("Could not connect to meta store using any of the URIs provided");
  }
  LOG.info("Connected to metastore.");
}
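open() is a simple first-success failover over the configured URIs. The sketch below shows the same control flow with the actual connection attempt abstracted behind a Predicate. The class, the method, and the predicate are hypothetical stand-ins for openStore, and IllegalStateException replaces MetaException.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the failover loop in open(); not Hive code.
final class FailoverConnector {
    // Tries each URI in order and returns the first one for which tryConnect reports success.
    static URI connectToFirstAvailable(List<URI> uris, Predicate<URI> tryConnect) {
        for (URI store : uris) {
            try {
                if (tryConnect.test(store)) {
                    return store;
                }
            } catch (RuntimeException e) {
                // A failure on one URI is reported and the next URI is tried.
                System.err.println("Unable to connect to metastore with URI " + store + ": " + e.getMessage());
            }
        }
        throw new IllegalStateException("Could not connect to meta store using any of the URIs provided");
    }

    public static void main(String[] args) {
        List<URI> uris = Arrays.asList(
                URI.create("thrift://meta1.example.com:9083"),  // hypothetical hosts
                URI.create("thrift://meta2.example.com:9083"));
        // Pretend only the second server is reachable.
        URI chosen = connectToFirstAvailable(uris, u -> u.getHost().startsWith("meta2"));
        System.out.println("Connected to " + chosen);
    }
}
```

Since the loop stops at the first success, the order of the entries in the comma-separated setting decides which server a client prefers.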
Now let's step into openStore. I won't paste the whole function, only the key points.
-----------------------------------------------------------------------------------------
boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
PS: METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled", false)
It is not configured by default, so the value is false.
---------
transport = new TSocket(store.getHost(), store.getPort()); // create the socket
((TSocket) transport).setTimeout(1000 * conf.getIntVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT));
---------
client = new ThriftHiveMetastore.Client(new TBinaryProtocol(transport)); // create a client
Then the connection is opened:
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
  if (attempt > 0 && retryDelaySeconds > 0) {
    try {
      LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
      Thread.sleep(retryDelaySeconds * 1000);
    } catch (InterruptedException ignore) {
    }
  }
  try {
    transport.open(); // this is where the client actually connects to the MetaStore Server
    isConnected = true;
  } catch (TTransportException e) {
    tte = e;
    if (LOG.isDebugEnabled()) {
      LOG.warn("Failed to connect to the MetaStore Server...", e);
    } else {
      // Don't print full exception trace if DEBUG is not on.
      LOG.warn("Failed to connect to the MetaStore Server...");
    }
  }
}
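The retry logic is independent of Thrift and can be sketched on its own. The version below assumes the loop is bounded by the retries value read earlier in the constructor, and it uses a hypothetical BooleanSupplier in place of transport.open(). Unlike the loop in the post, it restores the interrupt flag and gives up when the sleep is interrupted; that is a deliberate change, not Hive's behavior.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded retry loop with a fixed delay; not Hive code.
final class RetryingOpener {
    // Returns the number of attempts used on success, or -1 if every attempt failed.
    static int open(BooleanSupplier tryOpen, int retries, int retryDelaySeconds) {
        for (int attempt = 0; attempt < retries; ++attempt) {
            // No delay before the first attempt, a fixed delay before each later one.
            if (attempt > 0 && retryDelaySeconds > 0) {
                try {
                    Thread.sleep(retryDelaySeconds * 1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
            if (tryOpen.getAsBoolean()) {
                return attempt + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Pretend the server becomes reachable on the third attempt; delay 0 keeps the demo fast.
        int used = open(() -> ++calls[0] >= 3, 5, 0);
        System.out.println("attempts used: " + used);
    }
}
```

With the defaults quoted earlier (5 retries, 1 second delay), a client pointed at an unreachable server spends roughly 4 seconds sleeping per URI, plus however long each failed connection attempt takes.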
--------------------
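Given that, the next step is to study the MetaStore Server!
After the MetaStore server has been started, in
private void openStore(URI store) throws MetaException {
this part:
try {
  // execution reaches this point
  transport.open();
  isConnected = true;
} catch (TTransportException e) {
completes, and the meta client has been created!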