Rick

Rick
Rick

Monday, December 28, 2015

WebSocket microservice vs REST microservice

WebSocket microservice vs REST microservice


With WebSocket, you are connecting to a server and sending requests what the server sends responses. You can essentially stream calls. How does this compare with REST which uses straight HTTP? WebSocket has 8x to 20x more throughput and uses less resources.
Let's demonstrate, for this test I will run everything on my MacBook pro. I have made no tweaks to the TCP/IP OS stack (but you can expect slightly higher throughput if you do).
To test the REST code we will use wrk. The wrk load tester is capable of generating significant load when run on a single multi-core CPU. The wrk load tester uses multithreaded and event notification systems such as epoll and kqueue to maximize the number of HTTP requests per second.
The REST service code:

REST service code

package io.advantageous.qbit.example.perf.websocket;

import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.http.GET;
import io.advantageous.qbit.annotation.http.PUT;
import static io.advantageous.qbit.admin.ManagedServiceBuilder.managedServiceBuilder;

/**
 * curl  -H "Content-Type: application/json"  -X PUT http://localhost:8080/trade -d '{"name":"ibm", "amount":1}'
 * curl  http://localhost:8080/count
 */
@RequestMapping("/")
public class TradeService {

    private long count;

    @PUT("/trade")
    public boolean trade(final Trade trade) {
        trade.getName().hashCode();
        trade.getAmount();
        count++;
        return true;
    }

    @GET("/count")
    public long count() {
        return count;
    }

    public static void main(final String... args) {

        final ManagedServiceBuilder managedServiceBuilder = managedServiceBuilder();

        managedServiceBuilder
                .addEndpointService(new TradeService())
                .setRootURI("/");

        managedServiceBuilder.startApplication();
    }
}
To test this with wrk, we need a Lua script to run the PUT operations.
wrk.method = "PUT"
wrk.body   = '{"name":"ibm", "amount":1}'
wrk.headers["Content-Type"] = "application/json"
Throughput

100 connections 70K TPS

$ wrk -c100 -d10s -strade.lua http://localhost:8080/trade
Running 10s test @ http://localhost:8080/trade
  2 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.45ms  199.79us   7.61ms   88.80%
    Req/Sec    34.69k     0.87k   36.02k    85.15%
  696962 requests in 10.10s, 49.19MB read
Requests/sec:  68993.88
Transfer/sec:      4.87MB
We can tweak the server a bit and reduce the flush rate or reduce the batch size to get higher throughput with lower connections. We can also tweak the OS so we can have more ephemeral ports available and then use a lot more connections. With those tweaks experience tells me that we can get close to 90K TPS or so on a MacBook Pro. Also we could test from two machines with one of those machines being a Linux server and we can get even more throughput. This test has the disadvantage of all being run on the same machine, but it will be the same disadvantage that the WebSocket version will have so it is somewhat fair. We could also employ HTTP pipelining to increase the throughput, but this trick is great for benchmarks but rarely works in production environments with real clients. On an average server, we can get close to 150K TPS to 200K TPS from experience (I will show this later perhaps).
Ok let's see how WebSocket version does on the same machine with the same server. QBitMicroservice lib supports REST and WebSocket.
In order to use WebSocket, we need to create an async interface so we can build a client proxy.

Async interface

package io.advantageous.qbit.example.perf.websocket;

import io.advantageous.qbit.reactive.Callback;

public interface TradeServiceAsync {


    void trade(Callback<Boolean> callback, final Trade trade);
    void count(Callback<Long> callback);
}
Here is the code to create clients and run them against the same server on the same machine.

Load testing with WebSocket client

package io.advantageous.qbit.example.perf.websocket;

import io.advantageous.boon.core.Str;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.client.Client;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

import static io.advantageous.boon.core.IO.puts;
import static io.advantageous.qbit.client.ClientBuilder.clientBuilder;

public class TradeServiceLoadTestWebSocket {


    public static void main(final String... args) {

        /** Hold the number of clients we will run. */
        final int numClients = 3;

        /** Hold the number of calls each thread will make. */
        final int numCalls = 50_000_000;

        /** Hold the client threads to run. */
        final List<Thread> threadList = new ArrayList<>(numClients);

        /** Hold the counts to total. */
        final List<AtomicInteger> counts = new ArrayList<>();


        /** Create the client threads. */
        for (int c =0; c < numClients; c++) {
            final AtomicInteger count = new AtomicInteger();
            counts.add(count);
            threadList.add(new Thread(() -> {
                runCalls(numCalls, count);
            }));
        }

        /** Start the threads. */
        threadList.forEach(Thread::start);

        /** Grab the start time. */
        long startTime = System.currentTimeMillis();

        for (int index =0; index<1000; index++) {
            Sys.sleep(1000);

            long totalCount = 0L;

            for (int c = 0; c < counts.size(); c++) {
                totalCount += counts.get(c).get();
            }

            puts("total", Str.num(totalCount),
                    "\telapsed time", Str.num(System.currentTimeMillis()-startTime),
                    "\trate", Str.num(totalCount/(System.currentTimeMillis()-startTime)*1000));
        }

    }

