Pooling a Thrift client

Thrift is an interface definition language used to define and create services for numerous languages. The Thrift stack relies on protocols (TBinaryProtocol, TCompactProtocol...) and transports (TSocket, TFileTransport...). Since a transport is essentially a wrapper around a socket or a file, Thrift clients are NOT thread-safe. As with any other non-thread-safe resource, you have three choices: guard it with a (costly) lock, open a new connection every time you need it, or pool your connections.

Locking is the simplest way to share a non-thread-safe resource, and Java has plenty of ways to do it well, but in every case it creates a bottleneck in your code. Opening a new connection every time we want to call the service is not a great idea either: lots of short-lived connections get created and destroyed, which can be really expensive. So what about pooling?

A pool is just a set of initialised resources that are kept ready to use, rather than allocated and destroyed on demand. A client of the pool will request an object from the pool and perform operations on the returned object. When the client has finished with an object (or resource), it returns it to the pool, rather than destroying it. Pooling of resources can offer a significant performance boost in situations where the cost of initializing a class instance is high and the rate of instantiation of a class is high. (thanks Wikipedia) Apache offers a well-known pooling framework: commons-pool.

Let's start with some Maven dependencies:


<dependency>
    <groupId>com.codahale.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.8.0</version>
</dependency>
<dependency>
    <groupId>commons-pool</groupId>
    <artifactId>commons-pool</artifactId>
    <version>1.6</version>
</dependency>
<!-- The sample code imports org.slf4j directly, so declare the API explicitly
     instead of relying on it arriving transitively (add a binding such as
     slf4j-simple to actually see the log output). -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>


We need a Thrift definition file; we can reuse the one from Nifty:
https://github.com/facebook/nifty/blob/master/nifty-examples/src/main/resources/scribe.thrift

NB: Nifty is an open-source implementation of Thrift clients and servers on top of Netty, developed by Facebook and available at https://github.com/facebook/nifty.

To generate the Java sources from the Thrift file, run:

thrift --gen java thrift/scribe.thrift

NB: Don't forget to install the thrift-compiler (apt-get install thrift-compiler on Ubuntu - version 0.8.0-0ubuntu2).

Create the class ThriftClientPool:


