使用flume thrift 首先要了解thrift协议. Apache Thrift软件框架用于可伸缩的跨语言服务开发,它结合了一个软件栈和一个代码生成引擎来构建C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml等多种语言的服务器端和客户端程序。
Thrift的技术栈
Thrift软件栈分层从下向上分别为:传输层(Transport Layer)、协议层(Protocol Layer)、处理层(Processor Layer)和服务层(Server Layer)。
-
传输层(Transport Layer):传输层负责直接从网络中读取和写入数据,它定义了具体的网络传输协议;比如说TCP/IP传输等。
-
协议层(Protocol Layer):协议层定义了数据传输格式,负责网络传输数据的序列化和反序列化;比如说JSON、XML、二进制数据等。
-
处理层(Processor Layer):处理层是由具体的IDL(接口描述语言)生成的,封装了具体的底层网络传输和序列化方式,并委托给用户实现的Handler进行处理。
-
服务层(Server Layer):整合上述组件,提供具体的网络线程/IO服务模型,形成最终的服务。
Thrift的协议(应用层)
Thrift可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本(text)和二进制(binary)传输协议。为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目/产品中的实际需求。常用协议有以下几种:
- TBinaryProtocol:二进制编码格式进行数据传输
- TCompactProtocol:高效率的、密集的二进制编码格式进行数据传输 (常用的)
- TJSONProtocol: 使用JSON文本的数据编码协议进行数据传输
- TSimpleJSONProtocol:只提供JSON只写的协议,适用于通过脚本语言解析
Thrift的传输层
- TSocket:使用阻塞式I/O进行传输,是最常见的模式
- TNonblockingTransport:使用非阻塞方式,用于构建异步客户端
- TFramedTransport:使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO
Thrift的服务端类型
- TSimpleServer:单线程服务器端,使用标准的阻塞式I/O
- TThreadPoolServer:多线程服务器端,使用标准的阻塞式I/O
- TNonblockingServer:单线程服务器端,使用非阻塞式I/O
- THsHaServer:半同步半异步服务器端,基于非阻塞式IO读写和多线程工作任务处理
- TThreadedSelectorServer:多线程选择器服务器端,对THsHaServer在异步IO模型上进行增强
python 通过thrift向flume发送event
Flume的Thrift接口定义(IDL)语言
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
- 存放在在flume源码包apache-flume-1.7.0-src\flume-ng-sdk\src\main\thrift\flume.thrift
生成客户端基础代码
thrift --gen py flume.thrit
flume客户端源码
#coding=utf-8
from genpy.flume import ThriftSourceProtocol
from genpy.flume.ttypes import ThriftFlumeEvent
from thrift.transport import TTransport, TSocket
from thrift.protocol import TCompactProtocol
class _Transport(object):
def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):
self.thrift_host = thrift_host
self.thrift_port = thrift_port
self.timeout = timeout
self.unix_socket = unix_socket
self._socket = TSocket.TSocket(self.thrift_host, self.thrift_port, self.unix_socket)
self._transport_factory = TTransport.TFramedTransportFactory()
self._transport = self._transport_factory.getTransport(self._socket)
def connect(self):
try:
if self.timeout:
self._socket.setTimeout(self.timeout)
if not self.is_open():
self._transport = self._transport_factory.getTransport(self._socket)
self._transport.open()
except Exception, e:
print(e)
self.close()
def is_open(self):
return self._transport.isOpen()
def get_transport(self):
return self._transport
def close(self):
self._transport.close()
class FlumeClient(object):
def __init__(self, thrift_host, thrift_port, timeout=None, unix_socket=None):
self._transObj = _Transport(thrift_host, thrift_port, timeout=timeout, unix_socket=unix_socket)
self._protocol = TCompactProtocol.TCompactProtocol(trans=self._transObj.get_transport())
self.client = ThriftSourceProtocol.Client(iprot=self._protocol, oprot=self._protocol)
self._transObj.connect()
def send(self, event):
try:
self.client.append(event)
except Exception, e:
print(e)
finally:
self._transObj.connect()
def send_batch(self, events):
try:
self.client.appendBatch(events)
except Exception, e:
print(e)
finally:
self._transObj.connect()
def close(self):
self._transObj.close()
if __name__ == '__main__':
import random
flume_client = FlumeClient('127.0.0.1', 9413)
event = ThriftFlumeEvent({'a':'hello', 'b':'world'}, 'events under hello world2')
events = [ThriftFlumeEvent({'a':'hello', 'b':'world'}, 'events under hello world%s' % random.randint(0, 1000)) for _ in range(100)]
flume_client.send(event)
flume_client.send_batch(events)
flume_client.close()
相关链接
Apache Thrift系列详解(一) - 概述与入门
Apache Thrift系列详解(二) - 网络服务模型
Apache Thrift系列详解(三) - 序列化机制
python 通过thrift协议向flume发送数据