Senior Software Developer at Citrix Systems. Luis is a DZone MVB, is not an employee of DZone, and has posted 10 posts at DZone. You can read more from them at their website.

Google AppEngine's Task Queues API

03.27.2012
Task Queues (com.google.appengine.api.taskqueue)
With Task Queues, an application can respond to a user request by scheduling work to be performed outside of that request, which makes them a powerful tool for background work.

Furthermore, you can organize work into small, discrete units (tasks). The application inserts these tasks into one or more queues, which process them in FIFO order according to each queue's configuration. Here's a diagram I took from a Google I/O presentation, which illustrates task insertion into the queue at a high level:






Queue Configuration


1. Push Queues (default):
A push queue processes tasks at the rate configured in the queue definition (see below). App Engine automatically manages the lifetime of these queues (creation, deletion, etc.) and adjusts the processing capacity to match your configuration and processing volume. Push queues can only be used within App Engine (internal to your app).

2. Pull Queues:
Pull queues allow a task consumer to lease tasks at a specific time for a specific timeframe. They are accessible internally as well as externally through the Task Queue REST API. In this scenario, however, GAE does not manage the lifecycle and processing rate of the queues automatically; that is up to the developer. A backend also has access to these queues.
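
To make the lease idea concrete, here is a minimal plain-Java sketch. It is only a model of the concept, not the App Engine API: the class LeaseQueueSketch and its add, lease, and delete methods are hypothetical names, payloads are assumed to be unique, and the clock is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a pull queue; not the App Engine API. */
final class LeaseQueueSketch {
    // payload -> time (ms) until which the task is leased; 0 means available
    private final Map<String, Long> leasedUntil = new LinkedHashMap<>();

    void add(String payload) {
        leasedUntil.put(payload, 0L);
    }

    /** Leases up to maxTasks available tasks for leaseMillis, in insertion order. */
    List<String> lease(long nowMillis, long leaseMillis, int maxTasks) {
        List<String> leased = new ArrayList<>();
        for (Map.Entry<String, Long> e : leasedUntil.entrySet()) {
            if (leased.size() == maxTasks) {
                break;
            }
            if (e.getValue() <= nowMillis) {   // never leased, or the lease expired
                e.setValue(nowMillis + leaseMillis);
                leased.add(e.getKey());
            }
        }
        return leased;
    }

    /** The consumer deletes a task once it has finished processing it. */
    void delete(String payload) {
        leasedUntil.remove(payload);
    }

    public static void main(String[] args) {
        LeaseQueueSketch q = new LeaseQueueSketch();
        q.add("a");
        q.add("b");
        q.add("c");
        System.out.println(q.lease(0, 1000, 2));    // [a, b]
        System.out.println(q.lease(500, 1000, 2));  // [c]
        q.delete("a");                              // "a" is done
        System.out.println(q.lease(1000, 1000, 2)); // [b] (its lease expired)
    }
}
```

The point of the model is the last line: a task that was leased but never deleted becomes available again once its lease runs out, which is why the consumer, not GAE, is responsible for finishing and deleting tasks.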

Tasks
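They represent a unit of work performed by the application. Tasks are unique within a queue, and according to the Google documentation a task cannot be invoked more than once simultaneously (unless some weird internal error condition happens). Because a failed task can be retried, task handlers should also be idempotent, i.e. running the same task more than once should leave the same result.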


Tasks are created as instances of the TaskOptions class. A task consists of a URL and a payload, which can be a simple string, a binary object (byte[]), or an instance of DeferredTask. A DeferredTask is basically a Runnable, which allows you to chain tasks together. Our team had to do this to simulate long-running tasks when GAE's maximum execution limit was 30 seconds. Presently, a task must finish executing and send an HTTP response status between 200 and 299 within 10 minutes of the original request. This deadline is separate from that of user requests, which have a 60-second deadline.
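
The chaining trick can be sketched in plain Java. This is an illustration under stated assumptions, not App Engine code: an in-memory ArrayDeque stands in for the real queue, a plain Runnable stands in for DeferredTask, and ChainedTaskSketch, slice, and drain are hypothetical names. Each task does a bounded slice of work and then enqueues its own successor.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for chained tasks; an in-memory queue replaces the real Task Queue. */
final class ChainedTaskSketch {
    static final Queue<Runnable> QUEUE = new ArrayDeque<>();
    static int processed = 0;

    /** Processes one bounded slice of work, then enqueues a follow-up task for the rest. */
    static Runnable slice(int from, int total, int sliceSize) {
        return () -> {
            int to = Math.min(from + sliceSize, total);
            for (int i = from; i < to; i++) {
                processed++;                             // placeholder for real work
            }
            if (to < total) {
                QUEUE.add(slice(to, total, sliceSize));  // chain the next task
            }
        };
    }

    /** Runs queued tasks one at a time, as a worker would; returns how many tasks ran. */
    static int drain() {
        int tasksRun = 0;
        Runnable task;
        while ((task = QUEUE.poll()) != null) {
            task.run();
            tasksRun++;
        }
        return tasksRun;
    }

    public static void main(String[] args) {
        QUEUE.add(slice(0, 1000, 300));
        System.out.println(drain() + " tasks, " + processed + " items"); // 4 tasks, 1000 items
    }
}
```

No single task ever handles more than 300 items here, so each one stays well inside whatever execution limit applies, while the chain as a whole still covers all 1000.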
Furthermore, queues use token buckets to control the rate of task execution. Each time a task is invoked, a token is used. This leasing model (acquire a token) is typical of brokering or message-passing systems, and it allows users to control the rate of execution of these tasks (see below on configuring queues).
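
If you have not run into token buckets before, the following plain-Java sketch shows the general technique. It is not App Engine's implementation: TokenBucketSketch and tryAcquire are illustrative names, and the numbers (a bucket of 40 tokens refilled at 20 per second) are simply borrowed from the sample queue.xml further down.

```java
/** Minimal token bucket sketch; names are illustrative, not App Engine classes. */
final class TokenBucketSketch {
    private final int bucketSize;     // maximum tokens held (burst capacity)
    private final double ratePerSec;  // tokens added per second
    private double tokens;
    private long lastRefillMillis;

    TokenBucketSketch(int bucketSize, double ratePerSec, long nowMillis) {
        this.bucketSize = bucketSize;
        this.ratePerSec = ratePerSec;
        this.tokens = bucketSize;     // the bucket starts full
        this.lastRefillMillis = nowMillis;
    }

    /** Consumes a token and returns true if one is available at nowMillis. */
    boolean tryAcquire(long nowMillis) {
        // Add the tokens earned since the last call, never exceeding the bucket size.
        double refill = (nowMillis - lastRefillMillis) / 1000.0 * ratePerSec;
        tokens = Math.min(bucketSize, tokens + refill);
        lastRefillMillis = nowMillis;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(40, 20.0, 0);
        int burst = 0;
        while (bucket.tryAcquire(0)) {
            burst++;
        }
        System.out.println("burst at t=0: " + burst);      // burst at t=0: 40
        int later = 0;
        while (bucket.tryAcquire(1000)) {
            later++;
        }
        System.out.println("after one second: " + later);  // after one second: 20
    }
}
```

The bucket size bounds how large a burst can be, while the rate bounds the sustained throughput once the burst is spent.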


Lastly, a very important feature of the Task Queue API is its automatic retrying of failed tasks. You can configure this with a RetryOptions parameter when creating the TaskOptions object.
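
Here is a rough plain-Java model of what automatic retries amount to. It is a generic sketch, not App Engine's actual retry schedule or API: RetrySketch, run, and backoffMillis are hypothetical names, and the capped doubling delay is just one common backoff policy. The only rule taken from the text above is that a status between 200 and 299 counts as success.

```java
import java.util.function.IntSupplier;

/** Hypothetical model of automatic retries; not App Engine's actual retry schedule. */
final class RetrySketch {

    /** Delay before retry number `retry` (1-based): doubles each time, capped at maxMillis. */
    static long backoffMillis(int retry, long minMillis, long maxMillis) {
        long delay = minMillis;
        for (int i = 1; i < retry && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /**
     * Invokes the handler until it returns a 2xx status or retryLimit retries
     * have been used. Returns the total number of invocations.
     */
    static int run(IntSupplier handler, int retryLimit) {
        int invocations = 0;
        while (true) {
            int status = handler.getAsInt();
            invocations++;
            boolean success = status >= 200 && status <= 299;
            if (success || invocations > retryLimit) {
                return invocations;
            }
            // A real queue would wait backoffMillis(invocations, ...) before retrying.
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The handler fails twice with a 500, then succeeds with a 200.
        int invocations = run(() -> ++calls[0] < 3 ? 500 : 200, 5);
        System.out.println(invocations);                 // 3
        System.out.println(backoffMillis(1, 100, 1000)); // 100
        System.out.println(backoffMillis(3, 100, 1000)); // 400
        System.out.println(backoffMillis(6, 100, 1000)); // 1000
    }
}
```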

Task within a Transaction

Tasks can be enqueued as part of a datastore transaction. Insertion (not execution) is guaranteed if the transaction commits successfully. The only caveats are that transactional tasks cannot have user-defined names and that at most 5 tasks can be inserted into task queues in a single transaction.
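
The semantics can be pictured with a small plain-Java model. This is a sketch only and does not use the datastore or Task Queue API: TransactionalEnqueueSketch and its methods are hypothetical names, and the limit of 5 is the figure quoted above. Tasks are buffered while the transaction is open and only reach the queue on commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of transactional enqueueing; not the datastore or Task Queue API. */
final class TransactionalEnqueueSketch {
    static final int MAX_TASKS_PER_TXN = 5;  // limit quoted in the text above

    private final List<String> queue = new ArrayList<>();    // committed, visible tasks
    private final List<String> pending = new ArrayList<>();  // buffered by the open transaction

    /** Buffers a task; it does not reach the queue until commit(). */
    void addInTransaction(String payload) {
        if (pending.size() >= MAX_TASKS_PER_TXN) {
            throw new IllegalStateException(
                "at most " + MAX_TASKS_PER_TXN + " tasks per transaction");
        }
        pending.add(payload);
    }

    /** Commit makes every buffered task visible at once. */
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    /** Rollback discards the buffered tasks, so none of them is ever enqueued. */
    void rollback() {
        pending.clear();
    }

    int queuedCount() {
        return queue.size();
    }

    public static void main(String[] args) {
        TransactionalEnqueueSketch txn = new TransactionalEnqueueSketch();
        txn.addInTransaction("send-email");
        txn.rollback();
        System.out.println(txn.queuedCount()); // 0
        txn.addInTransaction("send-email");
        txn.commit();
        System.out.println(txn.queuedCount()); // 1
    }
}
```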



Configuration
Queues are configured via queue.xml. If the file is omitted, the default queue with its default configuration is used. Since pull queues are for more advanced needs, they must be configured explicitly (there is no default pull queue).


An application's queue configuration applies to all versions of the app. You can override this behavior for push queues using the target parameter in queue.xml. This is useful when you want different versions of your app (different sites) to have different queue processing configurations.


Here are some of the things you are allowed to configure (the documentation is more extensive):
  • bucket-size: how fast the queue is processed when many tasks are in the queue and the rate is high (push only). (Warning: the development server ignores this value.)
  • max-concurrent-requests: maximum number of tasks that can be executed at any given time in the specified queue (push only).
  • mode: whether it's push or pull.
  • name: queue name.
  • rate: how often tasks are processed on this queue (s=seconds, m=minutes, h=hours, d=days). If 0, the queue is considered paused. (Warning: the development server ignores this value.)
  • target: target a task to a specific backend or application version.

<queue-entries>
  <!-- Set the number of max concurrent requests to 10 -->
  <queue>
    <name>optimize-queue</name>
    <rate>20/s</rate>
    <bucket-size>40</bucket-size>
    <max-concurrent-requests>10</max-concurrent-requests>
  </queue>
</queue-entries>

Sample Code
This is a very straightforward example. As mentioned earlier, a task is essentially a request to a URL handler. In this servlet, doGet( ) enqueues a task. The task then POSTs to this same servlet, and doPost( ) carries out the work, which in this case is just a simple counter. Notice the counter is a volatile field. Each GET request to this servlet enqueues another task, so if you refresh the page while the first task is still running, you will see the counter being incremented by both tasks.

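import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;

import java.io.IOException;
import java.util.logging.Logger;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

public class TaskQInfo extends HttpServlet {

   private static final Logger log = Logger.getLogger(TaskQInfo.class.getName());

   // volatile makes updates visible across threads; note that ++ is still not atomic
   private static volatile int TASK_COUNTER = 0;

   // Executed by user menu click
   @Override
   public void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {

       // Build a task using the TaskOptions builder and add it to the default queue
       Queue queue = QueueFactory.getDefaultQueue();
       queue.add(withUrl("/taskq_demo").method(TaskOptions.Method.POST));

       resp.getWriter().println("Task has been added to the default queue...");
       resp.getWriter().println("Refresh this page to add another count task");
   }

   // Executed by TaskQueue
   @Override
   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {

       // This is the body of the task
       for (int i = 0; i < 1000; i++) {
           log.info("Processing: " + req.getHeader("X-AppEngine-TaskName") + "-" +
               TASK_COUNTER++);
           try {
               // Sleep for a second (if the rate is set to 1/s this will allow at
               // most 1 more task to be processed)
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               // Restore the interrupt flag and stop the task early
               Thread.currentThread().interrupt();
               return;
           }
       }
   }
}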

Task queues allow you to achieve some level of concurrency in your application by invoking background processes on demand. For very lengthy tasks, you might want to take a look at App Engine backends, which are basically special App Engine instances with no request time limit.

 

 

Published at DZone with permission of Luis Atencio, author and DZone MVB. (source)
