metastore作用_Hive之HiveMetaStoreClient生成过程

刚开始我还没意识到MetaStore的作用,看了下生产环境的配置如下:

4f48a268d0373b8b5d78944f814eedbe.png

下面就来解释下系统是如何生成meta client的!

===========================================先来看几段代码!

publicvoidcreateDatabase(Databasedb,booleanifNotExist)throwsAlreadyExistsException, HiveException {

try{

getMSC().createDatabase(db);

}catch(AlreadyExistsExceptione) {

if(!ifNotExist) {

throwe;

}

}catch(Exceptione) {

thrownewHiveException(e);

}

}

=========

privateIMetaStoreClient getMSC()throwsMetaException {

if(metaStoreClient==null) {

metaStoreClient= createMetaStoreClient();

}

returnmetaStoreClient;

}

=========

privateIMetaStoreClient createMetaStoreClient()throwsMetaException {

HiveMetaHookLoaderhookLoader=newHiveMetaHookLoader() {

publicHiveMetaHook getHook(org.apache.hadoop.hive.metastore.api.Tabletbl)throwsMetaException {

try{

if(tbl==null) {

returnnull;

}

HiveStorageHandlerstorageHandler= HiveUtils.getStorageHandler(conf,

tbl.getParameters().get(META_TABLE_STORAGE));

if(storageHandler==null) {

returnnull;

}

returnstorageHandler.getMetaHook();

}catch(HiveExceptionex) {

LOG.error(StringUtils.stringifyException(ex));

thrownewMetaException("Failed to load storage handler:  "+ex.getMessage());

}

}

};

returnnewHiveMetaStoreClient(conf,hookLoader);

}

=========

publicHiveMetaStoreClient(HiveConfconf, HiveMetaHookLoaderhookLoader)

throwsMetaException {

this.hookLoader=hookLoader;

if(conf==null) {

conf=newHiveConf(HiveMetaStoreClient.class);

}

this.conf=conf;

localMetaStore=conf.getBoolVar(ConfVars.METASTORE_MODE);

if(localMetaStore) {

// instantiate the metastore server handler directly instead of connecting

// through the network

client=newHiveMetaStore.HMSHandler("hive client",conf);

isConnected=true;

return;

}

// get the number retries

retries= HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTRETRIES);

retryDelaySeconds=conf.getIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);

// user wants file store based configuration

if(conf.getVar(HiveConf.ConfVars.METASTOREURIS) !=null) {

StringmetastoreUrisString[] =conf.getVar(

HiveConf.ConfVars.METASTOREURIS).split(",");

metastoreUris=newURI[metastoreUrisString.length];

try{

inti= 0;

for(Strings:metastoreUrisString) {

URItmpUri=newURI(s);

if(tmpUri.getScheme() ==null) {

thrownewIllegalArgumentException("URI: "+s

+" does not have a scheme");

}

metastoreUris[i++] =tmpUri;

}

}catch(IllegalArgumentExceptione) {

throw(e);

}catch(Exceptione) {

MetaStoreUtils.logAndThrowMetaException(e);

}

}elseif(conf.getVar(HiveConf.ConfVars.METASTOREDIRECTORY) !=null) {

metastoreUris=newURI[1];

try{

metastoreUris[0] =newURI(conf

.getVar(HiveConf.ConfVars.METASTOREDIRECTORY));

}catch(URISyntaxExceptione) {

MetaStoreUtils.logAndThrowMetaException(e);

}

}else{

LOG.error("NOT getting uris from conf");

thrownewMetaException("MetaStoreURIs not found in conf file");

}

// finally open the store

open();

}

下面要认真分析下上面的这段代码,因为关联到一些参数的配置,对于理解生产环境的部署参数有帮助!

先看下面这段代码

localMetaStore=conf.getBoolVar(ConfVars.METASTORE_MODE);

if(localMetaStore) {

// instantiate the metastore server handler directly instead of

// connecting

// through the network

client=newHiveMetaStore.HMSHandler("hive client",conf);

isConnected=true;

return;

}

PS: ConfVars.METASTORE_MODE ---METASTORE_MODE("hive.metastore.local",true),

这里我的配置项是:

