Google recently released a new feature for Google App Engine: the Task Queue API, which provides support for executing tasks in the background. Tasks can be extremely useful for jobs whose completion the user does not have to be informed about directly. It is important for Melange to adopt this API, because there are many areas in which tasks would be very helpful.
Here is a brief list of examples:
Most of these examples are problematic now, because they require dealing with a large number of entities, while we have to keep in mind the GAE limitations: a 30-second limit for handling one request and a limit of 1000 entities. Currently, these jobs are performed using, for example, /remote_api, but it is still worth introducing Tasks.
<Pawel>
A way to iterate through more than 1000 entities was presented at the Task Queue API session at Google I/O (video here).
</Pawel>
<Daniel>
Actually, I meant that the 1000-entity limit was a problem before Task Queues. Now, it is true, we can have a task that starts another task which continues the job. Previously, we could have simulated tasks in a similar way with requests, if it had been possible to redirect to the same URL.
</Daniel>
There is one module with simple functions for controlling tasks (it is initially placed in soc.views.helper.tasks). The most important of them are:
<Pawel>
Instead of putting it in soc.views.helper, I would suggest putting it in soc.logic, because it's actually logic that has nothing to do with views. It doesn't even have to be in a helper submodule.
</Pawel>
<Daniel>
OK, I will change it
</Daniel>
def startTask(url, context=None, **kwargs)
It adds a new task to the task queue (for now it always uses the default queue, but this can easily be changed to support other queues, if Melange needs them one day).
<Pawel>
Yes, we need to change it to support different queues; we will have more queues for sure.
</Pawel>
<Daniel>
OK, I will do it
</Daniel>
Params usage:
- url - the relative worker URL for this Task
- context - a dictionary of params which will be passed to the request connected with the task. It is important to remember that this data is delivered to the request/task as standard POST data, so we can only send string arguments.
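Because the context travels as POST data, any structured value has to be turned into a string before the task is started and parsed again inside the worker. The following is a minimal sketch of that round trip, assuming JSON as the serialization format. The helpers encode_context and decode_param are hypothetical names used only for illustration, and the scope path is a made-up value.

```python
import json


def encode_context(context):
    """Return a copy of context in which every value is a string.

    Hypothetical helper: strings pass through unchanged, anything else
    is serialized with JSON so that it survives the trip as POST data.
    """
    encoded = {}
    for key, value in (context or {}).items():
        if isinstance(value, str):
            encoded[key] = value
        else:
            encoded[key] = json.dumps(value)
    return encoded


def decode_param(post_data, key):
    """Parse one JSON-encoded parameter back inside the worker."""
    return json.loads(post_data[key])


context = encode_context({
    'filter': {'scope_path': 'google/gsoc2009'},  # structured, made-up value
    'logic': 'soc.logic.models.student',          # already a string
})
```

The worker has to know which parameters were JSON-encoded (here only 'filter'), since plain strings such as 'logic' are sent unchanged.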
def terminateTask()
def repeatTask()
These functions should be called from the actual worker functions in order to terminate and repeat the task, respectively. It helps to think of task execution as very similar to a standard request. When a queue chooses a task to be executed, it sends an HTTP request to the given URL and waits for an HTTP response. The task is removed from the queue if and only if it sends an OK HTTP response (status 200). Otherwise the task is re-added to the queue and rescheduled. Therefore, the only way to permanently get rid of a task is to return an OK HTTP response.
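The retry rule described above can be illustrated with a toy model. This is only a sketch of the behaviour as described here, not App Engine's real scheduler: run_queue, the integer status codes standing in for HTTP responses, and the choice of 500 for "repeat" are all assumptions made for illustration.

```python
from collections import deque

OK = 200      # status that removes the task from the queue
RETRY = 500   # assumed non-OK status; makes the queue retry the task


def terminate_task():
    """Stand-in for terminateTask(): answer OK so the task is dropped."""
    return OK


def repeat_task():
    """Stand-in for repeatTask(): answer non-OK so the task is retried."""
    return RETRY


def run_queue(task_names, worker, max_runs=20):
    """Toy queue: run tasks until each has answered OK (or max_runs is hit).

    Returns the list of (task_name, status) pairs in execution order.
    """
    queue = deque(task_names)
    history = []
    while queue and len(history) < max_runs:
        name = queue.popleft()
        status = worker(name)
        history.append((name, status))
        if status != OK:
            queue.append(name)  # re-added to the queue and rescheduled
    return history


attempts = {}


def flaky_worker(name):
    """Fails on the first attempt for each task, succeeds on the second."""
    attempts[name] = attempts.get(name, 0) + 1
    return repeat_task() if attempts[name] == 1 else terminate_task()


history = run_queue(['a', 'b'], flaky_worker)
```

In this model each task runs twice: the first non-OK answer puts it back at the end of the queue, and only the OK answer removes it for good.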
Now let us say we want to write code for a new task. Of course we could write the entire code for each task from scratch, but to reduce the amount of duplicated code we can use decorators. There are currently two decorators defined in the views.helpers.decorators module.
<Pawel>
Maybe we should put all the tasks in soc.tasks instead of soc.views to differentiate them. I know that tasks call normal URLs, so we need to register those URLs, but I think that from a logical point of view it would be better to have tasks in a separate module.
</Pawel>
<Daniel>
You are definitely right! It would make a lot of sense to have a separate module just for tasks, but it will require a lot of work, so I will try to create a new module when I have more time. For example, as you mention in one of your next comments, we need to check access before a task is actually started, to prevent random people from running them. As long as we have tasks in the soc.views module, this is not a problem, because we can use the standard methods. There is no point in duplicating code by rewriting it in a soc.tasks module; it has to be done much more wisely :)
</Daniel>
def task(func)
It is the simplest decorator and it just adds error handling for a standard task function. Whenever the function raises FatalTaskError, the task is terminated. For all other types of errors, the task is re-added to its task queue.
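A rough sketch of this error handling could look as follows. The names task and FatalTaskError come from the description above, but the body is an assumption rather than the actual Melange code: in particular, it returns plain integer status codes (200 to terminate, an assumed 500 to repeat) instead of real HTTP response objects, so that it runs without Django.

```python
import functools


class FatalTaskError(Exception):
    """Raised by a task function when retrying would be pointless."""


def task(func):
    """Sketch of the error handling; returns a status code, not a response."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except FatalTaskError:
            return 200  # terminate: an OK answer removes the task
        except Exception:
            return 500  # assumed non-OK status: the queue retries the task
        return 200      # finished normally, nothing left to do
    return wrapper


@task
def good():
    pass


@task
def fatal():
    raise FatalTaskError('No entities applied')


@task
def transient():
    raise ValueError('temporary problem')
```

Note that a fatal error and a normal finish both end with an OK answer; what distinguishes them is only that the fatal case gives up without having done the work.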
As mentioned above, many tasks will require iteration through all entities for a specified model. Therefore, there is another decorator:
def iterative_task(func)
It is a wrapper that performs some operation for all entities in a model (or just for some entities specified by filter).
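The idea behind such a wrapper can be sketched as follows: each task run fetches one batch, hands it to the wrapped function, and starts a follow-up task if more data is left. This is a simplified illustration under several assumptions, not the Melange decorator itself: FakeLogic is an in-memory stand-in that ignores filter and order, the decorator receives the logic and a start_task callable directly instead of reading them from the request, and a plain list plays the role of the task queue.

```python
import functools

BATCH_SIZE = 2  # tiny on purpose, so the example needs several runs


class FakeLogic:
    """In-memory stand-in for a logic object (assumption for this sketch)."""

    def __init__(self, entities):
        self._entities = sorted(entities)

    def getOneBatchOfData(self, filter, order, next_key):
        """Return (batch, next_key); next_key is None after the last batch."""
        start = 0 if next_key is None else self._entities.index(next_key)
        batch = self._entities[start:start + BATCH_SIZE]
        rest = self._entities[start + BATCH_SIZE:]
        return batch, (rest[0] if rest else None)


def iterative_task(logic, start_task):
    """Build a decorator that feeds one batch per task run to func."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(next_key=None, **kwargs):
            entities, new_key = logic.getOneBatchOfData(None, None, next_key)
            func(entities=entities, **kwargs)
            if new_key is not None:
                # More data left: enqueue a follow-up task for the next batch.
                start_task(next_key=new_key)
            return 200  # this run is finished either way
        return wrapper
    return decorator


processed = []
pending = [{}]  # stands in for the task queue; the first run has no key


@iterative_task(FakeLogic(['s1', 's2', 's3', 's4', 's5']),
                lambda **params: pending.append(params))
def collect(entities):
    processed.extend(entities)


runs = 0
while pending:
    collect(**pending.pop(0))
    runs += 1
```

Because every run handles only one batch and then returns, no single request has to touch all entities, which is how the per-request limits are avoided.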
To understand how this wrapper works and how it may be applied to real jobs, let us consider the following example: we want to send all students of a specified program an email notification with some important information. Until now, Melange has been using remote_api for this, but thanks to the Task Queue API it may become easier.
First of all, we need an actual function for handling the requests that send email notifications to the students:
def emailAllTask(self, request, access_type, page_name=None, params=None, **kwargs):
  # The iterative_task decorator is expected to supply the entities.
  if 'entities' not in kwargs:
    raise tasks.FatalTaskError("No entities applied")

  entities = kwargs['entities']
  for entity in entities:
    # Do actual stuff here
    pass
This function does not have to return any value, because it is wrapped in iterative_task, which is responsible for returning an HttpResponse object.
The most important thing, however, is that one can assume that kwargs contains a value for entities. The developer therefore does not have to worry about how to retrieve the data, only about what really has to be done.
<Pawel>
Where are we getting the entities?
</Pawel>
<Daniel>
The entities are retrieved by the decorator, which is why this is transparent to the developer, who can focus on the actual task. If you take a look at the decorator code, you will see:
entities, next_key = logic.getOneBatchOfData(filter, order, next_key)
That is why we have to pass the logic to the decorator, which I do not like, because the only way I know to pass the logic is by name and then import it by that name. Another way would be to import all logics in the decorators.py file and pass only the logic name to the task. Maybe there is a better way of doing it?
</Daniel>
<Pawel>
I would suggest renaming getOneBatchOfData to getBatchOfData or getDataInBatch. Also, the next_key parameter should be renamed to start_key, because it is actually the key where you start, and next_key is returned by the method.
</Pawel>
In order to use emailAllTask, it has to be decorated as an iterative task:
email_all_task = decorators.iterative_task(view.emailAllTask)
Last but not least, the desired URL has to be mapped to email_all_task by adding a new Django pattern:
patterns += [(r'^%(url_name)s/(?P<access_type>email_all_task)/%(key_fields)s$',
    'soc.views.models.%(module_name)s.email_all_task',
    "Email all students")]
<Pawel>
We need to remember that task URLs need access checks too; we don't want random people to call the URL and execute whatever the task would execute.
</Pawel>
<Daniel>
Access can be checked now in the same way as we check access for standard URLs.
</Daniel>
That is basically all there is to adding a new task, but one more thing has to be done: the task has to be started somehow.
So there is another function which is responsible for setting up all params and starting a new task:
def emailAllTaskStarter(self, request, access_type, page_name=None, params=None, **kwargs):
  logic = params['logic']
  link_id = kwargs['link_id']
  scope_path = kwargs['scope_path']

  # Build the relative URL of the worker that will handle the task.
  fields = {
      'url_name': params['url_name'],
      'scope_path': scope_path,
      'link_id': link_id
      }
  url = '/%(url_name)s/email_all_task/%(scope_path)s/%(link_id)s' % fields

  # Students of a program have the program's full key as their scope path.
  program = logic.getFromKeyFields(kwargs)
  scope_path = program.scope_path + '/' + program.link_id

  # Only strings can be sent, so the filter is serialized as JSON.
  context = {
      'filter': simplejson.dumps({'scope_path': scope_path}),
      'logic': 'soc.logic.models.student'
      }
  task = tasks.startTask(url=url, context=context)

  return self.public(request, access_type, page_name=page_name, params=params, **kwargs)
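Since the context passes the logic as a dotted module name, the worker side has to turn that string back into a module. A minimal sketch of this step in current Python, assuming the standard importlib module is acceptable; load_logic is a hypothetical helper, and a standard-library module name is used in place of soc.logic.models.student so that the example runs on its own.

```python
import importlib


def load_logic(dotted_name):
    """Import and return the module named by a dotted path string."""
    return importlib.import_module(dotted_name)


# 'json.decoder' only stands in for a name such as
# 'soc.logic.models.student', which is not importable outside Melange.
logic_module = load_logic('json.decoder')
```

Importing by name keeps the task parameters string-only, at the cost of failing only at run time if the name is misspelled.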
<Pawel>
It would be nice if you could provide an example that iterates through more than 1000 entities, based on the Task Queue API example in the video.
</Pawel>
<Daniel>
Actually, the example uses this method (which I used before watching the Google I/O video :) - it is all done in the decorator.
</Daniel>
There are four things that are worth mentioning:
The very last thing that has to be done is to map emailAllTaskStarter to another URL:
patterns += [(r'^%(url_name)s/(?P<access_type>email_all_task_starter)/%(key_fields)s$',
    'soc.views.models.%(module_name)s.email_all_task_starter',
    "Email all students")]

email_all_task_starter = decorators.view(view.emailAllTaskStarter)
<Pawel>
No, I don't think there is any documentation for that yet, since the Task Queue API is still an App Engine Labs API.
</Pawel>
<Pawel>
You are wrong :-) but you already know that (discussion on IRC). Tasks can be run manually from the Admin Console locally. It offers even more options, because tasks can also be removed from queues.
</Pawel>
<Daniel>
Updated :-)
</Daniel>