
Marek Rogoziński is a software architect, Solr/Lucene specialist and co-founder of solr.pl.

Data Import Handler – How to import data from SQL databases (part 2)

07.27.2011

In the first part we indexed the information contained in the database. In this second part we will extend that functionality by adding incremental imports.

There were a little over one million documents, and the import took less than half an hour. In principle, we could end the topic of data import here, but imagine that we would like this data to be indexed on an ongoing basis, as it changes at the source. It won't be true RTS (real-time search), of course – there will be an interval between a change in the data and the time it is indexed in the search system – but let's assume that an update every hour is sufficient. The first thing we must do in order to implement incremental indexing is prepare the database.

Database preparation

Incremental indexing requires information from the database about which documents have changed since the last indexing. If we are lucky, such data is already available; if we are unlucky, we must modify the existing database structure. Depending on that structure we have several options. In our practice we have most often used:

adding an additional column holding the exact date of the last modification, updated automatically (e.g. by a trigger, or by DEFAULT/ON UPDATE in MySQL) or – a worse solution – manually by the application
creating a queue of changes – writing (e.g. with a trigger) the identifiers of modified documents to a separate table

In both solutions we need to pay attention to changes in all the entities that make up the document.
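As an illustration of the first option, here is a sketch for PostgreSQL (the database used in this article); the `documents` table and the column, function and trigger names are hypothetical, chosen only for the example:

```sql
-- Add a column storing the exact time of the last modification.
ALTER TABLE documents
  ADD COLUMN last_modified timestamp with time zone NOT NULL DEFAULT now();

-- A trigger function that refreshes the column on every update.
CREATE OR REPLACE FUNCTION touch_last_modified() RETURNS trigger AS $$
BEGIN
  NEW.last_modified := now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire it before each row update.
CREATE TRIGGER documents_touch
  BEFORE UPDATE ON documents
  FOR EACH ROW EXECUTE PROCEDURE touch_last_modified();
```

With such a column in place, the incremental query only has to compare `last_modified` against the time of the previous import.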

Returning to our example from the first part of the article (the Polish Wikipedia, imported into a PostgreSQL database using the MediaWiki application tables), our structure looks like this:

Table "page":

Column            | Type                     | Modifiers
------------------+--------------------------+--------------------------------------------------------
page_id           | integer                  | not null default nextval('page_page_id_seq'::regclass)
page_title        | text                     | not null
page_restrictions | text                     |
page_counter      | bigint                   | not null default 0
page_is_redirect  | smallint                 | not null default 0
page_is_new       | smallint                 | not null default 0
page_random       | numeric(15,14)           | not null default random()
page_touched      | timestamp with time zone |
page_latest       | integer                  | not null
page_len          | integer                  | not null
titlevector       | tsvector                 |

Table "revision":

Column         | Type                     | Modifiers
---------------+--------------------------+----------------------------------------------------------
rev_id         | integer                  | not null default nextval('revision_rev_id_seq'::regclass)
rev_page       | integer                  |
rev_text_id    | integer                  |
rev_comment    | text                     |
rev_user       | integer                  | not null
rev_user_text  | text                     | not null
rev_timestamp  | timestamp with time zone | not null
rev_minor_edit | smallint                 | not null default 0
rev_deleted    | smallint                 | not null default 0
rev_len        | integer                  |
rev_parent_id  | integer                  |

Table "pagecontent":

Column     | Type     | Modifiers
-----------+----------+-------------------------------------------------------
old_id     | integer  | not null default nextval('text_old_id_seq'::regclass)
old_text   | text     |
old_flags  | text     |
textvector | tsvector |

The first table contains the column "page_touched" and the second "rev_timestamp", which appear to be exactly what we need: modification dates. The third table has no such field, but it only holds the text of a specific page revision – these texts do not change over time; when a user modifies a page, a new revision is simply created.

Let us recall the definition of the source from the first part of the article:

<dataConfig>
  <dataSource driver="org.postgresql.Driver"
     url="jdbc:postgresql://localhost:5432/wikipedia"
     user="wikipedia"
     password="secret" />
  <document>
    <entity name="page" query="SELECT page_id, page_title from page">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision" query="select rev_id from revision where rev_page=${page.page_id}">
        <entity name="pagecontent" query="select old_text from pagecontent where old_id=${revision.rev_id}">
          <field column="old_text" name="text" />
        </entity>
      </entity>
   </entity>
  </document>
</dataConfig>

What we need to do is add the definitions of queries used in incremental indexing. Nothing could be simpler:

<dataConfig>
  <dataSource driver="org.postgresql.Driver"
     url="jdbc:postgresql://localhost:5432/wikipedia"
     user="wikipedia"
     password="secret" />
  <document>
    <entity name="page"
       query="SELECT page_id, page_title from page"
       deltaQuery="select page_id from page where page_touched > '${dataimporter.last_index_time}'"
       deltaImportQuery="SELECT page_id, page_title from page where page_id=${dataimporter.delta.page_id}">
      <field column="page_id" name="id" />
      <field column="page_title" name="name" />
      <entity name="revision"
         query="select rev_id from revision where rev_page=${page.page_id}"
         deltaQuery="select rev_id from revision where rev_timestamp > '${dataimporter.last_index_time}'"
         parentDeltaQuery="select page_id from page where page_id=${revision.rev_page}">
        <entity name="pagecontent"
           query="select old_text from pagecontent where old_id=${revision.rev_id}"
           deltaQuery="select old_id from pagecontent where old_id &lt; 0">
          <field column="old_text" name="text" />
        </entity>
      </entity>
    </entity>
  </document>
</dataConfig>

 

Well – there are easier things :)

Comparing those two files, the only difference we see is the addition of three attributes:

  • deltaQuery – the query responsible for returning the identifiers of the records that have changed since the last crawl (full or incremental); the last crawl time is provided by DIH in the variable ${dataimporter.last_index_time}. Solr uses this query to find the records that have changed.
  • deltaImportQuery – the query that fetches the data for a single record identified by the ID returned by deltaQuery, which is available as a DIH variable (${dataimporter.delta.page_id} in our example).
  • parentDeltaQuery – the query that returns the identifiers of the parent entity's records. With these queries Solr is able to retrieve all the data that makes up a document, regardless of the entity it originates from. This is necessary because the indexing engine cannot modify part of an indexed document – the entire document must be reindexed, even though some of its data has not changed.
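With this configuration in place, the incremental import is triggered through the dataimport handler in the same way as the full import – only the command differs. The handler path and port below are the defaults and may be different in your installation:

```shell
# full import, as in the first part of the article
curl 'http://localhost:8983/solr/dataimport?command=full-import'

# incremental import – processes only the records returned by the delta
# queries; can be scheduled e.g. hourly from cron
curl 'http://localhost:8983/solr/dataimport?command=delta-import'
```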


In our example we do not remove documents. This spares us the problem of taking deleted documents into account and, of course, the process of deleting documents from the index. If this functionality were needed, we could use the approach described above (the queue of changes) together with one more DIH attribute:

  • deletedPkQuery – a query that returns the identifiers of deleted documents.
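As a sketch only – assuming a hypothetical deleted_pages queue table, filled by a delete trigger with the identifier and deletion time of each removed page – the root entity could be extended like this:

```xml
<entity name="page"
   query="SELECT page_id, page_title from page"
   deltaQuery="select page_id from page where page_touched > '${dataimporter.last_index_time}'"
   deltaImportQuery="SELECT page_id, page_title from page where page_id=${dataimporter.delta.page_id}"
   deletedPkQuery="select page_id from deleted_pages where deleted_at > '${dataimporter.last_index_time}'">
  ...
</entity>
```

During a delta-import, the identifiers returned by deletedPkQuery are removed from the index.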


In the next part of the article we will also sort out some issues of cooperation with the database – we will revise our database integration and approach it in a slightly different way.

 


Published at DZone with permission of Marek Rogoziński, author and DZone MVB. (source)
