
Dmitriy Setrakyan manages daily operations of GridGain Systems and brings over 12 years of experience spanning all areas of application software development, from design and architecture to team management and quality assurance. His experience includes architecture and leadership in the development of distributed middleware platforms, financial trading systems, CRM applications, and more. Dmitriy is a DZone MVB.

Can You Use HBase as Your Persistent Store?

06.28.2012

We have added many cool features in GridGain 4.1. One of them is tight integration with the Hadoop ecosystem. There are two ways you can integrate with Hadoop. One is upstream integration, in which you efficiently load data from HDFS into the In-Memory Cache (aka Data Grid), where it gets indexed for low-latency query access. The other is downstream integration, where HDFS is used as a persistent store and data gets flushed into it periodically from the In-Memory Cache using the write-behind cache feature.
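
To make the upstream direction a bit more concrete, here is a rough, hand-rolled sketch of pulling records out of HDFS and putting them into the cache. Note that this is just an illustration of the idea, not GridGain's own HDFS loading facility; the HDFS path, the CSV layout, and the Person class used here are assumptions made up for this sketch.

Loading Data from HDFS into Cache (Sketch)

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.gridgain.grid.cache.GridCache;

public class HdfsPersonLoader {
    /**
     * Reads persons from a CSV file in HDFS ("id,firstName,lastName" per line)
     * and puts them into the given cache, where they get indexed for queries.
     */
    public static void loadPersons(GridCache<Long, Person> cache) throws Exception {
        // Hadoop file system handle (picks up configuration from the classpath).
        FileSystem fs = FileSystem.get(new Configuration());

        // Hypothetical input file.
        BufferedReader in = new BufferedReader(
            new InputStreamReader(fs.open(new Path("/data/persons.csv"))));

        try {
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                String[] f = line.split(",");

                Person p = new Person();

                p.setId(Long.parseLong(f[0]));
                p.setFirst(f[1]);
                p.setLast(f[2]);

                // Put into the in-memory cache keyed by person ID.
                cache.put(p.getId(), p);
            }
        }
        finally {
            in.close();
        }
    }
}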

With downstream integration, on the other hand, users are able to hold the latest data in memory and use HDFS as a historical data warehouse without any extra ETL process and without lags in the data. Business applications can run queries and get instant analytical feedback on the whole data set, which includes the most recent data held in memory as well as the HDFS-based warehoused data, hence not losing any data in query results at all.

Here is an example of what such a cache store would look like in this case. I show how HBase would be used, but you can write directly to HDFS if you like. Note that all we do here is override the load(...), put(...), and remove(...) methods to tell GridGain how to load and update entries in HBase.

The full source of this example can also be found in the GridGain project on GitHub:

Cache Store with HBase

public class GridCacheHBasePersonStore 
    extends GridCacheStoreAdapter<Long, Person> {
    // Default config path.
    private static final String CONFIG_PATH = "/my/hbase/hbase-site.xml";

    // Table name.
    private static final String TABLE_NAME = "persons";

    // Maximum allowed pool size.
    private static final int MAX_POOL_SIZE = 4;

    // HBase table pool.
    private HTablePool tblPool;

    // HBase column descriptor for first name.
    private HColumnDescriptor first = new HColumnDescriptor("firstName");

    // HBase column descriptor for last name.
    private HColumnDescriptor last = new HColumnDescriptor("lastName");

    public GridCacheHBasePersonStore() throws Exception {
        prepareDb();
    }

    // Load entry from HBase.
    @Override 
    public Person load(String cacheName, GridCacheTx tx, Long key) 
        throws GridException {
        HTableInterface t = tblPool.getTable(TABLE_NAME);

        try {
            Result r = t.get(new Get(Bytes.toBytes(key)));

            if (r == null)
                throw new GridException("Failed to load key: " + key);

            if (r.isEmpty())
                return null;

            Person p = new Person();

            p.setId(Bytes.toLong(r.getRow()));
            p.setFirst(Bytes.toString(r.getValue(first.getName(), null)));
            p.setLast(Bytes.toString(r.getValue(last.getName(), null)));
            
            return p;
        }
        catch (IOException e) {
            throw new GridException(e);
        }
        finally {
            close(t);
        }
    }

    // Store entry in HBase.
    @Override 
    public void put(String cacheName, GridCacheTx tx, Long key, Person val)
        throws GridException {
        HTableInterface t = tblPool.getTable(TABLE_NAME);

        try {
            t.put(new Put(Bytes.toBytes(key))
                .add(first.getName(), null, Bytes.toBytes(val.getFirst()))
                .add(last.getName(), null, Bytes.toBytes(val.getLast())));
        }
        catch (IOException e) {
            throw new GridException(e);
        }
        finally {
            close(t);
        }
    }

    // Remove entry from HBase.
    @Override 
    public void remove(String cacheName, GridCacheTx tx, Long key) 
        throws GridException {
        HTableInterface t = tblPool.getTable(TABLE_NAME);

        try {
            t.delete(new Delete(Bytes.toBytes(key)));
        }
        catch (IOException e) {
            throw new GridException(e);
        }
        finally {
            close(t);
        }
    }

    // Initialize HBase database.
    private void prepareDb() throws IOException {
        Configuration cfg = new Configuration();

        cfg.addResource(CONFIG_PATH);

        HBaseAdmin admin = new HBaseAdmin(cfg);

        if (!admin.tableExists(TABLE_NAME)) {
            HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);

            desc.addFamily(first);
            desc.addFamily(last);

            admin.createTable(desc);
        }

        tblPool = new HTablePool(cfg, MAX_POOL_SIZE);
    }

    // Close HBase Table.
    private void close(@Nullable HTableInterface t) {
        if (t != null) {
            try {
                t.close();
            }
            catch (IOException ignored) {
                // Ignore close errors for brevity.
            }
        }
    }
}
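
For reference, here is a minimal sketch of what the Person class used above could look like. The field names simply mirror the getters and setters called by the store; the actual class in the GitHub example may differ.

Person Class (Sketch)

import java.io.Serializable;

public class Person implements Serializable {
    // Person ID (used as the cache key and the HBase row key).
    private Long id;

    // First and last name (stored under the 'firstName' and 'lastName' column families).
    private String first;
    private String last;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getFirst() { return first; }
    public void setFirst(String first) { this.first = first; }

    public String getLast() { return last; }
    public void setLast(String last) { this.last = last; }
}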

To configure this store, simply specify it in the cache configuration and enable write-behind if you need the data to be periodically flushed to HBase, like so:

HBase Store Cache Configuration

<bean class="org.gridgain.grid.cache.GridCacheConfigurationAdapter">
    ...
    <!-- Setup HBase Cache Store. -->
    <property name="store">
        <bean class="GridCacheHBasePersonStore" scope="singleton"/>
    </property>

    <!-- Enable write-behind. -->
    <property name="writeBehindEnabled" value="true"/>
    ...
</bean>
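
With that in place, application code keeps working against the cache as usual: puts land in memory right away and are flushed to HBase asynchronously by the write-behind store, while a miss on get(...) is loaded back through load(...). Here is a rough usage sketch; the Spring config path and the cache name "partitioned" are assumptions for illustration, and the bootstrap calls follow the GridGain 4.x API.

Using the HBase-Backed Cache (Sketch)

import org.gridgain.grid.Grid;
import org.gridgain.grid.GridGain;
import org.gridgain.grid.cache.GridCache;

public class HBaseStoreExample {
    public static void main(String[] args) throws Exception {
        // Start a grid node with the configuration that sets up the HBase store.
        Grid grid = GridGain.start("config/hbase-cache.xml");

        try {
            GridCache<Long, Person> cache = grid.cache("partitioned");

            Person p = new Person();

            p.setId(1L);
            p.setFirst("John");
            p.setLast("Doe");

            // Goes to memory immediately; flushed to HBase asynchronously via write-behind.
            cache.put(1L, p);

            // Served from memory; on a miss the value is loaded from HBase via load(...).
            Person loaded = cache.get(1L);
        }
        finally {
            // Stop the local node.
            GridGain.stop(true);
        }
    }
}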

 

 

Published at DZone with permission of Dmitriy Setrakyan, author and DZone MVB. (source)
