php通过 thrift访问hadoop的hive

本文讲解php通过sql查询hadoop中的数据。主要使用的技术是:php通过thrifthive提交sql查询,hivesql查询转换成hadoop任务,等到hadoop执行完毕后,返回一个结果uri,然后我们只需要读取这个uri中的内容。

Thrift的诞生是为了解决不同语言之间的访问的问题,可以支持多种程序语言,如c++phpjavapython等。Thrift是由facebook开发的轻量级跨语言的服务框架,现在已经移交到apache基金会下。和它类似的有google出的protocol buffericeThrift的一大优势是支持的语言很丰富,它使用自己的IDL语言来服务接口和数据交换的格式。

Hive是可以使用类似sql的语言访问HBase。而HBase是一个开源的nosql产品,它实现了google bigtable论文的一个开源产品,和hadoophdfs一起,可以用来存储和处理海量column family数据。

Thrift的官方网址:http://thrift.apache.org/

一.服务器端下载安装thrift

在hadoop和hbase都已经安装好的集群上安装thrift。
(1)下载:wget http://mirror.bjtu.edu.cn/apache//thrift/0.8.0/thrift-0.8.0.tar.gz,下载thrift客户端源码包。

(2)解压:tar -xzf thrift-0.8.0.tar.gz
(3)编译安装:如果是源码编译的,首先需要执行./bootstrap.sh创建./configure文件;
接下来执行./configure;
再执行make && make install
(4)启动:./bin/hbase-daemon.sh start thrift
Thrift默认监听的端口是9090。
参考链接:http://blog.csdn.net/hguisu/article/details/7298456

二.创建.thrift文件

Thrift编译器安装好之后,需要创建一个thrift文件。该文件为一个接口定义文件,需要定义thrift的类型(types)和服务(services)。该文件中定义的服务(services)是由服务器端实现的,并由客户端进行调用。Thrift编译器的作用是将你写的thrift文件生成为客户端接口源码,该接口源码是由不同的客户端库和你写的服务来生成的。为了通过thrift文件产生不同语言的接口源码,我们需要运行:

thrift --gen <language> <Thrift filename>

三.thrift文件描述

支持的变量类型

类型           描述    
bool            #true, false    
byte            #8位的有符号整数    
i16             #16位的有符号整数    
i32             #32位的有符号整数    
i64             #64位的有符号整数    
double          #64位的浮点数    
string          #UTF-8编码的字符串    
binary          #字符数组    
struct          #结构体    
list<type>        #有序的元素列表,类似于STL的vector    
set<type>     #无序的不重复元素集,类似于STL的set    
map<type1,type2>  #key-value型的映射,类似于STL的map    
exception       #是一个继承于本地语言的exception基类    
service         #服务包含多个函数接口(纯虚函数)

四.从服务端到客户端如何开发

1.简单的helloworld程序

这里使用python做服务端,php做客户端,当然也可以使用c++来做服务端。下面开始介绍开发流程。

(1)首先我们在服务器端写个helloworld.thrift文件,如下所示:

service HelloWorld{  
   string ping(1: string name),  
   string getpng(),  
}
(2) 在服务器端编译helloworld.thrift

编译helloworld.thrift文件,会产生服务器端和客户端相应语言的接口源码。
/usr/local/thrift/bin/thrift -r --gen py helloworld.thrift  
/usr/local/thrift/bin/thrift -r --gen php helloworld.thrift  
#会在当前目录下生成 gen-* 目录。
产生的gen-py目录放在服务器上,产生的gen-php目录放在客户端上。
(3)编写服务器端代码

    import sys  
    sys.path.append('./gen-py')  
       
    from helloworld import HelloWorld  
    from helloworld.ttypes import *  
       
    from thrift.transport import TSocket  
    from thrift.transport import TTransport  
    from thrift.protocol import TBinaryProtocol  
    from thrift.server import TServer  
       
    class HellowordHandler:  
        def __init__ (self):  
            pass  
       
        def ping (self, name):  
            print name + ' from server.'  
            return "%s from server." % name  
        def getpng (self):  
            f = open("./logo.png", "rb")  
            c = f.read()  
            f.close()  
            return c  
    handler = HellowordHandler()  
    processor = HelloWorld.Processor(handler)  
    transport = TSocket.TServerSocket(9090)  
    tfactory = TTransport.TBufferedTransportFactory()  
    pfactory = TBinaryProtocol.TBinaryProtocolFactory()  
       
    server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)  
       
    # You could do one of these for a multithreaded server  
    #server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)  
    #server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)  
       
    print 'Starting the server...'  
    server.serve()  
    print 'done.'  

