flume thrift协议入门与使用

使用flume thrift 首先要了解thrift协议. Apache Thrift软件框架用于可伸缩的跨语言服务开发,它结合了一个软件栈和一个代码生成引擎来构建C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml等多种语言的服务器端和客户端程序。

Thrift的技术栈

技术栈 Thrift软件栈分层从下向上分别为:传输层(Transport Layer)、协议层(Protocol Layer)、处理层(Processor Layer)和服务层(Server Layer)。

  • 传输层(Transport Layer):传输层负责直接从网络中读取和写入数据,它定义了具体的网络传输协议;比如说TCP/IP传输等。

  • 协议层(Protocol Layer):协议层定义了数据传输格式,负责网络传输数据的序列化和反序列化;比如说JSON、XML、二进制数据等。

  • 处理层(Processor Layer):处理层是由具体的IDL(接口描述语言)生成的,封装了具体的底层网络传输和序列化方式,并委托给用户实现的Handler进行处理。

  • 服务层(Server Layer):整合上述组件,提供具体的网络线程/IO服务模型,形成最终的服务。

Thrift的协议(应用层)

Thrift可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目/产品中的实际需求。常用协议有以下几种:

  • TBinaryProtocol:二进制编码格式进行数据传输
  • TCompactProtocol:高效率的、密集的二进制编码格式进行数据传输 (常用的)
  • TJSONProtocol: 使用JSON文本的数据编码协议进行数据传输
  • TSimpleJSONProtocol:只提供JSON只写的协议,适用于通过脚本语言解析

Thrift的传输层

  • TSocket:使用阻塞式I/O进行传输,是最常见的模式
  • TNonblockingTransport:使用非阻塞方式,用于构建异步客户端
  • TFramedTransport:使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO

Thrift的服务端类型

  • TSimpleServer:单线程服务器端,使用标准的阻塞式I/O
  • TThreadPoolServer:多线程服务器端,使用标准的阻塞式I/O
  • TNonblockingServer:单线程服务器端,使用非阻塞式I/O
  • THsHaServer:半同步半异步服务器端,基于非阻塞式IO读写和多线程工作任务处理
  • TThreadedSelectorServer:多线程选择器服务器端,对THsHaServer在异步IO模型上进行增强

python 通过thrift向flume发送event

Flume的Thrift接口定义(IDL)语言

namespace java org.apache.flume.thrift
 
struct ThriftFlumeEvent {
  1: required map <string, string> headers,
  2: required binary body,
}
 
enum Status {
  OK,
  FAILED,
  ERROR,
  UNKNOWN
}
 
service ThriftSourceProtocol {
  Status append(1: ThriftFlumeEvent event),
  Status appendBatch(1: list<ThriftFlumeEvent> events),

  • 存放在在flume源码包apache-flume-1.7.0-src\flume-ng-sdk\src\main\thrift\flume.thrift

生成客户端基础代码

thrift --gen py flume.thrit 

flume客户端源码


    #coding=utf-8  
    from genpy.flume import ThriftSourceProtocol  
    from genpy.flume.ttypes import ThriftFlumeEvent  
    from thrift.transport import TTransport, TSocket  
    from thrift.protocol import TCompactProtocol  
      
      
    class _Transport(object):  
        def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):  
            self.thrift_host = thrift_host  
            self.thrift_port = thrift_port  
            self.timeout = timeout  
            self.unix_socket = unix_socket  
              
            self._socket = TSocket.TSocket(self.thrift_host, self.thrift_port, self.unix_socket)  
            self._transport_factory = TTransport.TFramedTransportFactory()  
            self._transport = self._transport_factory.getTransport(self._socket)  
              
        def connect(self):  
            try:  
                if self.timeout:  
                    self._socket.setTimeout(self.timeout)  
                if not self.is_open():  
                    self._transport = self._transport_factory.getTransport(self._socket)  
                    self._transport.open()  
            except Exception, e:  
                print(e)  
                self.close()  
          
        def is_open(self):  
            return self._transport.isOpen()  
          
        def get_transport(self):  
            return self._transport  
          
        def close(self):  
            self._transport.close()  
              
    class FlumeClient(object):  
        def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):  
            self._transObj = _Transport(thrift_host, thrift_port, timeout=timeout, unix_socket=unix_socket)  
            self._protocol = TCompactProtocol.TCompactProtocol(trans=self._transObj.get_transport())  
            self.client = ThriftSourceProtocol.Client(iprot=self._protocol, oprot=self._protocol)  
            self._transObj.connect()  
              
        def send(self, event):  
            try:  
                self.client.append(event)  
            except Exception, e:  
                print(e)  
            finally:  
                self._transObj.connect()  
          
        def send_batch(self, events):  
            try:  
                self.client.appendBatch(events)  
            except Exception, e:  
                print(e)  
            finally:  
                self._transObj.connect()  
          
        def close(self):  
            self._transObj.close()  
          
    if __name__ == '__main__':  
        import random  
        flume_client = FlumeClient('127.0.0.1', 9413)  
        event = ThriftFlumeEvent({'a':'hello', 'b':'world'}, 'events under hello world2')  
        events = [ThriftFlumeEvent({'a':'hello', 'b':'world'}, 'events under hello world%s' % random.randint(0, 1000)) for _ in range(100)]  
          
        flume_client.send(event)  
        flume_client.send_batch(events)  
        flume_client.close() 

相关链接

Apache Thrift系列详解(一) - 概述与入门
Apache Thrift系列详解(二) - 网络服务模型
Apache Thrift系列详解(三) - 序列化机制
python 通过thrift协议向flume发送数据

Life is more than the present.