a84528d91092435346c5178dd5e8dda9.png

f9415b5234c3007de91c3bdf7d7437dc.png

继续往下执行

// get the number retries

retries= HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTRETRIES);

retryDelaySeconds=conf.getIntVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY);

这两个比较简单,

// Number of times to retry a connection to a Thrift metastore

// server

METASTORETHRIFTRETRIES("hive.metastore.connect.retries", 5),

// Number of seconds the client should wait between connection

// attempts

METASTORE_CLIENT_CONNECT_RETRY_DELAY("hive.metastore.client.connect.retry.delay", 1),

就不详细描述了!

======================================================================================

接下来看

if(conf.getVar(HiveConf.ConfVars.METASTOREURIS) !=null) {

PS:METASTOREURIS("hive.metastore.uris","")

那么我的配置是:

aa8e5245155518815dd316eb84aef787.png

好,回到代码,有这么一行

StringmetastoreUrisString[] =conf.getVar(HiveConf.ConfVars.METASTOREURIS).split(",");

那么,你知道为啥生产环境的多个 thrift 服务器之间用都好分隔了吧!

-------------------------------------------------------------------------------------------------------------------------------------------

下面开始放到数组里

try{

inti= 0;

for(Strings:metastoreUrisString) {

URItmpUri=newURI(s);

if(tmpUri.getScheme() ==null) {

thrownewIllegalArgumentException("URI: "+s+" does not have a scheme");

}

metastoreUris[i++] =tmpUri;

}

}catch(IllegalArgumentExceptione) {

throw(e);

}catch(Exceptione) {

MetaStoreUtils.logAndThrowMetaException(e);

}

所以不要忘记写上具体的scheme.

===继续

// finally open the store

open();

估计就是去连接meta server.---幸亏之前看过thrift源码,轻车熟路啊 :) thrift的源码写得比netty好懂!

突然想起来Hive就是facebook出品的, thrift也是facebook出品的,所以Hive使用Thrift也是理所当然的。

=============================================================好,我们继续跟踪open...

privatevoidopen()throwsMetaException {

for(URIstore:metastoreUris) {//一个一个遍历尝试

LOG.info("Trying to connect to metastore with URI "+store);

try{

openStore(store);

}catch(MetaExceptione) {

LOG.error("Unable to connect to metastore with URI "+store,e);

}

if(isConnected) {

break;

}

}

if(!isConnected) {

thrownewMetaException("Could not connect to meta store using any of the URIs provided");

}

LOG.info("Connected to metastore.");

}

那么,我们进入openStore函数看看!具体代码不贴,点重点

-----------------------------------------------------------------------------------------

booleanuseSasl=conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);

PS:METASTORE_USE_THRIFT_SASL("hive.metastore.sasl.enabled",false)

默认不配置,为false

---------

transport=newTSocket(store.getHost(),store.getPort());---创建socket

((TSocket)transport).setTimeout(1000 *conf.getIntVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT));

---------

client=newThriftHiveMetastore.Client(newTBinaryProtocol(transport)); --- 创建一个client

然后就是去创建连接

for(intattempt= 0; !isConnected&&attempt

if(attempt> 0 &&retryDelaySeconds> 0) {

try{

LOG.info("Waiting "+retryDelaySeconds+" seconds before next connection attempt.");

Thread.sleep(retryDelaySeconds* 1000);

}catch(InterruptedExceptionignore) {

}

}

try{

transport.open(); --- 这个时候就开始去连接MetaStore Server了。

isConnected=true;

}catch(TTransportExceptione) {

tte=e;

if(LOG.isDebugEnabled()) {

LOG.warn("Failed to connect to the MetaStore Server...",e);

}else{

// Don't print full exception trace if DEBUG is not on.

LOG.warn("Failed to connect to the MetaStore Server...");

}

}

}

--------------------

既然如此,我们接下来就去研究MetaStore Server!!!

启动metaStore server后

privatevoidopenStore(URIstore)throwsMetaException {

里面的

try{

//看到这里了

transport.open();

isConnected=true;

}catch(TTransportExceptione) {

生成完毕!


版权声明:本文为weixin_36385240原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。