(4)编写客户端代码

先将gen-php目录拷贝到客户端上。

<?php  
try{  
    //包含thrift客户端库文件  
    $GLOBALS['THRIFT_ROOT'] = './php/src';   
    require_once $GLOBALS['THRIFT_ROOT'].'/Thrift.php';  
    require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';  
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';  
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/THttpClient.php';  
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php'; 
 
    error_reporting(E_NONE);  
 
   //包含helloworld接口文件
    $GEN_DIR = './gen-php';  
    require_once $GEN_DIR.'/helloworld/HelloWorld.php';  

    error_reporting(E_ALL);  
   
    $socket = new TSocket('*.*.*.*', 9090);  
    $transport = new TBufferedTransport($socket, 1024, 1024);  
    $protocol = new TBinaryProtocol($transport);  
    $client = new HelloWorldClient($protocol);  
    
    $transport->open();  
   
    $a = $client->ping('xyq ');  
    echo $a;  
   
    $transport->close();  
   
    }catch(TException $tx){  
        print 'TException: '.$tx->getMessage()."/n";  
    }  
   
?>
最后给出一篇参考链接: http://blog.csdn.net/heiyeshuwu/article/details/5982222

2.thrift官网上给出的例子

Apache thrift能够让你在一个简单的.thrift文件中,定义数据类型和服务接口。将该.thrift文件作为输入文件,通过编译器编译产生服务端和客户端源码,从而构建了RPC客户端和服务器端之间的跨语言编程。
下面直接给出关键代码。

(1)thrift定义文件

/*定义的接口数据类型*/
struct UserProfile {
        1: i32 uid,
        2: string name,
        3: string blurb
}
/*定义的接口函数*/
service UserStorage {
        void store(1: UserProfile user),
        UserProfile retrieve(1: i32 uid)
}
(2)客户端python实现

  # Make an object
  up = UserProfile(uid=1,
      name="Test User",
      blurb="Thrift is great")	

  # Talk to a server via TCP sockets, using a binary protocol
  transport = TSocket.TSocket("localhost", 9090)
  transport.open()
  protocol = TBinaryProtocol.TBinaryProtocol(transport)

  # Use the service we already defined
  service = UserStorage.Client(protocol)
  service.store(up)

  # Retrieve something as well
  up2 = service.retrieve(2)

(3)服务器端c++实现

class UserStorageHandler : virtual public UserStorageIf {
   public:
    UserStorageHandler() {
      // Your initialization goes here
    }

    void store(const UserProfile& user) {
      // Your implementation goes here
      printf("store\n");
    }

    void retrieve(UserProfile& _return, const int32_t uid) {
      // Your implementation goes here
      printf("retrieve\n");
    }
  };

  int main(int argc, char **argv) {
    int port = 9090;
    shared_ptr<UserStorageHandler> handler(new UserStorageHandler());
    shared_ptr<TProcessor> processor(new UserStorageProcessor(handler));
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    server.serve();
}

3.实战经历

(1).thrift接口文件

文件名为hive.thrift,如下所示:

namespace java com.gj.data.hive.thrift

/**
 * 向HADOOP提交HIVE任务类。典型的用法是提交任务,轮询任务是否完成,取得任务的结果URI,读取结果文件。
 *这里展示的是客户端为java语言时的用法。
 *           long taskId = client.submitTask("abc@gj.com", "web", "select * from table1 where dt = '2013-04-10' limit 10;");
 *           if (taskId <=0) {
 *               System.out.println("error submit");
 *               return;
 *           }	
             //轮询任务是否完成
 *           int count = 50;
 *           while(count >= 0) {
 *               try {
 *                   Thread.sleep(30 * 1000);
 *               } catch (InterruptedException ex) {}
 *               if (client.isTaskFinished(taskId)) {
 *                   System.out.println(client.getResultURI(taskId));
 *                   break;
 *               } 
 *               count --;
 *          }
 */

service Hive {

    /** 提交任务
     * user - 用户名,工作邮箱,如abc@xxx.com
     * env - 提交的环境。目前支持两个环境: mobile - 移动端, web - 主站。
     * sql - 提交的sql语句。
     * 返回值:成功提交返回大于0的任务id值,此id用于后续查询。失败返回0或-1.
     */
    i64 submitTask(1:string user, 2:string env, 3:string sql);
    
    /** 查看任务是否完成
     *  taskId - 任务号
     * 返回值:完成返回true,未完成返回false
     */
    bool isTaskFinished(1:i64 taskId);
    
    /** 取得任务结果的URI,可以用此URI获得结果数据
     *  taskId - 任务号
     * 返回值:任务有结果,返回URI,否则返回空字符串
     */    
    string getResultURI(1:i64 taskId);
    
    /** 取得用户的所有任务列表
     *  user - 用户名,完整的ganji邮箱
     * 返回值:任务号列表,如果没有任务,返回空
     */
    list<i64> getTasksByUserName(1:string user);
}
(2).生成php与hbase的接口文件(服务器端实现)

