Wednesday, April 20, 2011

Using play-framework as a thrift server/endpoint.

Why?
Hosting a Play service is a great way to manage your model and provide a web presence to your audience. Sometimes, however, you also want to provide specialized data services to other entities or devices, and a REST interface for your model and calls might not be as robust as you would want.

Additionally, in most typical thrift service setups, you won't be able to use the play-framework model from your website in the service, and you will be stuck with some horrific ORM.

To solve these problems and allow the play-framework model system within a thrift service, I've made a THttpTransport class that takes in a URL, generates a URL connection and then bridges the input/output stream across a TIOStreamTransport. On the server, it is a bit more complex.

So, now your website and your thrift api service can both be served up by play-framework!

NOTE: this does not work on BlackBerry yet; I am working on that.

How?
Use the play-framework as the server and thrift as your protocol. On the client, java.net.URL's openConnection() makes the connection and sends the basic HTTP headers; we then hijack the connection's input/output streams to bridge the thrift client to the server.
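To make the stream bridging concrete, here is a minimal sketch of the HTTP round trip on its own, without Thrift. It assumes only the JDK: a throwaway echo server built on com.sun.net.httpserver stands in for the Play controller, and the names HttpBridgeSketch, post and roundTrip are hypothetical, not part of the code in this post.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class HttpBridgeSketch {

    // Copy everything from in to out.
    static void copy( InputStream in, OutputStream out ) throws IOException {
        byte[] buf = new byte[4096];
        int n;
        while ( ( n = in.read( buf ) ) != -1 ) {
            out.write( buf, 0, n );
        }
    }

    // POST the request bytes, then read the raw response body from the same connection.
    static byte[] post( URL url, byte[] request ) throws IOException {
        HttpURLConnection con = (HttpURLConnection) url.openConnection();
        con.setDoOutput( true );
        con.setRequestMethod( "POST" );
        try ( OutputStream out = con.getOutputStream() ) {
            out.write( request );
        }
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        try ( InputStream in = con.getInputStream() ) {
            copy( in, body );
        } finally {
            con.disconnect();
        }
        return body.toByteArray();
    }

    // Start an echo server (a stand-in for the Play endpoint), send one payload, return the reply.
    static String roundTrip( String payload ) {
        try {
            HttpServer server = HttpServer.create( new InetSocketAddress( "127.0.0.1", 0 ), 0 );
            server.createContext( "/My/endpoint", exchange -> {
                ByteArrayOutputStream req = new ByteArrayOutputStream();
                copy( exchange.getRequestBody(), req );
                byte[] bytes = req.toByteArray();
                exchange.sendResponseHeaders( 200, bytes.length );
                try ( OutputStream out = exchange.getResponseBody() ) {
                    out.write( bytes );
                }
            } );
            server.start();
            try {
                URL url = new URL( "http://127.0.0.1:" + server.getAddress().getPort() + "/My/endpoint" );
                byte[] reply = post( url, payload.getBytes( StandardCharsets.UTF_8 ) );
                return new String( reply, StandardCharsets.UTF_8 );
            } finally {
                server.stop( 0 );
            }
        } catch ( IOException e ) {
            throw new UncheckedIOException( e );
        }
    }

    public static void main( String[] args ) {
        System.out.println( roundTrip( "ping" ) );
    }
}
```

The point of the sketch is the ordering in post(): the request is written to the connection's output stream first, and only then is the input stream opened for the reply, which is the same write-then-read pattern a Thrift call follows over this kind of transport.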

Right now, I use the input streams directly. To support BlackBerry, however, I might have to drop down to a POST parameter and base64-encode/buffer the payload. It will work, but it won't be as efficient.
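As a rough illustration of what that fallback could look like, the sketch below wraps raw bytes into a single form field and unwraps them again. This is an assumption about one possible encoding, not the BlackBerry implementation: the field name "payload" and the class Base64PayloadSketch are hypothetical, and java.util.Base64 requires Java 8 or later.

```java
import java.util.Base64;

class Base64PayloadSketch {

    // Hypothetical form-field name; the post does not define one.
    static final String FIELD = "payload";

    // Wrap raw (e.g. Thrift-serialized) bytes as a form body: payload=<base64url>.
    // The URL-safe alphabet without padding avoids '+', '/' and '=' in the value,
    // so the value needs no further percent-encoding.
    static String toFormBody( byte[] raw ) {
        return FIELD + "=" + Base64.getUrlEncoder().withoutPadding().encodeToString( raw );
    }

    // Reverse of toFormBody: strip the field name and decode the value.
    static byte[] fromFormBody( String body ) {
        String prefix = FIELD + "=";
        if ( !body.startsWith( prefix ) ) {
            throw new IllegalArgumentException( "missing " + FIELD + " field" );
        }
        return Base64.getUrlDecoder().decode( body.substring( prefix.length() ) );
    }

    public static void main( String[] args ) {
        byte[] raw = { (byte) 0x80, 1, 0, 1, (byte) 0xff, (byte) 0xfe };
        String body = toFormBody( raw );
        System.out.println( body + " -> " + fromFormBody( body ).length + " bytes" );
    }
}
```

Base64 turns every 3 bytes into 4 characters, and the whole message has to be buffered before it can be encoded, which is where the loss of efficiency comes from.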

Additionally, the play server likes to manage connections, and I had to affix a Future to the local TServerTransport to notify play when it can start managing the connection again.

What does the client code end up looking like?
Here is an example of the THttpTransport taking in a URL and giving Thrift access to my ApiService:

TTransport trans = new THttpTransport( new URL( "http://localhost:9000/My/endpoint" ) );
TProtocol prot = new TBinaryProtocol( trans );
// This is my thrift client
Client cli = new ApiService.Client( prot );
// This is my example method
DeviceCreateResponse resp = cli.DeviceCreate( new RemoteDevice()
    .setAgent( "agent" )
    .setEmail( "email" )
    );
// Verify that it works
System.out.println( resp );
// Close it down. Input and output share the one transport, so a single close is enough.
trans.close();

What does the play-framework controller look like?
The URL http://localhost:9000/My/endpoint points to a controller in the play-framework that simply contains:
public static void endpoint() {
    await( Service.handle( request.body, response.out ) );
}

This does two things. First, it calls out to my service helper to register the request (put it into the queue) and second tells the play-framework not to close the connection but instead to suspend the request until the handle is done.

What does the Thrift service implementation look like?
That Service.handle is pretty simple:
public class Service implements ApiService.Iface {
    private static final Service              service = new Service();
    private static final TServerTransportIOS  trans   = new TServerTransportIOS();
    private static final ApiService.Processor proc    = new ApiService.Processor( service );
    private static final TThreadPoolServer    srv     = new TThreadPoolServer( proc, trans, new TBinaryProtocol.Factory());
    public static Future<Boolean> handle( InputStream is, OutputStream os ) {
        return trans.handle( is, os );
    }
    static {
        new Thread( new Runnable() {
            @Override
            public void run() {
                srv.serve();
            }
        } ).start();
    }
    private Service() {}
}
The only tricky thing it does is use the TServerTransportIOS transport, which lets us inject requests into the thrift processor from the server side with just InputStream and OutputStream objects (as seen above with the play-framework controller).

Because we need to tell play-framework when the response is completed, we have to implement the Future interface for the await to work.

What do I use to make the local Thrift service parse requests?
The code for the TServerTransportIOS is as follows:
public class TServerTransportIOS extends TServerTransport {
    // Requests handed over by the web tier, waiting for the thrift server to accept them.
    private final LinkedBlockingQueue<TIOStreamTransportWithFuture> queue = new LinkedBlockingQueue<TIOStreamTransportWithFuture>();
    @Override
    public void listen() throws TTransportException {}
    @Override
    public void close() {}
    public Future<Boolean> handle( InputStream is, OutputStream os ) {
        TIOStreamTransportWithFuture tiost = new TIOStreamTransportWithFuture( is, os );
        queue.add( tiost );
        return tiost;
    }
    @Override
    protected TTransport acceptImpl() throws TTransportException {
        try {
            return queue.take();
        } catch ( InterruptedException e ) {
            throw new TTransportException( e );
        }
    }
}
How to make play-framework stop closing our connection before our service has completed?
Since we are writing our own acceptor and handing our own requests to the transport on the server side, we use an implementation of the TServerTransport that creates subtransports for each client handled.
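Stripped of the Thrift types, the acceptor is just a blocking hand-off queue between the web thread and the server's accept loop. The sketch below shows that pattern in isolation; StreamHandoffSketch, StreamPair, submit, accept and tryAccept are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StreamHandoffSketch {

    // One "accepted connection": nothing more than the two streams of a request.
    static final class StreamPair {
        final InputStream in;
        final OutputStream out;

        StreamPair( InputStream in, OutputStream out ) {
            this.in = in;
            this.out = out;
        }
    }

    private final BlockingQueue<StreamPair> queue = new LinkedBlockingQueue<StreamPair>();

    // Called on the web thread: enqueue the request's streams and return immediately.
    StreamPair submit( InputStream in, OutputStream out ) {
        StreamPair pair = new StreamPair( in, out );
        queue.add( pair );
        return pair;
    }

    // Called by the server's accept loop: blocks until a request has been submitted.
    StreamPair accept() throws InterruptedException {
        return queue.take();
    }

    // Non-blocking variant: returns null when nothing is waiting.
    StreamPair tryAccept() {
        return queue.poll();
    }

    int pending() {
        return queue.size();
    }

    public static void main( String[] args ) throws InterruptedException {
        StreamHandoffSketch handoff = new StreamHandoffSketch();
        handoff.submit( new ByteArrayInputStream( new byte[0] ), new ByteArrayOutputStream() );
        System.out.println( "accepted: " + ( handoff.accept() != null ) );
    }
}
```

Each submitted pair plays the role of one client connection, so the server side never needs a real listening socket.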

To let play-framework know when a request is finished, I've made a TIOStreamTransportWithFuture that extends the standard TIOStreamTransport, implements Future, and keeps some state for knowing when it's closed:
public class TIOStreamTransportWithFuture extends TIOStreamTransport implements Future<Boolean> {
    private boolean closed = false;
    public TIOStreamTransportWithFuture( InputStream is, OutputStream os ) {
        super( is, os );
    }
    @Override
    public void close() {
        super.close();
        synchronized ( this ) {
            closed = true;
            this.notifyAll();
        }
    }
    @Override
    public boolean cancel( boolean mayInterruptIfRunning ) {
        return false;
    }
    @Override
    public boolean isCancelled() {
        return false;
    }
    @Override
    public boolean isDone() {
        synchronized ( this ) {
            return closed;
        }
    }
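    @Override
    public Boolean get() throws InterruptedException, ExecutionException {
        synchronized ( this ) {
            // Loop to guard against spurious wakeups.
            while ( !closed ) {
                this.wait();
            }
        }
        return true;
    }
    @Override
    public Boolean get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos( timeout );
        synchronized ( this ) {
            while ( !closed ) {
                long remaining = deadline - System.nanoTime();
                if ( remaining <= 0 ) {
                    // Still open after the timeout: report it instead of claiming success.
                    throw new TimeoutException();
                }
                TimeUnit.NANOSECONDS.timedWait( this, remaining );
            }
        }
        return true;
    }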
}
Most of that code is boilerplate for the silly future. At the end of the day, you now have a play-framework server that is capable of serving up Thrift requests from your play-framework model!
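If the boilerplate bothers you, the same "done when closed" signal could probably be built on java.util.concurrent.CountDownLatch instead of wait/notifyAll. The sketch below is an alternative under that assumption, not the class used above: ClosedSignalSketch and markClosed are hypothetical names, and in the real transport markClosed() would be called from close().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ClosedSignalSketch implements Future<Boolean> {

    // Counts down exactly once, when the transport is closed.
    private final CountDownLatch closed = new CountDownLatch( 1 );

    // Hypothetical hook: a transport would call this at the end of close().
    void markClosed() {
        closed.countDown();
    }

    @Override
    public boolean cancel( boolean mayInterruptIfRunning ) {
        return false; // cancellation is not supported
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return closed.getCount() == 0;
    }

    @Override
    public Boolean get() throws InterruptedException {
        closed.await(); // returns at once if already closed
        return true;
    }

    @Override
    public Boolean get( long timeout, TimeUnit unit ) throws InterruptedException, TimeoutException {
        if ( !closed.await( timeout, unit ) ) {
            throw new TimeoutException( "transport not closed in time" );
        }
        return true;
    }

    public static void main( String[] args ) throws Exception {
        ClosedSignalSketch signal = new ClosedSignalSketch();
        signal.markClosed();
        System.out.println( "done: " + signal.get( 1, TimeUnit.SECONDS ) );
    }
}
```

The latch takes care of the locking and spurious-wakeup handling, so the Future methods shrink to one or two lines each.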


So, what is the code for the THttpTransport?
The final piece you'll need for your client is the THttpTransport:

public class THttpTransport extends TIOStreamTransport {
        private URL               end;
        private HttpURLConnection con;
        private boolean           open = false;
        public THttpTransport( URL endpoint ) {
            this.end = endpoint;
        }
        @Override
        public boolean isOpen() {
            return open;
        }
        @Override
        public void open() throws TTransportException {
            try {
                if ( con == null ) {
                    con = (HttpURLConnection) end.openConnection();
                }
                if ( !isOpen() ) {
                    con.setUseCaches( false );
                    con.setDoOutput( true );
                    con.setDoInput( true );
                    con.setRequestMethod( "POST" );
                    con.setRequestProperty( "Connection", "Keep-Alive" );
                    con.connect();
                    open = true;
                }
            } catch ( IOException e ) {
                throw new TTransportException( e );
            }
        }
        @Override
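        public void close() {
            if ( con != null ) {
                // Close the streams before tearing down the connection.
                super.close();
                inputStream_ = null;
                outputStream_ = null;
                con.disconnect();
                // An HttpURLConnection serves a single request, so drop it
                // and let a later open() create a fresh one.
                con = null;
                open = false;
            }
        }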
        @Override
        public int read( byte[] buf, int off, int len ) throws TTransportException {
            try {
                if ( inputStream_ == null ) {
                    open();
                    inputStream_ = con.getInputStream();
                }
                return super.read( buf, off, len );
            } catch ( IOException e ) {
                throw new TTransportException( e );
            }
        }
        @Override
        public void write( byte[] buf, int off, int len ) throws TTransportException {
            try {
                if ( outputStream_ == null ) {
                    open();
                    outputStream_ = con.getOutputStream();
                }
                super.write( buf, off, len );
            } catch ( IOException e ) {
                throw new TTransportException( e );
            }
        }
    }
