
Getting Started with "Blur" - Search on Top of Hadoop and Lucene

03.16.2012

Blur is a new Apache 2.0 licensed software project that provides a search capability built on top of Hadoop and Lucene. Elasticsearch and Solr already exist, so why build something new? While those projects work well, they don't have solid integration with the Hadoop ecosystem. Blur was built specifically for Big Data, taking scalability, redundancy, and performance into consideration from the very start, while leveraging all the goodness that already exists in the Hadoop stack.

A year and a half ago, my project began using Hadoop for data processing. Very early on, we had networking issues that made connectivity across our HDFS cluster spotty at best. Over one weekend in particular, we steadily lost network connections to 47 of the 90 data nodes in the cluster. When we came in on Monday morning, I noticed that the MapReduce system was a little sluggish but still working. When I checked HDFS, I saw that our capacity had dropped by about 50%. After running an fsck on the cluster, I was amazed to find that what had seemed like a catastrophic failure over the weekend had left a still-healthy file system. This experience left a lasting impression on me. It was then that I got the idea to somehow leverage the redundancy and fault tolerance of HDFS for the next version of a search system that I was just beginning to (re)write.

I had already written a custom sharded Lucene server that had been in a production system for a couple of years. Lucene worked really well and did everything that we needed for search. The issue we faced was that it ran on big iron that was not redundant and could not be easily expanded. After seeing the resilient characteristics of Hadoop first hand, I decided to look into marrying the already mature and impressive feature set of Lucene with the built-in redundancy and scalability of the Hadoop platform. From this experiment, Blur was created.

The biggest technical issues/features that Blur solves:

  • Rapid mass indexing of entire datasets
  • Automatic Shard Server Failover
  • Near Real-time update compatibility via Lucene NRT
  • Compression of Lucene FDT files while maintaining random access performance
  • Lucene WAL (Write Ahead Log) to provide data reliability
  • Lucene R/W directly into HDFS (the seek on write problem)
  • Random access performance with block caching of the Lucene Directory

Data Model

Data in Blur is stored in Tables that contain Rows. A Row must have a unique row id and contains one or more Records. Each Record has a record id that is unique within the Row and a column family that groups the columns which logically make up a single record. Columns contain a name and a value, and a Record can contain multiple columns with the same name. For example, a single Row with two column families might look like this:

{
  rowid:"user_a@server.com",
  records:[
    {
      recordid:"324182347",
      family:"messages",
      columns:[
        {name:"to",value:"user_b@server.com"},
        {name:"to",value:"user_c@server.com"},
        {name:"subject",value:"important!"},
        {name:"body",value:"This is a very important email...."}
      ]
    }, {
      recordid:"234387219",
      family:"messages",
      columns:[
        {name:"to",value:"user_b@server.com"},
        {name:"subject",value:"This is cool!"},
        {name:"body",value:"Check this out....."}
      ]
    }, {
      recordid:"234123412",
      family:"contacts",
      columns:[
        {name:"name",value:"Jon Doe"},
        {name:"email",value:"user_d@server.com"}
      ]
    }
  ]
}

Architecture

Blur uses Hadoop's MapReduce framework for indexing data and Hadoop's HDFS filesystem for storing indexes. Thrift is used for all inter-process communication, and ZooKeeper is used to track the state of the system and to store metadata. The Blur architecture is made up of two types of server processes:

  • Blur Controller Server
  • Blur Shard Server

A shard server serves zero or more shards from all of the currently online tables. Which shards are online in each shard server is calculated from the state information in ZooKeeper. If a shard server goes down, the remaining shard servers detect the failure through ZooKeeper and determine which, if any, of the missing shards they need to serve from HDFS.

The controller server provides a single point of entry (logically) to the cluster for spraying out queries, collecting the responses, and providing a single response. Both the controller and shard servers expose the same Thrift API which helps to ease debugging. It also allows developers to start a single shard server and interact with it the same way they would with a large cluster. Many controller servers can be (and should be) run for redundancy. The controllers act as gateways to all of the data that is being served by the shard servers.
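
Since the controller and shard servers expose the same Thrift API, a client needs nothing more than a different connection string to talk to either one. Here is a minimal sketch using the same BlurClientManager pattern shown in the examples below; the tableList() call and the host/port values are assumptions used only for illustration:

public class TableListExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    // Point at a controller in a real cluster, or directly at a single
    // shard server during development; the Thrift API is identical.
    String connection = "controller1:40010"; // e.g. "shard1:40020" for a lone shard server

    List<String> tables = BlurClientManager.execute(connection, new BlurCommand<List<String>>() {
      @Override
      public List<String> call(Client client) throws BlurException, TException {
        // Assumed call: list the tables currently online in the cluster
        return client.tableList();
      }
    });
    System.out.println(tables);
  }
}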

Updating / Loading Data

Currently there are two ways to load and update data. The first is through a bulk load in MapReduce and the second is through mutation calls in Thrift.

Bulk Load MapReduce Example

public class BlurMapReduce {
  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration configuration = new Configuration();
    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: blurindexer <in> <out>");
      System.exit(2);
    }

    AnalyzerDefinition ad = new AnalyzerDefinition();

    TableDescriptor td = new TableDescriptor();
    td.setShardCount(16);
    // Location in HDFS
    td.setTableUri("hdfs://<namenode>:<port>/blur/tables/test-table");
    td.setAnalyzerDefinition(ad);

    BlurTask blurTask = new BlurTask();
    blurTask.setTableDescriptor(td);

    // The copy locks are used to throttle how many concurrent
    // copies from the reducers are occurring at the same time.
    // This is normally needed because the indexing cluster is
    // typically larger in size than the Blur cluster.
    blurTask.setSpinLockPath("/copy-locks");
    blurTask.setZookeeperConnectionStr("localhost");
    blurTask.setMaxNumberOfConcurrentCopies(10);

    // Reuse the configuration parsed by GenericOptionsParser above
    Job job = blurTask.configureJob(configuration);
    job.setJarByClass(BlurExampleMapper.class);
    job.setMapperClass(BlurExampleMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  public static class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
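    // Note: _record, _key, and _recordCounter are fields inherited from the BlurMapper base class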
    @Override
    protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
      // Reset record
      _record.clearColumns();

      // Set row id
      _record.setRowId("rowid");

      // Set record id
      _record.setRecordId("recordid");

      // Set column family
      _record.setColumnFamily("cf1");
      
      _record.addColumn("name", "value");

      // Set the key, which is usually the rowid
      byte[] bs = _record.getRowId().getBytes();
      _key.set(bs, 0, bs.length);
      context.write(_key, _record);
      _recordCounter.increment(1);
      context.progress();
    }
  }
}

Data Mutation Thrift Example

import static com.nearinfinity.blur.utils.BlurUtil.*;

public class ThriftMutationExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    final RowMutation mutation = newRowMutation("test-table", "rowid-1234",
        newRecordMutation("column-family", "recordid-5678",
            newColumn("columnname", "value")));
    
    BlurClientManager.execute("controller1:40010", new BlurCommand<Void>() {
      @Override
      public Void call(Client client) throws BlurException, TException {
        client.mutate(mutation);
        return null;
      }
    });
  }
}

Searching Data

Any element in the Blur data model is searchable through normal Lucene semantics: analyzers, which are defined per Blur table.

The standard Lucene query syntax is the default way to search Blur. If anything outside of the standard syntax is needed, you can create a Lucene query directly with Java objects and submit it through the expert query API.

The column family grouping within Rows allows for results to be discovered across column families similar to what you would get with an inner join across two tables that share the same key (or in this case rowid). For complicated data models that have multiple column families, this makes for a very powerful search capability.
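
For example, against the email data shown earlier, a single query could require a hit in both the messages and contacts families of the same Row. The query below is purely illustrative and uses the family.column field syntax described in the next paragraph:

+messages.to:user_b@server.com +contacts.name:"Jon Doe"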

The following example searches for "value" as a full text search. If I had wanted to search for "value" in a single field, such as column "colA" in column family "famB", the query would be "famB.colA:value".

public class ThriftSearchExample {

  public static void main(String[] args) throws BlurException, TException, IOException {
    BlurResults blurResults = BlurClientManager.execute("controller1:40010", new BlurCommand<BlurResults>() {
      @Override
      public BlurResults call(Client client) throws BlurException, TException {
        BlurQuery blurQuery = new BlurQuery();
        SimpleQuery simpleQuery = new SimpleQuery();
        simpleQuery.setQueryStr("value");
        blurQuery.setSimpleQuery(simpleQuery);
        blurQuery.setSelector(new Selector());
        return client.query("test-table", blurQuery);
      }
    });
    for (BlurResult result : blurResults.getResults()) {
       // do something with the result
    }
  }
}

Fetching Data

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

public class ThriftFetchExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    Row row = BlurClientManager.execute("controller1:40010", new BlurCommand<Row>() {
      @Override
      public Row call(Client client) throws BlurException, TException {
        Selector selector = new Selector();
        selector.setRowId("rowid-1234");
        FetchResult fetchRow = client.fetchRow("test-table", selector);
        FetchRowResult rowResult = fetchRow.getRowResult();
        return rowResult.getRow();
      }
    });
  }
}
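
A Record-level fetch follows the same pattern, with the record id set on the Selector. The sketch below is only an illustration: the setRecordId, setRecordOnly, and getRecordResult calls are assumptions about the generated Thrift classes rather than confirmed API.

public class ThriftRecordFetchExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    Record record = BlurClientManager.execute("controller1:40010", new BlurCommand<Record>() {
      @Override
      public Record call(Client client) throws BlurException, TException {
        Selector selector = new Selector();
        selector.setRowId("rowid-1234");
        // Narrow the fetch to a single Record within the Row
        // (record id and record-only flag are assumed Selector fields)
        selector.setRecordId("recordid-5678");
        selector.setRecordOnly(true);
        FetchResult fetchResult = client.fetchRow("test-table", selector);
        return fetchResult.getRecordResult().getRecord();
      }
    });
  }
}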

Current State

Blur is nearing its first release, 0.1, and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime, you can check it out on GitHub:

https://github.com/nearinfinity/blur

http://blur.io




Published at DZone with permission of Scott Leberknight, author and DZone MVB.