/usr/local/thrift/bin/thrift --gen php hive.thrift
然后会在gen-php目录下生成了Hive.php和Hive_types.php文件。
然后把Hive.php和Hive_types.php文件拷贝到:客户端php开发的目录下。

(3).配置php客户端

php作为客户端使用thrift时,需要进行如下配置。

(a)准备thrift php客户端基础类

这些基础类可以从thrift的源码包中找到。在thriftsrc/lib/php/src下,一班有如下目录和文件:ext/,protocol/,transport/目录,和thrift.php、autoload.php文件。我们把这些目录和文件拷贝到客户端的/server/www/thrift_part/thrift-0.5.0/目录下。

(b)配置php的thrift扩展,使其支持thrift

如果php想要使用thrift,还需要安装php的thrift扩展。
如下所示:
首先下载相应的php的thrift扩展,进行解压;
进入源码下的ext/thrift_protocol;
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --enable-thrift_protocol
make
make install
然后把生成的thrift_protocol.so文件配置到php.ini并重启apache服务。

(4).php客户端的实现

文件名为:updateHiveData.php

<?php
    $GLOBALS['THRIFT_ROOT'] = '/server/www/third_part/thrift-0.5.0';
    
    require_once $GLOBALS['THRIFT_ROOT'].'/Thrift.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/packages/scribe/scribe.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/THttpClient.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php';
    require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';

    //生成的文件
    require_once dirname(__FILE__) . '/Hive.php';
    //require_once dirname(__FILE__) .'/hive_types.php';
    
    ERROR_REPORTING(E_ALL);
    INI_SET('DISPLAY_ERRORS','ON');
   
    $socket = new TSocket('hive.corp.gj.com',13080);
    $socket->setDebug(TRUE); 
    
    // 设置接收超时(毫秒)
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(10000);
    
    //$transport = new TBufferedTransport($socket, 1024, 1024);
    $transport = new TFramedTransport($socket);
    $protocol = new TBinaryProtocol($transport);
    $client = new HiveClient($protocol);
    
    try{
        $transport->open();
    }catch(TException $tx){
        echo $tx->getMessage();
    }
     
    //获取各个类目某一天的 PV UV
    $taskId = $client->submitTask('xxx@xxx.com','web',"select regexp_extract(gjch, '^/([^/]+)', 1), count(*), count(distinct uuid) from table1 where dt = '2013-04-22' and gjch regexp '[^@]*/detail' GROUP BY regexp_extract(gjch, '^/([^/]+)', 1);");
    
    if($taskId <= 0){
	echo 'error submit';
        exit;
    }
    echo $taskId . "\n";
   
    $count = 50;
    while($count > 0){
        try{
            //sleep以秒为单位,这里3分钟轮询一次
            sleep(3*60);
        }catch(TException $tx){}

		if($client->isTaskFinished($taskId)){
            //echo $client->getResultURI($taskId);
            $url = $client->getResultURI($taskId);
            //echo $url;
    	    $handle = fopen($url,"rb");
            $content = stream_get_contents($handle);
            echo $content;
            fclose($handle);

            break;
        }
        $count--;
    }
  
    $transport->close();
?>

由于服务器端不是本人负责,所以当时只根据thrift定义文件,实现了php客户端。运行结果如下:

其中这里url是$client->getResultURI取得的结果,网页内容是这个uri对应的内容。

五.thrift类说明

TSocket:采用TCP socket进行数据传输;
Transport类(传输层):
负责数据传输,介绍几个常用类:
TBufferedTransport:对某个Transport对象操作的数据进行buffer,即从buffer中读取数据进行传输,或者将数据直接写入buffer;
TFramedTransport:同TBufferdTransport类型,也会对数据进行buffer,同时它支持定长数据发送和接受;
TFileTransport:文件(日志)传输类,允许client将文件传给server,语序server将受到的数据写到文件中;

Protocol类(协议):
负责数据编码,主要有以下几个常用类:
TBinaryProtocol:二进制编码;
TJSONProtocol:JSON编码。


版权声明:本文为lianxiang_biancheng原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。