
Elephants and Eyeballs: Real-Time Big Data in the Real World

Published on June 26, 2012 in GigaSpaces

I recently had the opportunity to work on a client's architecture that illustrates both the flexibility of the GigaSpaces platform and the appeal of marrying datagrid technology with eventually consistent big data platforms. As a by-product, I spun off some code that can ease the construction of such systems in the future.

Dump the Database

First off, the client wanted to break its dependency on a traditional relational database back end, for both performance and financial reasons. Because the client did not want to run with all data in memory, the data store had to perform well for both reads and writes. GigaSpaces can run in a partially memory-resident LRU mode, and a high-performance data store is particularly important for an LRU cache, since the calling client bears the brunt of every cache miss. We proposed using Cassandra as the grid's backing store, running in eventually consistent mode. This proposal reveals a key benefit that emerges from a datagrid/NoSQL combination.

Kiss Eventual Consistency Goodbye

NoSQL stores such as Cassandra trade consistency for write speed: rather than making a writer wait until all replicas are written, the write generally returns after a single copy is written, and the data is copied to the other replicas behind the scenes. This is tunable in Cassandra, but getting high write speed means accepting eventual consistency, so readers are not guaranteed to see the same, or the most recent, data. Positioning GigaSpaces as an LRU cache in front of the NoSQL store makes both high write speed and consistency available. Writers write to the datagrid, which writes to Cassandra (in this case) asynchronously. Subsequent readers read from the grid, which still holds the written data. Assuming any reasonable in-memory size for the datagrid, the data will remain in the grid far longer than the underlying NoSQL store takes to sync its replicas.
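To make the write path concrete, here is a minimal sketch of the write-behind idea in plain Java. It is not GigaSpaces or Cassandra code: the class name WriteBehindCache, the two maps standing in for the grid and the NoSQL store, and the single flusher thread are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical write-behind cache: writes land in memory first, the store catches up later. */
class WriteBehindCache {
    // Stands in for the datagrid (assumption: a plain in-memory map).
    private final Map<String, String> grid = new ConcurrentHashMap<>();
    // Stands in for the eventually consistent backing store.
    private final Map<String, String> store;
    // A single thread keeps asynchronous writes in submission order.
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();

    WriteBehindCache(Map<String, String> store) {
        this.store = store;
    }

    /** Returns as soon as the grid holds the value; the store write happens in the background. */
    void write(String key, String value) {
        grid.put(key, value);
        flusher.submit(() -> store.put(key, value));
    }

    /** Readers see the grid's copy, so they always observe the latest write. */
    String read(String key) {
        return grid.get(key);
    }

    /** Stops accepting writes and waits for queued store writes to finish. */
    boolean drain() {
        flusher.shutdown();
        try {
            return flusher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller that writes a key and immediately reads it back gets the new value from the grid, whether or not the background write to the store has completed yet.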

As for reads, the GigaSpaces EDS (External Data Store) architecture provides the means to implement bridges from the datagrid to arbitrary data stores. The Cassandra EDS not only funnels datagrid writes through to Cassandra, it also translates object queries into Cassandra CQL queries. This enables client code to query the LRU cache and have cache misses generate equivalent queries against the underlying NoSQL store.
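The read path can be sketched in the same spirit. The example below is a simplified, key-lookup-only LRU cache with a read-through loader. It does not attempt the query translation the Cassandra EDS performs, and the names LruReadThroughCache and loader are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical LRU cache that falls back to a loader (the backing store) on a miss. */
class LruReadThroughCache<K, V> {
    private final Map<K, V> entries;
    private final Function<K, V> loader;
    private int misses = 0;

    LruReadThroughCache(int capacity, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true keeps the least recently used entry at the head of the map.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Serves from memory when possible; otherwise loads from the store and caches the result. */
    synchronized V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            misses++;
            value = loader.apply(key);
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    synchronized int misses() {
        return misses;
    }

    synchronized boolean isCached(K key) {
        // containsKey does not count as an access, so it leaves the LRU order alone.
        return entries.containsKey(key);
    }
}
```

With a capacity of two, reading keys a, b, a, c in that order evicts b (the least recently used entry), and the next read of b goes back to the loader.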

A Transactional Bridge

Another feature of NoSQL datastores is the lack of transactions. Updates to a single key are generally atomic, but the ACID properties of a relational database are missing. For the same reasons as the eventual consistency benefits described above, a transactional datagrid running in LRU mode confers ACID properties on a non-transactional datastore (at least from a user perspective). Since the user interacts with the datagrid, which in the case of GigaSpaces is transactional, full ACID properties are available. Imagine a process that combines a number of operations on several objects enlisted in a transaction and then commits. Client code, which interacts only with the datagrid, is always presented with a consistent view of the data, and the transaction commits atomically, after which the changes flow to the underlying NoSQL store. Likewise for a rollback: imagine the previous transaction failing and the changes getting rolled back. The underlying NoSQL store is untouched, and all users still see atomic, consistent updates. Adding the datagrid to a NoSQL architecture greatly expands the applicability of eventually consistent datastores.
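As a rough illustration of the commit and rollback behavior described above, the sketch below buffers a transaction's writes and only publishes them, and queues them for the backing store, on commit. It covers atomic visibility and rollback only, not locking or isolation levels, and every name in it (TransactionalGrid, Tx, queuedBatches) is hypothetical rather than part of the GigaSpaces API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical grid that publishes a transaction's writes all at once, or not at all. */
class TransactionalGrid {
    private final Map<String, String> committed = new HashMap<>();
    // Batches waiting to be written to the non-transactional store.
    private final List<Map<String, String>> outbound = new ArrayList<>();

    final class Tx {
        private final Map<String, String> pending = new HashMap<>();
        private boolean open = true;

        Tx write(String key, String value) {
            if (!open) {
                throw new IllegalStateException("transaction already finished");
            }
            pending.put(key, value);
            return this;
        }

        /** Makes every pending write visible together, then queues the batch for the store. */
        void commit() {
            synchronized (TransactionalGrid.this) {
                committed.putAll(pending);
                outbound.add(new HashMap<>(pending));
            }
            open = false;
        }

        /** Discards pending writes; neither the grid nor the store ever sees them. */
        void rollback() {
            pending.clear();
            open = false;
        }
    }

    Tx begin() {
        return new Tx();
    }

    synchronized String read(String key) {
        return committed.get(key);
    }

    synchronized int queuedBatches() {
        return outbound.size();
    }
}
```

Readers never observe a half-applied transaction in this sketch, and a rolled-back transaction leaves the outbound queue, and therefore the store, untouched.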

Back To The Solution

So Cassandra works great as the backing store, but that was just the appetizer. Like many clients we're seeing, this client had begun to explore crunching through raw user event data using Hadoop. Initially, the client envisioned splitting the stream of events flowing into the system in two: one stream would flow to GigaSpaces for real-time transactional data and event processing, and the other would flow to HDFS for later batch processing. Later, results from the batch processing would be added to the datagrid for user viewing. With or without GigaSpaces in the architecture, this is a common pattern: collect raw data during operations, and then periodically perform ETL into HDFS/Hive tables for batch processing.

 

Why Wait?  Real-time is better

Clearly, some batch processing involves data that has no urgency, or runs over data sets so large that an in-memory solution is untenable. But there is a class of data that is batch processed even though real-time access to it could be highly valuable. It is for this class of data that real-time + big data is designed. We suggested that rather than branching the event flow ahead of the datagrid, the raw events should flow into the grid, where events and derived events (many of which would otherwise have been produced by slow batch processes) could be detected and produced in real time.

 

In the end, this is what was produced. The datagrid, besides acting as a real-time event engine, hosts a web tier and an event-based workflow mechanism built on the common tuple space paradigm. In addition, the datagrid buffers events to handle traffic spikes, and of course benefits from GigaSpaces' horizontal scalability and WAN replication capabilities.

Spin Offs: Cassandra + HDFS Multi-EDS, and annotation driven configuration for documents

Some side effects of the project include:

  • A fully functioning Cassandra EDS.  Previously, only a POJO compatible mirror implementation had been completed. Now a full EDS exists that can persist and retrieve both documents and POJOs.
  • An HDFS persister EDS.  HDFS does not store application state, so this EDS only needs to support writes.
  • A general purpose multi-EDS.  This simple class lets you plug in any number of EDSs, and delegate operations to them.  Writes are done in parallel across all underlying EDSs.
  • Annotation driven persistence.  Previously, lists of classes or document types had to be configured in Spring, and there was no way to target individual EDSs in a general way.  No more.
  • Annotation driven document type definitions.  Previously, type information for documents had to be defined in the Spring config file where the space is defined, because the type info has to be registered with the space prior to use.  Since I hate defining types in a non-typesafe way in XML files, I built an annotation driven system that defines document types the same way POJOs are currently defined.

The Cassandra and HDFS Annotation Driven EDSs

I wanted to use annotations exclusively to configure these two EDSs for simplicity's sake.  Fortunately, Spring makes it easy to add custom annotations and then detect and react to them at runtime.  In a multi-EDS system, you need a way to select which objects/documents are persisted to which EDS, and a way to determine which are loaded during the initial load phase.  For this purpose I defined a single annotation: @Persistent.

The @Persistent annotation lets you associate multiple persister classes and a single loader class with a POJO.  In order to make use of the annotation, it needs to be detected at runtime.  Since the annotation itself is decorated with the @Component annotation, we can use Spring's "context:component-scan" directive to detect it.  Both EDSs inherit from an abstract base class that implements the ApplicationContextAware interface.  When the EDS is defined in the processing unit Spring config file (pu.xml), the framework calls its setApplicationContext method.  In this method the annotated classes are detected and the list of classes (or documents) that this EDS is responsible for is identified.  Being able to target individual EDSs in a simple way is an essential building block for constructing systems like this.  If you're interested in the implementation details of the annotation processing, take a look at the AbstractAnnotationDrivenEDS class in the org.openspaces.eds.support package in the source.
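The selection logic can be sketched without Spring. In the example below, the annotation's attribute names (persisters, loader), the marker classes CassandraStore and HdfsStore, and the PersistentScanner helper are all assumptions made for illustration. The real annotation and its Spring-based detection live in the project source and may look different. The sketch takes an explicit list of candidate classes where the real code relies on component scanning.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the article's @Persistent annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Persistent {
    Class<?>[] persisters();               // stores that receive writes for this type
    Class<?> loader() default Void.class;  // store used for the initial load, if any
}

// Marker classes standing in for the two EDS implementations.
class CassandraStore {}
class HdfsStore {}

@Persistent(persisters = {CassandraStore.class, HdfsStore.class}, loader = CassandraStore.class)
class UserEvent {}

@Persistent(persisters = {CassandraStore.class}, loader = CassandraStore.class)
class Account {}

class SessionScratch {}  // not annotated, so no store handles it

class PersistentScanner {
    /** Returns the candidate classes that name the given store as a persister. */
    static List<Class<?>> persistedBy(Class<?> store, List<Class<?>> candidates) {
        List<Class<?>> result = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            Persistent p = candidate.getAnnotation(Persistent.class);
            if (p == null) {
                continue;
            }
            for (Class<?> persister : p.persisters()) {
                if (persister == store) {
                    result.add(candidate);
                    break;
                }
            }
        }
        return result;
    }

    /** Returns the candidate classes that name the given store as their loader. */
    static List<Class<?>> loadedBy(Class<?> store, List<Class<?>> candidates) {
        List<Class<?>> result = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            Persistent p = candidate.getAnnotation(Persistent.class);
            if (p != null && p.loader() == store) {
                result.add(candidate);
            }
        }
        return result;
    }
}
```

Each store asks the scanner for its own classes at startup, which is roughly the job setApplicationContext performs in the annotation driven EDS base class.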

The Multi-EDS

So what if we want to generalize this idea of having multiple persisters for different classes of data?  If I swap MongoDB for Cassandra in this architecture, I don't want to have to write a special EDS that handles both each time.  So I wrote a simple multi-EDS that delegates to a single reader and multiple writers.  The multi-EDS is configured simply and conventionally in the pu.xml.  

Besides simple delegation, the only important thing to note about the multi-EDS is that it doesn't just write to destinations in sequence, which would make the overall time the sum of the write times.  The writing, being obviously IO bound, is ideal for parallelism.  So the multi-EDS writes to all the destinations simultaneously using a thread pool and an ExecutorCompletionService.  The details can be seen in the org.openspaces.eds.support.MultiEDS class.
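The parallel fan-out can be sketched with the standard java.util.concurrent classes. This is a simplified illustration, not the MultiEDS source: the RecordWriter interface and ParallelMultiWriter class are hypothetical, and the decision to count failures rather than propagate them is an assumption made to keep the example short.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical write destination, e.g. one per underlying data store. */
interface RecordWriter {
    void write(String record);
}

/** Writes each record to every destination in parallel and waits for all of them. */
class ParallelMultiWriter {
    private final List<RecordWriter> writers;
    private final ExecutorService pool;

    ParallelMultiWriter(List<RecordWriter> writers) {
        this.writers = writers;
        // One thread per destination, since the work is IO bound.
        this.pool = Executors.newFixedThreadPool(Math.max(1, writers.size()));
    }

    /** Returns how many destinations accepted the record. */
    int writeAll(String record) {
        CompletionService<Boolean> completion = new ExecutorCompletionService<>(pool);
        for (RecordWriter writer : writers) {
            completion.submit(() -> {
                writer.write(record);
                return true;
            });
        }
        int succeeded = 0;
        for (int i = 0; i < writers.size(); i++) {
            try {
                // take() hands back futures in completion order, fastest destination first.
                completion.take().get();
                succeeded++;
            } catch (ExecutionException e) {
                // This destination failed; keep collecting results from the others.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return succeeded;
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Because the writes overlap, the elapsed time of writeAll approaches that of the slowest destination rather than the sum of all of them.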

Annotation Driven Document Definitions

The final spinoff from this effort is the ability to define SpaceDocument type definitions in Java code.  This is more a personal preference than anything, but I'm not a fan of putting type definitions in XML files, and I felt that documents could be conceptually closer to POJOs if they were defined in code.  To accomplish this, I created a set of annotations (in the org.openspaces.document.annotations package) and a utility class called SpaceTypeMaker.
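The general technique, turning annotated classes into type definitions by reflection, can be illustrated with a small self-contained sketch. The annotation names below (DocType, DocId, DocIndex), the TypeDefinition holder, and the TypeDefinitionReader class are hypothetical. They are not the annotations in org.openspaces.document.annotations, and they do not show how SpaceTypeMaker actually works.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical annotations, invented for this sketch.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DocType {
    String value();  // document type name
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface DocId {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface DocIndex {}

/** Plain holder for what a space would need to know about a document type. */
class TypeDefinition {
    final String typeName;
    String idProperty;
    final List<String> indexedProperties = new ArrayList<>();

    TypeDefinition(String typeName) {
        this.typeName = typeName;
    }
}

/** Example declaration: the class exists only to carry the annotations. */
@DocType("Product")
class ProductDoc {
    @DocId String catalogNumber;
    @DocIndex String name;
    double price;
}

class TypeDefinitionReader {
    /** Builds a type definition from the annotations on a class and its fields. */
    static TypeDefinition read(Class<?> declaration) {
        DocType type = declaration.getAnnotation(DocType.class);
        if (type == null) {
            throw new IllegalArgumentException(declaration.getName() + " is not annotated with @DocType");
        }
        TypeDefinition definition = new TypeDefinition(type.value());
        for (Field field : declaration.getDeclaredFields()) {
            if (field.isAnnotationPresent(DocId.class)) {
                definition.idProperty = field.getName();
            }
            if (field.isAnnotationPresent(DocIndex.class)) {
                definition.indexedProperties.add(field.getName());
            }
        }
        return definition;
    }
}
```

Here the annotated class is never instantiated. It is only a compile-checked description of the document type.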

There is no getting around the fact that space definitions that want to use SpaceDocuments need to have the type definitions fed into them.  This looks something like this:

The same definition with annotations looks like this:

There, isn't that more pleasant?  In order for this magic to work, however, an additional bean has to be added to the Spring config file.  This is the SpaceTypeMaker. It slogs through the code base looking for annotated classes and creates a list of type definitions, which then get fed to the space definition.  Here's a sample usage:

Wrap Up

The creation of real-time big data processing solutions is at hand.  The combination of the GigaSpaces transactional in-memory processing grid with eventually consistent, big data storage yields an architecture unparalleled in performance and scalability, without requiring applications to devolve to a pure key/value store, eventually consistent, non-transactional model.  The source code I've been discussing here is available on GitHub: git://github.com/dfilppi/realtime-bigdata.git .  To run the full example, you'll need to be running both Cassandra 1.0.9 or later and Cloudera Hadoop cdh3u3 or later.  Of course, you can configure it to run with either one independently.

 
2 Comments
  • http://twitter.com/ukjbrown Jamie Brown

    Great article, however I was led here through searching for an annotation driven solution for SpaceDocument extended (wrapper) classes.

    The issue I have with the addition of the new SpaceDocument annotation is its ambiguity in the sense that the class you are annotating won’t be registered as a document wrapper class as one may perceive, therefore any objects read back would be SpaceDocument types (unless I’m mistaken).

    Whilst this is a significant improvement when using Document-POJO interoperability, I believe if this annotation set was to be added to the OpenSpaces API, that SpaceDocument wrapper classes are also catered for in a generic way.

    For my own projects, I have created a similar set of annotations and scanner that solves the problem of SpaceDocument extended classes through annotation of the class and its static members. E.g.

    @SpaceDocument("Product")
    public class Product extends SpaceDocument 
    {
      private static final String TYPE_NAME = "Product";

      @SpaceId(autoGenerate = true)
      private static final String PROP_ID = "Id";

      @SpaceIndex(type = SpaceIndexType.EXTENDED)
      private static final String PROP_NAME = "Name";

      public Product() { super(TYPE_NAME); }

      public String getId() { return super.getProperty(PROP_ID); }

      public Product setId(String id) {
        super.setProperty(PROP_ID, id);
        return this;
      }

      …
    }

    Whilst I have replicated the SpaceId, SpaceIndex and SpaceRouting annotations,  I see no reason why the existing annotations couldn’t be used as well as creating a new SpaceDocument annotation.

    • dfilppi

      Jamie, glad you enjoyed it. For the time being there aren’t any plans to include the annotations I used in the openspaces code base. It was merely my attempt to avoid XML configuration, and provide a measure of compile time type safety. It is true that the annotated @SpaceDocument java class in my scheme is essentially a throwaway.  It wasn’t really targeted at POJO/Document interop, but rather for a pure SpaceDocument implementation, which I feel fits the flexible nature of the underlying Cassandra data store nicely.