    /** Each client will run this
     *
     * @param numCalls number of times to make calls
     * @param count holds the total count
     */
    private static void runCalls(final int numCalls, final AtomicInteger count) {
        final Client client = clientBuilder().setAutoFlush(false).build();

        final TradeServiceAsync tradeService = client.createProxy(TradeServiceAsync.class, "tradeservice");

        client.startClient();

        for (int call=0; call < numCalls; call++) {
            tradeService.trade(response -> {
                if (response) {
                    count.incrementAndGet();
                }
            }, new Trade("IBM", 1));

            /** Apply some back pressure so the server is not overwhelmed. */
            if (call % 10 == 0) {
                while (call - 5_000 > count.get()) {
                    Sys.sleep(10);
                }
            }
        }

        flushServiceProxy(tradeService);
    }

}
How does WebSocket do? Quite well!
total 26,668,058    elapsed time 52,186     rate 511,000
That is a total of 1,022,000 messages a second (request / response) using just three WebSocket connections. A single WebSocket connection seems to handle around 700K TPS, and then we start running into more and more IO contention, which again can be solved by having bigger pipes or by tweaking the TCP/IP stack. But in this simple test, we can see that we have 7.5X improvement over REST by using WebSocket.

Monday, December 21, 2015

Adapting QBit microservice lib queues to Reactive Streams

At the core of QBit, microservices lib, is the queue. The QBit microservice lib centers aroundqueues and micro-batching. A queue is a stream of messages. Micro-batching is employed to improve throughput by reducing thread handoffs, and improving IO efficiency. You can implement back pressure in QBit. However, reactive streams, has a very clean interface for implementing back pressure. Increasingly APIs for new SQL databases, data grids, messaging, are using some sort of streaming API.
If you are using Apache Spark, or Kafka or Apache Storm or Cassandra, then you are likely already familiar with streaming APIs. Java 8 ships with a streaming API and Java 9 improves on the concepts with Flow. Streams are coming to you one way or another if you are a Java programmer.
There are even attempts to make a base level stream API for compatibility sakes, called Reactive Streams.
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Realizing the importance of streaming standards and the ability to integrate into a larger ecosystem, QBit now supports reactive streams and will support it more as time goes on.

QBit endeavors to make more and more things look like QBit queues which is a convenient interface for dealing with IO, interprocess and inter-thread communication with speeds up to 100M TPS to 200M TPS.
We have added two classes to adapt a QBit queue to a stream. Later there will be classes to adapte a stream to a queue. The two classes are: QueueToStreamRoundRobin and QueueToStreamUnicast. The overhead of backpressure support is around 20 to 30%. There are more QBit centric ways of implementing back pressure that will have less overhead, but in the grand scheme of thing this overhead is quite small. QueueToStreamRoundRobin and QueueToStreamUnicast have been clocked between 70M TPS and 80M TPS with the advantage of easily handling resource consumption management so that a fast data source does not overwhelm the stream destination. QueueToStreamRoundRobin allows many reactive stream destinations from the same QBit queue.
Let's see these two in action:

Simple trade class

public class Trade {
        final String name;
        final long amount;

        private Trade(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }
        ...
    }

Adapting a Queue into a Reactive Stream

final Queue<Trade> queue = 
       QueueBuilder
          .queueBuilder()
          .build();

/* Adapt the queue to a stream. */
final QueueToStreamUnicast<Trade> stream = 
                new QueueToStreamUnicast<>(queue);

stream.subscribe(new Subscriber<Trade>() {
   private Subscription subscription;
   private int count;

   @Override
   public void onSubscribe(Subscription s) {
          this.subscription = s;
          s.request(10);
   }

   @Override
   public void onNext(Trade trade) {

         //Do something useful with the trade 
         count++;
         if (count > 9) {
            count = 0;
            subscription.request(10);
         } else {
            count++;
         }

   }

   @Override
   public void onError(Throwable t) {
        error.set(t);
   }

            @Override
   public void onComplete() {
               /* shut down. */
   }
});

