# Task Queue API in Melange

## Introduction

Google recently released a new feature for Google App Engine (GAE): the Task Queue API, which supports executing tasks in the background. Tasks are extremely useful for jobs whose completion the user does not need to be told about directly.
It is important for Melange to adopt this API, because there are many areas in which tasks would be very helpful.

Here is a brief list of examples:
  * Iteration through all entities in a data model
  * Updating entities in a data model
  * Sending email notifications to all users, students, organization admins, etc.
  * Statistics collection

Most of these examples are problematic today because they involve a large number of entities, while GAE imposes limits that have to be kept in mind: a 30-second limit for handling one request and a limit of 1000 entities. Currently these jobs are performed using, for example, /remote\_api, but it is still worth introducing tasks.



&lt;Pawel&gt;

There is a solution for iterating through more than 1000 entities that was presented at the Task Queue API Google I/O session
[(video here)](http://www.youtube.com/watch?v=o3TuRs9ANhs).

&lt;/Pawel&gt;




&lt;Daniel&gt;

Actually I meant that the 1000 entities limit was a problem before Task Queues. Now, it is true, we can have a task which starts another task that continues the job. Previously, we could have simulated tasks in a similar way with requests, if it had been possible to redirect to the same URL.

&lt;/Daniel&gt;



## Initial design

### Tasks module

A single module with simple functions for controlling tasks (it is initially placed in soc.views.helper.tasks). The most important of them are:



&lt;Pawel&gt;

Instead of putting it in soc.views.helper I would suggest putting it in soc.logic because it's actually logic that has nothing to do with view.
It doesn't even have to be in helper submodule.

&lt;/Pawel&gt;




&lt;Daniel&gt;

OK, I will change it

&lt;/Daniel&gt;



```
def startTask(url, context=None, **kwargs)
```

It adds a new task to the task queue. For now it always uses the default queue, but this can easily be changed to support other queues if Melange needs them one day.



&lt;Pawel&gt;

Yes we need to change it to support different queues, we will have more queues for sure.

&lt;/Pawel&gt;




&lt;Daniel&gt;

OK, I will do it

&lt;/Daniel&gt;



Params usage:
  * `url` - the relative worker URL for this task
  * `context` - a dictionary of params which will be passed to the request connected with the task. This data is delivered to the request/task as standard POST data, so only string arguments can be sent.
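
The string-only constraint on `context` can be illustrated with a minimal sketch. It assumes the context is form-encoded like ordinary POST data and uses the Python 3 standard library rather than the App Engine SDK; `encode_context` and the scope path value are hypothetical and not part of Melange.

```python
import json
from urllib.parse import parse_qs, urlencode


def encode_context(context):
    """Form-encode a task context; non-string values are JSON-encoded first."""
    flat = {}
    for key, value in context.items():
        flat[key] = value if isinstance(value, str) else json.dumps(value)
    return urlencode(flat)


context = {
    'logic': 'soc.logic.models.student',
    'filter': {'scope_path': 'google/gsoc2009'},  # hypothetical scope path
}
body = encode_context(context)

# On the worker side every value arrives as a plain string.
received = {key: values[0] for key, values in parse_qs(body).items()}
restored_filter = json.loads(received['filter'])
```

Anything that is not already a string, such as the filter dictionary here, has to be serialized by the caller and parsed again by the worker.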

```
def terminateTask()
def repeatTask()
```

These functions should be called from the actual worker functions in order to terminate or repeat the task, respectively. It helps to think of task execution as very similar to a standard request. When a queue chooses a task to execute, it sends an HTTP request to the given URL and waits for an HTTP response. The task is removed from the queue if and only if it sends an OK HTTP response (status 200). Otherwise the task is re-added to the queue and rescheduled. Therefore, the only way to permanently get rid of a task is to return an OK HTTP response.
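
The retry rule described above can be sketched with a toy queue. This is only an illustration: `run_queue` is a hypothetical stand-in for GAE's scheduler, and the 500 status used for repeating is an assumption, since this page does not show what `repeatTask` actually returns.

```python
from collections import deque

HTTP_OK = 200
HTTP_ERROR = 500  # assumption: any non-200 status makes the queue retry


def terminate_task():
    """Status a worker returns to have the task removed from the queue."""
    return HTTP_OK


def repeat_task():
    """Status a worker returns to have the task rescheduled."""
    return HTTP_ERROR


def run_queue(queue, worker, max_attempts=10):
    """Toy scheduler: re-add a task until the worker answers with 200."""
    attempts = 0
    while queue and attempts < max_attempts:
        task = queue.popleft()
        attempts += 1
        if worker(task) != HTTP_OK:
            queue.append(task)  # not acknowledged, so it is rescheduled
    return attempts


calls = []


def flaky_worker(task):
    """Fail on the first two calls, then succeed."""
    calls.append(task)
    return repeat_task() if len(calls) < 3 else terminate_task()


queue = deque(['send-mail'])
attempts = run_queue(queue, flaky_worker)
```

The `max_attempts` guard exists only to keep the toy loop finite; a worker that never returns 200 would otherwise be retried forever, which is the pitfall discussed at the end of this page.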

### Tasks decorators

Now let us say we want to write code for a new task. We could of course write all the code from scratch for each task, but to reduce duplication we can use decorators. There are currently two types of decorators defined in the views.helper.decorators module.



&lt;Pawel&gt;

Maybe we should put all the tasks in soc.tasks instead of soc.views to differentiate them. I know that tasks call normal URLs, so we need to register those URLs, but I think that from a logical point of view it would be better to have tasks in a separate module.

&lt;/Pawel&gt;




&lt;Daniel&gt;

You are definitely right! It would make a lot of sense to have a separate module just for tasks, but it will require a lot of work, so I will try to create a new module when I have more time. For example, as you mention in one of your next comments, we need to check access before a task is actually started to prevent random people from running them. As long as we have tasks in the soc.views module, it is not a problem, because we can use standard methods. There is no point in duplicating and re-writing the code in a soc.tasks module; it has to be done much more wisely :)

&lt;/Daniel&gt;



#### Standard task decorator

```
def task(func)
```

It is the simplest decorator: it just adds error handling to a standard task function. Whenever the function raises FatalTaskError, the task is terminated. For all other types of errors, the task is re-added to its task queue.
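
A minimal sketch of such a decorator is shown below, under stated assumptions: it returns bare status integers instead of Django response objects, defines its own `FatalTaskError`, and uses 500 as the "repeat" status. The actual Melange decorator may differ in all of these details.

```python
import functools


class FatalTaskError(Exception):
    """Raised by a worker when retrying the task cannot help."""


def task(func):
    """Translate worker exceptions into the status the queue should see."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except FatalTaskError:
            return 200  # terminate: acknowledge so the task is dropped
        except Exception:
            return 500  # repeat: a non-200 status reschedules the task
        return 200
    return wrapper


@task
def send_report(data):
    """Hypothetical worker used only to exercise the decorator."""
    if data is None:
        raise FatalTaskError('nothing to send')  # retrying cannot help
    if not data:
        raise IOError('backend busy')  # transient, worth retrying
```

With this sketch, `send_report(None)` and `send_report([1])` both yield 200 (the task leaves the queue), while `send_report([])` yields 500 and would be retried.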

#### Iterative task decorator

As mentioned above, many tasks will require iteration through all entities for a specified model. Therefore, there is another decorator:

```
def iterative_task(func)
```

It is a wrapper that performs some operation on all entities in a model (or only on the entities selected by a filter).
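
The batching idea behind this wrapper can be sketched as follows. The `getOneBatchOfData(filter, order, next_key)` call mirrors the one quoted in the discussion on this page; everything else (`ListLogic`, the `start_task` callback, integer keys, the wrapper's signature) is hypothetical and chosen only so the example runs without GAE.

```python
import functools


class ListLogic(object):
    """Hypothetical in-memory stand-in for a Melange logic module."""

    def __init__(self, items, batch_size=2):
        self.items = items
        self.batch_size = batch_size

    def getOneBatchOfData(self, filter, order, next_key):
        start = next_key or 0
        batch = self.items[start:start + self.batch_size]
        end = start + len(batch)
        # Return None as the key once the data is exhausted.
        return batch, (end if end < len(self.items) else None)


def iterative_task(func):
    """Fetch one batch, hand it to func, then schedule the continuation."""
    @functools.wraps(func)
    def wrapper(logic, start_task, filter=None, order=None, next_key=None):
        entities, next_key = logic.getOneBatchOfData(filter, order, next_key)
        func(entities=entities)
        if next_key is not None:
            # More data left: enqueue a follow-up task that resumes here.
            start_task(next_key)
        return 200
    return wrapper


processed = []


@iterative_task
def collect(entities):
    processed.extend(entities)


pending = [None]  # toy queue of start keys; None means "from the beginning"
logic = ListLogic(['a', 'b', 'c', 'd', 'e'])
while pending:
    collect(logic, pending.append, next_key=pending.pop())
```

Each invocation handles a single batch, so no individual request has to touch the whole data set; the chain of follow-up tasks does that collectively.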

### Example

The easiest way to understand how this wrapper works, and how it can be applied to real jobs, is to consider an example: we want to send an email notification with some important information to all students of a specified program. Until now Melange has used remote\_api for this, but with the Task Queue API it may become easier.

First of all we need to have an actual function for handling requests that send email notifications to the students.

```
def emailAllTask(self, request, access_type, page_name=None,
                 params=None, **kwargs):
  # The iterative_task decorator is expected to supply the current batch.
  if 'entities' not in kwargs:
    raise tasks.FatalTaskError("No entities applied")

  entities = kwargs['entities']
  for entity in entities:
    # Do actual stuff here
    pass
```

This function does not have to return any value, because it is wrapped in iterative\_task, which is responsible for returning an HttpResponse object.

The most important thing, however, is that the function can assume kwargs contains a value for entities. The developer does not have to worry about how to retrieve the data, only about what really has to be done.



&lt;Pawel&gt;

Where are we getting the entities ?

&lt;/Pawel&gt;




&lt;Daniel&gt;


The entities are retrieved by the decorator, which is why it is transparent to the developer, who can focus on the actual task. If you take a look at the decorator code, you will see:
`entities, next_key = logic.getOneBatchOfData(filter, order, next_key)`
That is why we have to pass logic to the decorator, which I do not like, because the only way I know to pass the logic is by name and then import it. Another way would be to import all logics in the decorators.py file and pass only the logic name to the task. Maybe there is a better way of doing it?


&lt;/Daniel&gt;





&lt;Pawel&gt;


I would suggest renaming getOneBatchOfData to getBatchOfData or getDataInBatch. Also, the next\_key parameter should be renamed to start\_key, because it is actually the key where you start, and next\_key is returned by the method.


&lt;/Pawel&gt;



In order to use this function, it has to be decorated as an iterative task:

```
email_all_task = decorators.iterative_task(view.emailAllTask)
```

Last but not least, the desired URL has to be mapped to email\_all\_task by adding a new Django pattern:

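```
# access_type is email_all_task so that the pattern matches the worker URL
# that the task starter builds.
patterns += [
    (r'^%(url_name)s/(?P<access_type>email_all_task)/%(key_fields)s$',
     'soc.views.models.%(module_name)s.email_all_task',
     "Email all students")]
```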




&lt;Pawel&gt;

We need to remember that tasks urls need access checks too, we don't want random people to call the url and execute whatever tasks would execute.

&lt;/Pawel&gt;




&lt;Daniel&gt;

Access can now be checked in the same way as we check access for standard URLs.

&lt;/Daniel&gt;



That is basically all there is to adding a new task, but one more thing has to be done: the task has to be started somehow.

So there is another function, which is responsible for setting up all the params and starting a new task.

```
def emailAllTaskStarter(self, request, access_type, page_name=None,
                        params=None, **kwargs):

  logic = params['logic']
  link_id = kwargs['link_id']
  scope_path = kwargs['scope_path']

  # Build the worker URL from the key fields of the current program.
  fields = {
      'url_name': params['url_name'],
      'scope_path': scope_path,
      'link_id': link_id
      }

  url = '/%(url_name)s/email_all_task/%(scope_path)s/%(link_id)s' % fields

  # Students are filtered by scope_path, built here from the program's
  # key fields.
  program = logic.getFromKeyFields(kwargs)
  scope_path = program.scope_path + '/' + program.link_id

  # Context values travel as POST data, so they must all be strings.
  context = {
      'filter': simplejson.dumps({'scope_path': scope_path}),
      'logic': 'soc.logic.models.student'
      }

  tasks.startTask(url=url, context=context)

  # The task runs in the background; send the user to the public view.
  return self.public(request, access_type, page_name=page_name,
                     params=params, **kwargs)
```



&lt;Pawel&gt;

It would be nice if you could provide an example that iterates through more than 1000 entities, based on the Task Queue API example in the video.

&lt;/Pawel&gt;




&lt;Daniel&gt;

Actually the example uses this method (which I used before watching Google I/O video:) - it is all done in decorator.


&lt;/Daniel&gt;



There are four things worth mentioning:
  1. The final URL is generated by applying concrete values to the URL template.
  1. The task will be executed by GAE in the background, so we still need to redirect the user to another view.
  1. We provide the 'logic' parameter by name. This is one of the disadvantages of the iterative\_task wrapper. If anyone has an idea how to fix it, it would be great!
  1. We provide the 'filter' parameter as a JSON object.

The last two points result from the fact that the context is passed to a newly created task as POST arguments, so all values are mapped to strings.
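
On the worker side, the string values have to be turned back into usable objects. A minimal sketch, assuming the logic is referenced by its dotted module name and the filter is JSON; `decode_context` is a hypothetical helper, and the standard `json` module stands in for a Melange logic module, which is only importable inside Melange.

```python
import importlib
import json


def decode_context(post):
    """Rebuild the logic module and the filter dict from string POST values."""
    logic = importlib.import_module(post['logic'])
    filter = json.loads(post['filter'])
    return logic, filter


# 'json' stands in for a module path such as 'soc.logic.models.student';
# the scope path is a made-up example value.
post = {'logic': 'json', 'filter': '{"scope_path": "google/gsoc2009"}'}
logic, fields = decode_context(post)
```

Because the module name arrives in request data, a real worker would probably want to check it against a list of known logic modules before importing it.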

The very last step is to map emailAllTaskStarter to another URL.

```
patterns += [
    (r'^%(url_name)s/(?P<access_type>email_all_task_starter)/%(key_fields)s$',
     'soc.views.models.%(module_name)s.email_all_task_starter',
     "Email all students")]

email_all_task_starter = decorators.view(view.emailAllTaskStarter)
```

## Things to remember while playing with Task Queue API
  * A single task has the same time limit (30 seconds) as a standard request. When the limit is exceeded, GAE raises a catchable DeadlineExceededError. After receiving this error, the application has about 1 second to finish all work related to the task (request) and return a response. Otherwise an uncatchable error is raised and the application fails.
  * Be careful when adding new tasks to the queue: tasks are meant to be completed sooner or later, so a task is always re-added to its queue unless it returns a status 200 response. In particular, if there is a bug in the source code, the task will keep being re-added to the queue even though it has no chance of completing. The problem is that GAE currently provides an API only for adding new tasks; there is no way to manage existing ones. After some time the uncompleted tasks do disappear from the queue, but no documentation on this was found. (Does anyone know something?)



&lt;Pawel&gt;

No I don't think there is any documentation for that yet, since Task Queue API is still an App Engine Labs API.

&lt;/Pawel&gt;



  * Tasks can also be tested locally with the SDK. They have to be run manually from the Admin Console.



&lt;Pawel&gt;

You are wrong :-) but you already know that (discussion on IRC). The Tasks can be manually run from Admin Console locally. It has even more options, because tasks may be removed from queues.

&lt;/Pawel&gt;




&lt;Daniel&gt;

Updated :-)

&lt;/Daniel&gt;



  * The Task Queue API is still under development, so there may be some changes when it is finally released.
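
The first point above, reacting to the deadline within the remaining second, can be sketched as follows. The exception class defined here is only a stand-in so the example runs outside GAE (on App Engine the error comes from `google.appengine.runtime`); `process_batch`, `handle` and `save_progress` are hypothetical names, and 500 is again an assumed "retry" status.

```python
class DeadlineExceededError(Exception):
    """Stand-in for the GAE runtime error so the sketch runs anywhere."""


def process_batch(entities, handle, save_progress):
    """Process entities; on a deadline, save progress and ask for a retry."""
    done = 0
    try:
        for entity in entities:
            handle(entity)
            done += 1
    except DeadlineExceededError:
        # Very little time remains: record where to resume and return quickly.
        save_progress(done)
        return 500  # non-200, so the queue reschedules the task
    return 200


saved = []


def handle(entity):
    """Hypothetical handler that hits the deadline on the third entity."""
    if entity == 'c':
        raise DeadlineExceededError()


status = process_batch(['a', 'b', 'c', 'd'], handle, saved.append)
```

The key point is that the handler does only cheap bookkeeping after the error, so that a response can still be returned before the uncatchable failure.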

## Resources
  * [Google App Engine page](http://code.google.com/appengine/docs/python/taskqueue/overview.html)
  * [Google App Engine Blog](http://googleappengine.blogspot.com/2009/06/new-task-queue-api-on-google-app-engine.html)
  * [Set of simple demos](http://googleappengine.googlecode.com/svn/trunk/python/demos/taskqueue_examples/)