package com.hangar2.thrift;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A pool of Thrift service clients backed by commons-pool's
 * {@link GenericObjectPool}.
 *
 * <p>A Thrift client wraps a single transport (socket, file, ...) and must not
 * be used by two callers at once. This pool lends each client to one caller at
 * a time: borrow it with {@link #getResource()}, then give it back with
 * {@link #returnResource(TServiceClient)} after a successful call, or with
 * {@link #returnBrokenResource(TServiceClient)} when the call failed and the
 * connection may be in an unknown state.
 *
 * @param <T> the generated client type, e.g. {@code Scribe.Client}
 */
public class ThriftClientPool<T extends TServiceClient> implements
        AutoCloseable {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(ThriftClientPool.class);

    private final GenericObjectPool<T> internalPool;

    /**
     * Creates a pool whose clients use the binary protocol over a plain socket.
     *
     * @param clientFactory wraps a ready protocol into a service client
     * @param poolConfig    commons-pool settings (size, blocking, validation...)
     * @param host          server host name
     * @param port          server port
     */
    public ThriftClientPool(ClientFactory<T> clientFactory,
            GenericObjectPool.Config poolConfig, String host, int port) {
        this(clientFactory, new BinaryOverSocketProtocolFactory(host, port),
                poolConfig);
    }

    /**
     * Creates a pool with a caller-supplied protocol/transport stack.
     *
     * @param clientFactory   wraps a ready protocol into a service client
     * @param protocolFactory opens a new connection and returns its protocol
     * @param poolConfig      commons-pool settings (size, blocking, validation...)
     */
    public ThriftClientPool(ClientFactory<T> clientFactory,
            ProtocolFactory protocolFactory, GenericObjectPool.Config poolConfig) {
        this.internalPool = new GenericObjectPool<T>(
                new ThriftClientFactory<T>(clientFactory, protocolFactory),
                poolConfig);
    }

    /**
     * Pool callbacks: opens a connection and wraps it in a client, checks that
     * a client's transports are still open, and closes them on destruction.
     * Static so it does not keep a hidden reference to the enclosing pool.
     */
    static class ThriftClientFactory<C extends TServiceClient> extends
            BasePoolableObjectFactory<C> {

        private final ClientFactory<C> clientFactory;
        private final ProtocolFactory protocolFactory;

        public ThriftClientFactory(ClientFactory<C> clientFactory,
                ProtocolFactory protocolFactory) {
            this.clientFactory = clientFactory;
            this.protocolFactory = protocolFactory;
        }

        /**
         * Opens a new connection and wraps it in a client.
         *
         * @throws ThriftClientException if the connection or the client cannot
         *                               be created
         */
        @Override
        public C makeObject() throws Exception {
            TProtocol protocol = null;
            try {
                protocol = protocolFactory.make();
                return clientFactory.make(protocol);
            } catch (Exception e) {
                // Don't leak the connection opened by protocolFactory when
                // building the client itself fails.
                if (protocol != null) {
                    try {
                        closeIfOpen(protocol.getTransport());
                    } catch (RuntimeException closeError) {
                        e.addSuppressed(closeError);
                    }
                }
                LOGGER.warn("Could not create a new Thrift client for the pool", e);
                throw new ThriftClientException(
                        "Can not make a new object for pool", e);
            }
        }

        /**
         * Reports whether both of the client's transports are still open.
         * Without this override the base class always answers {@code true},
         * which would make {@code testOnBorrow}/{@code testWhileIdle} no-ops.
         */
        @Override
        public boolean validateObject(C obj) {
            // NOTE(review): isOpen() is a local check; depending on the
            // transport it may not notice a connection dropped by the server.
            return obj.getInputProtocol().getTransport().isOpen()
                    && obj.getOutputProtocol().getTransport().isOpen();
        }

        /** Closes the client's output and input transports if still open. */
        @Override
        public void destroyObject(C obj) throws Exception {
            closeIfOpen(obj.getOutputProtocol().getTransport());
            closeIfOpen(obj.getInputProtocol().getTransport());
        }

        /** Closes {@code transport} unless it is null or already closed. */
        private static void closeIfOpen(TTransport transport) {
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
        }
    }

    /** Builds a service client on top of an already opened protocol. */
    public static interface ClientFactory<T> {

        T make(TProtocol tProtocol);
    }

    /** Opens a new connection and returns the protocol to talk over it. */
    public static interface ProtocolFactory {

        TProtocol make();
    }

    /** {@link ProtocolFactory} for the binary protocol over a plain {@link TSocket}. */
    public static class BinaryOverSocketProtocolFactory implements
            ProtocolFactory {

        private final String host;
        private final int port;

        public BinaryOverSocketProtocolFactory(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Opens a socket to {@code host:port}.
         *
         * @throws ThriftClientException if the socket cannot be opened
         */
        @Override
        public TProtocol make() {
            TTransport transport = new TSocket(host, port);
            try {
                transport.open();
            } catch (TTransportException e) {
                // Not logged here: the pool factory logs the failure once.
                throw new ThriftClientException("Can not make protocol for "
                        + host + ":" + port, e);
            }
            return new TBinaryProtocol(transport);
        }
    }

    /** Unchecked wrapper for failures while creating, borrowing or returning clients. */
    public static class ThriftClientException extends RuntimeException {

        // RuntimeException is Serializable, so pin the serial version.
        private static final long serialVersionUID = -2275296727467192665L;

        public ThriftClientException(String message, Exception e) {
            super(message, e);
        }

    }

    /**
     * Borrows a client from the pool. Depending on the pool configuration this
     * may block, fail or grow the pool when every client is in use.
     *
     * @throws ThriftClientException if no client could be obtained
     */
    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (Exception e) {
            throw new ThriftClientException(
                    "Could not get a resource from the pool", e);
        }
    }

    /**
     * Gives a healthy client back to the pool.
     *
     * @throws ThriftClientException if the pool rejects the client
     */
    public void returnResourceObject(T resource) {
        try {
            internalPool.returnObject(resource);
        } catch (Exception e) {
            throw new ThriftClientException(
                    "Could not return the resource to the pool", e);
        }
    }

    /** Invalidates a client whose connection may be unusable; it is destroyed, not reused. */
    public void returnBrokenResource(T resource) {
        returnBrokenResourceObject(resource);
    }

    /** Gives a healthy client back to the pool. */
    public void returnResource(T resource) {
        returnResourceObject(resource);
    }

    /**
     * Invalidates {@code resource} so the pool destroys it instead of reusing it.
     *
     * @throws ThriftClientException if the pool fails to invalidate it
     */
    protected void returnBrokenResourceObject(T resource) {
        try {
            internalPool.invalidateObject(resource);
        } catch (Exception e) {
            throw new ThriftClientException(
                    "Could not invalidate the broken resource", e);
        }
    }

    /** Alias of {@link #close()}. */
    public void destroy() {
        close();
    }

    /**
     * Closes the underlying pool.
     *
     * @throws ThriftClientException if the pool cannot be closed
     */
    @Override
    public void close() {
        try {
            internalPool.close();
        } catch (Exception e) {
            throw new ThriftClientException("Could not destroy the pool", e);
        }
    }
}

ThriftClientPool delegates all the pooling logic to a GenericObjectPool. A custom BasePoolableObjectFactory handles the creation and destruction of clients. By default, clients use the BinaryOverSocketProtocolFactory, i.e. the binary protocol transported over a plain socket.

You can now use it with the default pool configuration:

// How to turn an opened protocol into a Scribe client.
final ClientFactory<Scribe.Client> scribeClients = new ClientFactory<Scribe.Client>() {
    @Override
    public Client make(TProtocol protocol) {
        return new Scribe.Client(protocol);
    }
};

// Default commons-pool settings; binary protocol over a socket to localhost:7911.
final ThriftClientPool<Scribe.Client> pool =
        new ThriftClientPool<Scribe.Client>(scribeClients, new Config(), "localhost", 7911);

Or with a custom pooling configuration:

// Custom pool settings. minIdle, testWhileIdle and numTestsPerEvictionRun are
// only honoured by the idle-object evictor, and the evictor only runs when
// timeBetweenEvictionRunsMillis is positive (the default is -1, i.e. never).
Config poolConfig = new Config();
poolConfig.maxActive = 80;          // at most 80 clients in total
poolConfig.minIdle = 5;
poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
poolConfig.testOnBorrow = true;
poolConfig.testWhileIdle = true;
poolConfig.numTestsPerEvictionRun = 10;
poolConfig.maxWait = 3000;          // ms to block when the pool is exhausted
poolConfig.timeBetweenEvictionRunsMillis = 30000; // enable the evictor

final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
        new ClientFactory<Scribe.Client>() {
            @Override
            public Client make(TProtocol tProtocol) {
                return new Scribe.Client(tProtocol);
            }
        }, poolConfig, "localhost", 7911);

Or with a custom transport, for instance the binary protocol over a framed socket (the server must then use a framed transport as well, e.g. a TThreadedSelectorServer):


// How to turn an opened protocol into a Scribe client.
final ClientFactory<Scribe.Client> scribeClients = new ClientFactory<Scribe.Client>() {
    @Override
    public Client make(TProtocol protocol) {
        return new Scribe.Client(protocol);
    }
};

// Opens a framed socket to localhost:7911 and speaks the binary protocol over it.
final ProtocolFactory framedBinary = new ProtocolFactory() {
    @Override
    public TProtocol make() {
        TFramedTransport framed = new TFramedTransport(new TSocket("localhost", 7911));
        try {
            framed.open();
        } catch (TTransportException cause) {
            throw new ThriftClientException("Can not make protocol", cause);
        }
        return new TBinaryProtocol(framed);
    }
};

final ThriftClientPool<Scribe.Client> pool =
        new ThriftClientPool<Scribe.Client>(scribeClients, framedBinary, new Config());

Now, some tests...

Server side:

package com.hangar2.thrift;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.facebook.nifty.test.LogEntry;
import com.facebook.nifty.test.ResultCode;
import com.facebook.nifty.test.Scribe;

public class BaseNativeServer {

private static final Logger log = LoggerFactory
.getLogger(BaseNativeServer.class);

public static void main(String[] args) throws Exception {

final MetricRegistry registry = new MetricRegistry();
final Meter requests = registry.meter(MetricRegistry.name(
BaseNativeSelectorServer.class, "requests"));
final ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS).build();
reporter.start(1, TimeUnit.SECONDS);

// Create the handler
Scribe.Iface serviceInterface = new Scribe.Iface() {

public ResultCode log(List<LogEntry> messages) throws TException {
requests.mark();
for (LogEntry message : messages) {
log.info("{}: {}", message.getCategory(),
message.getMessage());
}
return ResultCode.OK;
}
};

TServerSocket serverTransport = new TServerSocket(7911);
Scribe.Processor<Scribe.Iface> processor = new Scribe.Processor<Scribe.Iface>(
serviceInterface);

final TServer server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport)
.processor(processor));

server.serve();

// Arrange to stop the server at shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.stop();
}
});
}

}

