#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for use with the mapreduce library."""
__all__ = [
"create_datastore_write_config",
"for_name",
"get_short_name",
"handler_for_name",
"is_generator",
"parse_bool",
"HugeTask",
"HugeTaskHandler",
]
import base64
import cgi
import inspect
import logging
import types
import urllib
import zlib
from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.ext import db
from google.appengine.datastore import datastore_rpc
from mapreduce import base_handler
def for_name(fq_name, recursive=False):
"""Find class/function/method specified by its fully qualified name.
A fully qualified name can be specified as:
* <module_name>.<class_name>
* <module_name>.<function_name>
* <module_name>.<class_name>.<method_name> (an unbound method will be
returned in this case).
  for_name works by doing __import__ of <module_name> and looking up
  <class_name>/<function_name> in the module's __dict__/attrs. If the fully
  qualified name doesn't contain '.', the current module is used.
Args:
    fq_name: fully qualified name of the object to find.
  Returns:
    The class, function, or unbound method object specified by fq_name.
Raises:
ImportError: when specified module could not be loaded or the class
was not found in the module.
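  Example (illustrative; assumes a hypothetical module "foo" defining a class
  "Bar" with a method "baz"):
    for_name("foo.Bar")      # -> the class object foo.Bar
    for_name("foo.Bar.baz")  # -> the unbound method foo.Bar.baz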
"""
# if "." not in fq_name:
# raise ImportError("'%s' is not a full-qualified name" % fq_name)
fq_name = str(fq_name)
module_name = __name__
short_name = fq_name
if fq_name.rfind(".") >= 0:
(module_name, short_name) = (fq_name[:fq_name.rfind(".")],
fq_name[fq_name.rfind(".") + 1:])
try:
result = __import__(module_name, None, None, [short_name])
return result.__dict__[short_name]
except KeyError:
# If we're recursively inside a for_name() chain, then we want to raise
# this error as a key error so we can report the actual source of the
# problem. If we're *not* recursively being called, that means the
# module was found and the specific item could not be loaded, and thus
# we want to raise an ImportError directly.
if recursive:
raise
else:
raise ImportError("Could not find '%s' on path '%s'" % (
short_name, module_name))
except ImportError, e:
logging.debug("Could not import %s from %s. Will try recursively.",
short_name, module_name, exc_info=True)
    # module_name is not actually a module. Try for_name on it to figure
    # out what it is.
try:
module = for_name(module_name, recursive=True)
if hasattr(module, short_name):
return getattr(module, short_name)
else:
# The module was found, but the function component is missing.
raise KeyError()
except KeyError:
raise ImportError("Could not find '%s' on path '%s'" % (
short_name, module_name))
except ImportError:
# This means recursive import attempts failed, thus we will raise the
# first ImportError we encountered, since it's likely the most accurate.
pass
# Raise the original import error that caused all of this, since it is
# likely the real cause of the overall problem.
raise
def handler_for_name(fq_name):
"""Resolves and instantiates handler by fully qualified name.
  First resolves the name using a for_name call. If the name resolves to a
  class, instantiates that class; if it resolves to a method, instantiates the
  class and binds the method to the instance.
Args:
fq_name: fully qualified name of something to find.
Returns:
handler instance which is ready to be called.
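  Example (illustrative; assumes the same hypothetical module "foo"):
    handler_for_name("foo.Bar")      # -> a new Bar() instance
    handler_for_name("foo.Bar.baz")  # -> baz bound to a fresh Bar() instance
    handler_for_name("foo.some_fn")  # -> the function object itself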
"""
resolved_name = for_name(fq_name)
if isinstance(resolved_name, (type, types.ClassType)):
# create new instance if this is type
return resolved_name()
elif isinstance(resolved_name, types.MethodType):
# bind the method
return getattr(resolved_name.im_class(), resolved_name.__name__)
else:
return resolved_name
def is_generator(obj):
"""Return true if the object is generator or generator function.
Generator function objects provides same attributes as functions.
See isfunction.__doc__ for attributes listing.
Adapted from Python 2.6.
Args:
obj: an object to test.
Returns:
    True if the object is a generator or a generator function/method.
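  Example (illustrative):
    def gen():
      yield 1
    is_generator(gen)    # True: generator function
    is_generator(gen())  # True: generator object
    is_generator(len)    # False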
"""
if isinstance(obj, types.GeneratorType):
return True
CO_GENERATOR = 0x20
return bool(((inspect.isfunction(obj) or inspect.ismethod(obj)) and
obj.func_code.co_flags & CO_GENERATOR))
def get_short_name(fq_name):
"""Returns the last component of the name."""
  return fq_name.split(".")[-1]
def parse_bool(obj):
"""Return true if the object represents a truth value, false otherwise.
For bool and numeric objects, uses Python's built-in bool function. For
str objects, checks string against a list of possible truth values.
Args:
    obj: object to determine the boolean value of.
Returns:
    Boolean value according to section 5.1 of the Python docs
    (http://docs.python.org/library/stdtypes.html) if the object is not a
    str. For str objects, True if the string is in TRUTH_VALUE_SET and
    False otherwise.
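  Example (illustrative):
    parse_bool("True")   # True
    parse_bool("false")  # False: "false" is not in TRUTH_VALUE_SET
    parse_bool(0)        # False: falls through to bool()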
"""
if type(obj) is str:
TRUTH_VALUE_SET = ["true", "1", "yes", "t", "on"]
return obj.lower() in TRUTH_VALUE_SET
else:
return bool(obj)
def create_datastore_write_config(mapreduce_spec):
"""Creates datastore config to use in write operations.
Args:
mapreduce_spec: current mapreduce specification as MapreduceSpec.
Returns:
an instance of datastore_rpc.Configuration to use for all write
operations in the mapreduce.
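  For example (illustrative), a spec whose params contain
  {"force_writes": "true"} yields Configuration(force_writes=True); any other
  value yields a default Configuration.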
"""
force_writes = parse_bool(mapreduce_spec.params.get("force_writes", "false"))
if force_writes:
return datastore_rpc.Configuration(force_writes=force_writes)
else:
# dev server doesn't support force_writes.
return datastore_rpc.Configuration()
class _HugeTaskPayload(db.Model):
"""Model object to store task payload."""
payload = db.TextProperty()
@classmethod
def kind(cls):
"""Returns entity kind."""
return "_AE_MR_TaskPayload"
class HugeTask(object):
"""HugeTask is a taskqueue.Task-like class that can store big payloads.
Payloads are stored either in the task payload itself or in the datastore.
  Task handlers should inherit from the HugeTaskHandler class.
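  Illustrative usage (names are hypothetical):
    task = HugeTask(url="/mapreduce/worker",
                    params={"state": serialized_state})
    task.add("default", parent=parent_entity)
  Small payloads travel inside the task itself, larger ones are compressed,
  and the largest are stored in a _HugeTaskPayload entity under the given
  parent.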
"""
PAYLOAD_PARAM = "__payload"
PAYLOAD_KEY_PARAM = "__payload_key"
  # Maximum payload size, in bytes, to store directly in the task.
  MAX_TASK_PAYLOAD = 100000
  # Maximum payload size, in bytes, to store in a single datastore entity.
  MAX_DB_PAYLOAD = 1000000
def __init__(self,
url,
params,
name=None,
eta=None,
countdown=None):
self.url = url
self.params = params
self.name = name
self.eta = eta
self.countdown = countdown
def add(self, queue_name, transactional=False, parent=None):
"""Add task to the queue."""
payload_str = urllib.urlencode(self.params)
if len(payload_str) < self.MAX_TASK_PAYLOAD:
# Payload is small. Don't bother with anything.
task = self.to_task()
task.add(queue_name, transactional)
return
compressed_payload = base64.b64encode(zlib.compress(payload_str))
if len(compressed_payload) < self.MAX_TASK_PAYLOAD:
# Compressed payload is small. Don't bother with datastore.
task = taskqueue.Task(
url=self.url,
params={self.PAYLOAD_PARAM: compressed_payload},
name=self.name,
eta=self.eta,
countdown=self.countdown)
task.add(queue_name, transactional)
return
if len(compressed_payload) > self.MAX_DB_PAYLOAD:
raise Exception("Payload to big to be stored in database: %s",
len(compressed_payload))
# Store payload in the datastore.
if not parent:
raise Exception("Huge tasks should specify parent entity.")
payload_entity = _HugeTaskPayload(payload=compressed_payload,
parent=parent)
payload_key = payload_entity.put()
task = taskqueue.Task(
url=self.url,
params={self.PAYLOAD_KEY_PARAM: str(payload_key)},
name=self.name,
eta=self.eta,
countdown=self.countdown)
task.add(queue_name, transactional)
def to_task(self):
"""Convert to a taskqueue task without doing any kind of encoding."""
return taskqueue.Task(
url=self.url,
params=self.params,
name=self.name,
eta=self.eta,
countdown=self.countdown)
@classmethod
def decode_payload(cls, payload_dict):
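    """Decodes a payload dict produced by HugeTask.add.
    If neither payload parameter is present, payload_dict is returned as-is.
    Otherwise the compressed payload is taken from the request parameter or
    fetched from its _HugeTaskPayload entity, decompressed, and parsed back
    into a dict of parameters.
    """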
if (not payload_dict.get(cls.PAYLOAD_PARAM) and
not payload_dict.get(cls.PAYLOAD_KEY_PARAM)):
return payload_dict
if payload_dict.get(cls.PAYLOAD_PARAM):
payload = payload_dict.get(cls.PAYLOAD_PARAM)
else:
payload_key = payload_dict.get(cls.PAYLOAD_KEY_PARAM)
payload_entity = _HugeTaskPayload.get(payload_key)
payload = payload_entity.payload
payload_str = zlib.decompress(base64.b64decode(payload))
result = {}
for (name, value) in cgi.parse_qs(payload_str).items():
if len(value) == 1:
result[name] = value[0]
else:
result[name] = value
return result
class HugeTaskHandler(base_handler.TaskQueueHandler):
"""Base handler for processing HugeTasks."""
class RequestWrapper(object):
def __init__(self, request):
self._request = request
self.path = self._request.path
self.headers = self._request.headers
      self._encoded = True  # Assume the payload is encoded until checked below.
if (not self._request.get(HugeTask.PAYLOAD_PARAM) and
not self._request.get(HugeTask.PAYLOAD_KEY_PARAM)):
self._encoded = False
return
self._params = HugeTask.decode_payload(
{HugeTask.PAYLOAD_PARAM:
self._request.get(HugeTask.PAYLOAD_PARAM),
HugeTask.PAYLOAD_KEY_PARAM:
self._request.get(HugeTask.PAYLOAD_KEY_PARAM)})
def get(self, name, default=""):
if self._encoded:
return self._params.get(name, default)
else:
return self._request.get(name, default)
def set(self, name, value):
if self._encoded:
        self._params[name] = value
else:
self._request.set(name, value)
def __init__(self, *args, **kwargs):
base_handler.TaskQueueHandler.__init__(self, *args, **kwargs)
def _setup(self):
base_handler.TaskQueueHandler._setup(self)
self.request = self.RequestWrapper(self.request)