/* Send some sample trades. */
final SendQueue<Trade> tradeSendQueue = queue.sendQueue();

for (int index = 0; index < 20; index++) {
            tradeSendQueue.send(new Trade("TESLA", 100L + index));
}
tradeSendQueue.flushSends();
For the multicast version, here is a simple unit test showing how it works.

Unit test showing how mulitcast works

package io.advantageous.qbit.stream;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.queue.Queue;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.queue.SendQueue;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class QueueToStreamMulticast {



    private class Trade {
        final String name;
        final long amount;

        private Trade(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }
    }

    @Test
    public void test() throws InterruptedException {
        final Queue<Trade> queue = QueueBuilder.queueBuilder().build();

        final QueueToStreamRoundRobin<Trade> stream = new QueueToStreamRoundRobin<>(queue);

        final CopyOnWriteArrayList<Trade> trades = new CopyOnWriteArrayList<>();
        final AtomicBoolean stop = new AtomicBoolean();
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final AtomicReference<Subscription> subscription = new AtomicReference<>();

        final CountDownLatch latch = new CountDownLatch(1);
        final CountDownLatch stopLatch = new CountDownLatch(1);

        stream.subscribe(new Subscriber<Trade>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
                subscription.set(s);
            }

            @Override
            public void onNext(Trade trade) {

                trades.add(trade);

                if (trades.size()==10) {
                    latch.countDown();
                }
            }

            @Override
            public void onError(Throwable t) {
                error.set(t);
            }

            @Override
            public void onComplete() {
                stop.set(true);
                stopLatch.countDown();
            }
        });

        final SendQueue<Trade> tradeSendQueue = queue.sendQueue();

        for (int index = 0; index < 100; index++) {
            tradeSendQueue.send(new Trade("TESLA", 100L + index));
        }
        tradeSendQueue.flushSends();
        Sys.sleep(100);
        latch.await(10, TimeUnit.SECONDS);

        assertEquals(10, trades.size());

        assertEquals(false, stop.get());

        assertNotNull(subscription.get());


        subscription.get().cancel();

        stopLatch.await(10, TimeUnit.SECONDS);


        assertEquals(true, stop.get());

    }


    @Test
    public void test2Subscribe() throws InterruptedException {
        final Queue<Trade> queue = QueueBuilder.queueBuilder().setBatchSize(5).build();

        final QueueToStreamRoundRobin<Trade> stream = new QueueToStreamRoundRobin<>(queue);

        final CopyOnWriteArrayList<Trade> trades = new CopyOnWriteArrayList<>();
        final AtomicBoolean stop = new AtomicBoolean();
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final AtomicReference<Subscription> subscription = new AtomicReference<>();

        final CountDownLatch latch = new CountDownLatch(1);
        final CountDownLatch stopLatch = new CountDownLatch(1);

        stream.subscribe(new Subscriber<Trade>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
                subscription.set(s);
            }

            @Override
            public void onNext(Trade trade) {

                trades.add(trade);

                if (trades.size()==20) {
                    latch.countDown();
                }
            }

            @Override
            public void onError(Throwable t) {
                error.set(t);
            }

            @Override
            public void onComplete() {
                stop.set(true);
                stopLatch.countDown();
            }
        });


        stream.subscribe(new Subscriber<Trade>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
                subscription.set(s);
            }

            @Override
            public void onNext(Trade trade) {

                trades.add(trade);

                if (trades.size()==20) {
                    latch.countDown();
                }
            }

            @Override
            public void onError(Throwable t) {
                error.set(t);
            }

            @Override
            public void onComplete() {
                stop.set(true);
                stopLatch.countDown();
            }
        });

        final SendQueue<Trade> tradeSendQueue = queue.sendQueue();

        for (int index = 0; index < 40; index++) {
            tradeSendQueue.send(new Trade("TESLA", 100L + index));
        }
        tradeSendQueue.flushSends();
        Sys.sleep(100);
        latch.await(10, TimeUnit.SECONDS);

        assertEquals(20, trades.size());

        assertEquals(false, stop.get());

        assertNotNull(subscription.get());


        subscription.get().cancel();

        stopLatch.await(10, TimeUnit.SECONDS);


        assertEquals(true, stop.get());

    }
}

QBit has support for a Queue API that works over Kafka, JMS, WebSocket and in-memory queues. You can now use reactive streams with these queues and the performance is very good.
Kafka and Cassandra support, training for AWS EC2 Cassandra 3.0 Training