Client side:

package com.hangar2.thrift;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.apache.thrift.protocol.TProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.facebook.nifty.test.LogEntry;
import com.facebook.nifty.test.Scribe;
import com.facebook.nifty.test.Scribe.Client;
import com.hangar2.thrift.ThriftClientPool.ClientFactory;
import com.hangar2.thrift.ThriftClientPool.ClientFactory;

public class BaseNativeClient {

private static final Logger log = LoggerFactory
.getLogger(BaseNativeClient.class);

public static void main(String[] args) throws Exception {

Config poolConfig = new Config();
poolConfig.maxActive = 80;
poolConfig.minIdle = 5;
poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
poolConfig.testOnBorrow = true;
poolConfig.testWhileIdle = true;
poolConfig.numTestsPerEvictionRun = 10;
poolConfig.maxWait = 3000;

final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
new ClientFactory<Scribe.Client>() {
@Override
public Client make(TProtocol tProtocol) {
return new Scribe.Client(tProtocol);
}
}, poolConfig, "localhost", 7911);

ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
public void run() {

Scribe.Client resource = pool.getResource();
try {
for (int i = 0; i < 10000; i++) {
try {
resource.log(Collections
.singletonList(new LogEntry("cat1",
"test" + i)));
} catch (Exception e) {
e.printStackTrace();
}
}
pool.returnResource(resource);
} catch (Exception e) {
pool.returnBrokenResource(resource);
log.warn("whut?", e);
}
}
});
}

Thread.sleep(3000);
pool.close();
}
}

Sources:
Wikipedia: http://en.wikipedia.org/wiki/Apache_Thrift
Thrift homepage: http://thrift.apache.org/
Nifty, the Thrift-over-Netty framework: https://github.com/facebook/nifty

Labels: , , ,