ContinuousAggregates · gerritjvv/SimpleSQL Wiki · GitHub


gerritjvv edited this page Nov 27, 2012 · 1 revision

Continuous Aggregates

Overview

When processing a potentially endless stream of messages, we want to use fast in-memory structures to pre-aggregate the data into chunks and then send each chunk to the final storage.

For example:

We have a Kafka queue (the potentially endless message stream) and want to insert the messages into HBase.

If we did one HBase insert per message, 1,000,000 messages would mean 1,000,000 HBase inserts. That would seriously slow down message consumption and is far from optimal.

Using partial aggregates, we can reduce this to perhaps 100 HBase inserts for the same 1,000,000 messages.
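Here is a rough worked estimate. It assumes each flushed chunk is written to HBase as one batch. The chunk size C = 10,000 is an illustrative value, not a SimpleSQL default. Under that assumption, the number of writes depends only on the number of messages N and the chunk size C:

```latex
\text{writes} = \left\lceil \frac{N}{C} \right\rceil
              = \left\lceil \frac{1\,000\,000}{10\,000} \right\rceil = 100
```

Each of those writes holds at most one row per distinct group key seen in that chunk. The saving is therefore largest when many messages share the same key.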

Example

For a complete example, see the class org.simplesql.om.data.TestChunkedProcessor.

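```java
import java.util.Map;
import java.util.Map.Entry;
// Imports for the SimpleSQL classes (ChunkedProcessor, Key, Cell) are omitted.
// conf, MySink and MyMultiThreadedDataSource are not shown; see TestChunkedProcessor.

// Define the input table and the aggregation to run over each chunk.
ChunkedProcessor cn = new ChunkedProcessor(conf,
        "CREATE TABLE my (name STRING,  hit INT )",
        "SELECT name, COUNT(hit) FROM my GROUP BY name");

int chunkSize = 1000; // example value: flush partial aggregates every 1000 records

MySink sink = new MySink();
long start = System.currentTimeMillis();

// Read and aggregate in the background, writing partial aggregates to the sink.
cn.runAsync(new MyMultiThreadedDataSource(), sink, chunkSize);

// Let the processor run for one second (Thread.sleep throws InterruptedException).
Thread.sleep(1000);

// Stop the processor and wait for it to finish.
long iterations = cn.stopWait();

long elapsed = System.currentTimeMillis() - start;
System.out.println("Made: " + iterations + " in " + elapsed + "ms");

// Print each group key with its two result cells (name, COUNT(hit)).
Map<Key, Cell<?>[]> map = sink.map;

for (Entry<Key, Cell<?>[]> entry : map.entrySet()) {
    System.out.println("key: " + entry.getKey().asString()
            + " : " + entry.getValue()[0] + ", " + entry.getValue()[1]);
}
```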

Explanation

The core of the ChunkedProcessor class is the method:

```java
runAsync(final DataSource dataSource, final DataSink sink, final int chunkSize)
```

This method takes a DataSource and reads and aggregates its data continuously. Every chunkSize records (or at the end of the data), it writes the partial aggregates collected so far to the DataSink. The partial aggregates are then reset to zero and the processor continues looping.
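The loop described above can be illustrated in plain Java. This is a minimal, self-contained sketch, assuming a COUNT ... GROUP BY over string keys. ChunkedCountSketch, run and mergeAll are hypothetical names, not SimpleSQL's implementation, and a List stands in for the DataSink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a chunked COUNT ... GROUP BY loop (not SimpleSQL code). */
public class ChunkedCountSketch {

    /** Reads all records, flushing the partial counts every chunkSize records. */
    static List<Map<String, Long>> run(Iterator<String> source, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Map<String, Long>> flushed = new ArrayList<>(); // stands in for the DataSink
        Map<String, Long> partial = new HashMap<>();
        int inChunk = 0;
        while (source.hasNext()) {
            partial.merge(source.next(), 1L, Long::sum); // aggregate in memory
            if (++inChunk == chunkSize) {
                flushed.add(partial);      // write the partial aggregates out
                partial = new HashMap<>(); // restart the aggregates from zero
                inChunk = 0;
            }
        }
        if (!partial.isEmpty()) {
            flushed.add(partial); // end of data: flush the last, smaller chunk
        }
        return flushed;
    }

    /** Combines partial counts the way a consumer would: sum per key. */
    static Map<String, Long> mergeAll(List<Map<String, Long>> chunks) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> chunk : chunks) {
            chunk.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            names.add(i % 2 == 0 ? "a" : "b");
        }
        List<Map<String, Long>> chunks = run(names.iterator(), 10);
        System.out.println(chunks.size() + " flushes, totals: " + mergeAll(chunks));
    }
}
```

With 25 records and a chunk size of 10, run flushes three times: 10 records, another 10, then the final 5 at the end of the data. COUNT values from separate chunks are only partial, so whatever consumes them has to add them per key, which mergeAll does here. Whether a given SimpleSQL DataSink merges this way is an assumption.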

Kafka support

Kafka is an ideal fit for this kind of aggregation. The kafka subproject provides a KafkaDataSource implementation that returns the KafkaStream(s) as Iterators through the MultiThreadedDataSource implementation, which allows the streams to be aggregated in parallel. Note that each stream carries its own aggregation.
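The sketch below shows what "each stream carries its own aggregation" means in practice. It is a minimal Java sketch that assumes plain Iterator<String> streams in place of KafkaStreams. PerStreamAggregationSketch and its methods are hypothetical, not the KafkaDataSource or MultiThreadedDataSource API. Each stream is counted on its own thread into its own map, so the streams share no mutable state and need no locking.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: one independent aggregation per stream, run in parallel. */
public class PerStreamAggregationSketch {

    /** Counts records per key for one stream; every call uses its own map. */
    static Map<String, Long> countStream(Iterator<String> stream) {
        Map<String, Long> counts = new HashMap<>();
        while (stream.hasNext()) {
            counts.merge(stream.next(), 1L, Long::sum);
        }
        return counts;
    }

    /** Aggregates every stream on its own thread; returns one result per stream. */
    static List<Map<String, Long>> aggregateInParallel(List<Iterator<String>> streams) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        try {
            List<Future<Map<String, Long>>> futures = new ArrayList<>();
            for (Iterator<String> stream : streams) {
                futures.add(pool.submit(() -> countStream(stream)));
            }
            List<Map<String, Long>> results = new ArrayList<>();
            for (Future<Map<String, Long>> future : futures) {
                results.add(future.get()); // wait for this stream's aggregation
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while aggregating", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stream aggregation failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Iterator<String>> streams = new ArrayList<>();
        streams.add(Arrays.asList("a", "a", "b").iterator());
        streams.add(Arrays.asList("a", "c").iterator());
        System.out.println(aggregateInParallel(streams));
    }
}
```

In main, the key "a" appears in both streams but is counted separately per stream. In this sketch, combining the per-stream results is left to whatever consumes them.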

The KafkaDataSource takes a Transform instance as an argument, which lets you plug support for any message format into the data source.

For example:

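```java
// getName(m) and getURL(m) stand for your own message-parsing helpers.
KafkaDataSource dataSource = new KafkaDataSource(streamsList, new Transform<Message>() {
    // Convert one Kafka message into the column values of one table row.
    public Object[] apply(Message m) {
        return new Object[]{ getName(m), getURL(m) };
    }
});
```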
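The sketch below shows what a concrete transform might look like, assuming tab-separated "name\turl" payloads encoded as UTF-8 bytes. The Transform interface here is a hypothetical stand-in that mirrors the apply(...) usage above, so the real SimpleSQL type may differ. TransformSketch and TAB_SEPARATED are illustrative names. The returned array follows the same (name, url) column order as the example.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of a Transform-style message converter. */
public class TransformSketch {

    /** Stand-in for SimpleSQL's Transform; the real type may differ. */
    interface Transform<T> {
        Object[] apply(T message);
    }

    /** Parses a UTF-8, tab-separated payload into {name, url}. */
    static final Transform<byte[]> TAB_SEPARATED = new Transform<byte[]>() {
        @Override
        public Object[] apply(byte[] payload) {
            String line = new String(payload, StandardCharsets.UTF_8);
            String[] fields = line.split("\t", 2); // split into at most two fields
            String url = fields.length > 1 ? fields[1] : null; // missing url -> null
            return new Object[]{ fields[0], url };
        }
    };

    public static void main(String[] args) {
        byte[] payload = "alice\thttp://example.com/a".getBytes(StandardCharsets.UTF_8);
        Object[] row = TAB_SEPARATED.apply(payload);
        System.out.println(row[0] + " -> " + row[1]);
    }
}
```

Keeping the parsing inside the transform means the data source and the aggregation code only ever see Object[] rows, whatever the wire format is.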
