gerritjvv edited this page Nov 27, 2012
Continuous Aggregates
Overview
When processing a potentially endless stream of messages, we want to use fast in-memory structures to pre-aggregate the data into chunks, and then send each chunk to the final storage.
For example:
We have a Kafka queue (the potentially endless message stream) and want to insert the messages into HBase.
If we did one HBase insert per message, 1,000,000 messages would mean 1,000,000 HBase inserts, which slows down message consumption considerably and is far from optimal.
Using partial aggregates, we can reduce this to something like 1,000,000 messages and 100 HBase inserts.
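To make the arithmetic concrete, here is a small back-of-the-envelope sketch. It assumes a hypothetical chunk size of 10,000 messages and assumes that each chunk's partial aggregates are written to HBase as one batch; the real number of writes depends on how your sink batches rows.

```java
// Back-of-the-envelope count of storage writes, with and without chunked pre-aggregation.
// The chunk size used in main() is an illustrative assumption, not a library default.
class WriteCountEstimate {

    // Without aggregation: one storage write per message.
    static long writesWithoutAggregation(long messages) {
        return messages;
    }

    // With chunking: one (batched) write per chunk, i.e. ceil(messages / chunkSize).
    static long writesWithChunks(long messages, long chunkSize) {
        return (messages + chunkSize - 1) / chunkSize;
    }

    public static void main(String[] args) {
        long messages = 1_000_000L;
        long chunkSize = 10_000L; // hypothetical chunk size
        System.out.println(writesWithoutAggregation(messages) + " vs " + writesWithChunks(messages, chunkSize));
        // prints "1000000 vs 100"
    }
}
```

A last, partially filled chunk still costs one write, which is why the division rounds up.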
Example
For a complete example, see the class org.simplesql.om.data.TestChunkedProcessor.java.
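import java.util.Map;
import java.util.Map.Entry;
// conf is the processor configuration; see TestChunkedProcessor for how it is built.
// MySink and MyMultiThreadedDataSource are your own DataSink / DataSource implementations.
int chunkSize = 1000; // write out partial aggregates every 1000 records (illustrative value)
ChunkedProcessor cn = new ChunkedProcessor(conf,
        "CREATE TABLE my (name STRING, hit INT )",
        "SELECT name, COUNT(hit) FROM my GROUP BY name");
MySink sink = new MySink();
long start = System.currentTimeMillis();
// start reading and aggregating in the background
cn.runAsync(new MyMultiThreadedDataSource(), sink, chunkSize);
Thread.sleep(1000); // let it run for one second; the enclosing method must handle InterruptedException
long iterations = cn.stopWait(); // stop the processor and wait for it, returning the iteration count
long elapsed = System.currentTimeMillis() - start;
System.out.println("Made: " + iterations + " in " + elapsed + "ms");
// print every group key together with its aggregated cells
Map<Key, Cell<?>[]> map = sink.map;
for (Entry<Key, Cell<?>[]> entry : map.entrySet()) {
    System.out.println("key: " + entry.getKey().asString()
            + " : " + entry.getValue()[0] + ", " + entry.getValue()[1]);
}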
Explanation
The ChunkedProcessor class is driven by the method:
runAsync(final DataSource dataSource, final DataSink sink, final int chunkSize)
This method takes a DataSource and continuously reads and aggregates its data. After every chunkSize records (or at the end of the data) it writes the partial aggregates collected so far to the DataSink, then resets them to zero and continues looping.
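The read / aggregate / flush / reset cycle can be sketched in plain Java. This is a simplified, single-threaded illustration under stated assumptions, not the ChunkedProcessor source: it hard-codes the aggregate from the example (COUNT per name), and the hypothetical `run` method uses a plain `Iterator` and `Consumer` in place of DataSource and DataSink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified sketch of chunked pre-aggregation: roughly
// "SELECT name, COUNT(hit) FROM my GROUP BY name", flushed every chunkSize records.
// Iterator and Consumer stand in for DataSource and DataSink (an assumption).
class ChunkLoopSketch {

    static void run(Iterator<String> names, Consumer<Map<String, Long>> sink, int chunkSize) {
        Map<String, Long> partial = new HashMap<>();
        int seen = 0;
        while (names.hasNext()) {
            // Aggregate in memory: increment the count for this name.
            partial.merge(names.next(), 1L, Long::sum);
            seen++;
            if (seen == chunkSize) {
                // Chunk is full: hand the partial aggregates to the sink...
                sink.accept(partial);
                // ...and restart them from zero.
                partial = new HashMap<>();
                seen = 0;
            }
        }
        // End of data: flush whatever is left over.
        if (!partial.isEmpty()) {
            sink.accept(partial);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> flushed = new ArrayList<>();
        run(Arrays.asList("a", "b", "a", "a", "c").iterator(), flushed::add, 2);
        // Three flushes: {a=1, b=1}, then {a=2}, then {c=1} at end of data.
        System.out.println(flushed.size());
    }
}
```

Note that the same key ("a") shows up in more than one flush, so the final storage receives several partial counts per key rather than one total.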
Kafka support
Kafka is an ideal fit for these kinds of aggregates. The kafka subproject provides a KafkaDataSource implementation that returns the KafkaStream(s) as Iterators via the MultiThreadedDataSource implementation, allowing the KafkaStream(s) to be aggregated in parallel. Note that each stream carries its own aggregation.
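The idea of per-stream aggregation can be illustrated with a minimal sketch, assuming one worker thread per stream and plain `Iterator<String>` objects standing in for KafkaStreams. The class and method names here are hypothetical; this is not how KafkaDataSource or MultiThreadedDataSource are implemented, only a picture of "each stream carries its own aggregation".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: aggregate several streams in parallel, one worker and one map per stream.
// Iterators stand in for KafkaStreams (an assumption for illustration).
class PerStreamAggregation {

    static List<Map<String, Long>> aggregateInParallel(List<Iterator<String>> streams) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        try {
            List<Future<Map<String, Long>>> futures = new ArrayList<>();
            for (Iterator<String> stream : streams) {
                futures.add(pool.submit(() -> {
                    // Each worker owns its map, so no locking is needed while counting.
                    Map<String, Long> counts = new HashMap<>();
                    while (stream.hasNext()) {
                        counts.merge(stream.next(), 1L, Long::sum);
                    }
                    return counts;
                }));
            }
            // Collect one partial aggregate per stream, in stream order.
            List<Map<String, Long>> results = new ArrayList<>();
            for (Future<Map<String, Long>> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while aggregating", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a stream worker failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> perStream = aggregateInParallel(Arrays.asList(
                Arrays.asList("a", "b", "a").iterator(),
                Arrays.asList("a", "c").iterator()));
        // One map per stream; "a" is counted separately in each of them.
        System.out.println(perStream);
    }
}
```

Keeping a separate map per stream avoids contention between threads; the trade-off is that the same key can arrive at the sink from several streams, so the final storage presumably has to combine those partial values.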
The KafkaDataSource takes a Transform instance as an argument, which lets you plug support for any message format into the data source.
For example:
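// getName(m) and getURL(m) are your own helpers that extract the fields from a message
new KafkaDataSource(streamsList, new Transform<Message>() {
    public Object[] apply(Message m) {
        // one value per column of the table being aggregated
        return new Object[]{getName(m), getURL(m)};
    }
});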
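As a further illustration of plugging in a message format, here is a self-contained sketch for messages whose payload is a UTF-8 string of the form `name<TAB>url`. Because the exact definition of the library's Transform type is not shown here, the sketch declares its own hypothetical `RowTransform<T>` interface with the same shape as the example above (one message in, one `Object[]` row out), and it uses a raw `byte[]` payload rather than a Kafka `Message` so it compiles without Kafka on the classpath.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the library's Transform: maps one message to one row,
// with one Object per column of the table being aggregated.
interface RowTransform<T> {
    Object[] apply(T message);
}

// Example format plug-in: payloads are UTF-8 text of the form "name<TAB>url".
class TabSeparatedTransform implements RowTransform<byte[]> {
    @Override
    public Object[] apply(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8);
        // Split into at most two fields, so any further tabs stay part of the URL.
        String[] fields = line.split("\t", 2);
        String name = fields[0];
        String url = fields.length > 1 ? fields[1] : null; // missing URL becomes null
        return new Object[]{name, url};
    }
}
```

Keeping format parsing in a small transform object like this means the aggregation code only ever sees rows, so switching message formats does not touch the query or the sink.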