#!/usr/bin/env python
#
# Copyright 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines input readers for MapReduce."""
__all__ = [
"AbstractDatastoreInputReader",
"ALLOW_CHECKPOINT",
"BadReaderParamsError",
"BlobstoreLineInputReader",
"BlobstoreZipInputReader",
"BlobstoreZipLineInputReader",
"COUNTER_IO_READ_BYTES",
"COUNTER_IO_READ_MSEC",
"ConsistentKeyReader",
"DatastoreEntityInputReader",
"DatastoreInputReader",
"DatastoreKeyInputReader",
"Error",
"InputReader",
"LogInputReader",
"NamespaceInputReader",
"RecordsReader",
]
# pylint: disable-msg=C6409
import base64
import copy
import StringIO
import time
import zipfile
from google.net.proto import ProtocolBuffer
from google.appengine.api import datastore
from mapreduce.lib import files
from google.appengine.api import logservice
from mapreduce.lib.files import records
from google.appengine.api.logservice import log_service_pb
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_rpc
from google.appengine.ext import blobstore
from google.appengine.ext import db
from mapreduce.lib import key_range
from google.appengine.ext.db import metadata
from mapreduce import context
from mapreduce import errors
from mapreduce import model
from mapreduce import namespace_range
from mapreduce import operation
from mapreduce import util
# Classes moved to errors module. Copied here for compatibility.
Error = errors.Error
BadReaderParamsError = errors.BadReaderParamsError
# Counter name for number of bytes read.
COUNTER_IO_READ_BYTES = "io-read-bytes"
# Counter name for milliseconds spent reading data.
COUNTER_IO_READ_MSEC = "io-read-msec"
# Special value that can be yielded by InputReaders if they want to give the
# framework an opportunity to save the state of the mapreduce without having
# to yield an actual value to the handler.
ALLOW_CHECKPOINT = object()
class InputReader(model.JsonMixin):
"""Abstract base class for input readers.
InputReaders have the following properties:
* They are created by using the split_input method to generate a set of
InputReaders from a MapperSpec.
* They generate inputs to the mapper via the iterator interface.
* After creation, they can be serialized and resumed using the JsonMixin
interface.
* They are cast to string for a user-readable description; it may be
valuable to implement __str__.
"""
  # When expand_parameters is False, the value yielded by the reader is
  # passed to the handler as is. If it is True, then *value is passed,
  # expanding the arguments and letting the handler be a multi-parameter
  # function.
expand_parameters = False
# Mapreduce parameters.
_APP_PARAM = "_app"
NAMESPACE_PARAM = "namespace"
NAMESPACES_PARAM = "namespaces" # Obsolete.
def __iter__(self):
return self
def next(self):
"""Returns the next input from this input reader as a key, value pair.
Returns:
The next input from this input reader.
"""
raise NotImplementedError("next() not implemented in %s" % self.__class__)
@classmethod
def from_json(cls, input_shard_state):
"""Creates an instance of the InputReader for the given input shard state.
Args:
input_shard_state: The InputReader state as a dict-like object.
Returns:
An instance of the InputReader configured using the values of json.
"""
raise NotImplementedError("from_json() not implemented in %s" % cls)
def to_json(self):
"""Returns an input shard state for the remaining inputs.
Returns:
A json-izable version of the remaining InputReader.
"""
raise NotImplementedError("to_json() not implemented in %s" %
self.__class__)
@classmethod
def split_input(cls, mapper_spec):
"""Returns a list of input readers for the input spec.
Args:
mapper_spec: The MapperSpec for this InputReader.
Returns:
A list of InputReaders.
"""
raise NotImplementedError("split_input() not implemented in %s" % cls)
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
raise NotImplementedError("validate() not implemented in %s" % cls)
# TODO(user): This should probably be renamed something like
# "DatastoreInputReader" and DatastoreInputReader should be called
# "DatastoreModelReader".
class AbstractDatastoreInputReader(InputReader):
"""Abstract base class for classes that iterate over datastore entities.
Concrete subclasses must implement _iter_key_range(self, k_range). See the
docstring for that method for details.
"""
# Number of entities to fetch at once while doing scanning.
_BATCH_SIZE = 50
# Maximum number of shards we'll create.
_MAX_SHARD_COUNT = 256
# __scatter__ oversampling factor
_OVERSAMPLING_FACTOR = 32
# The maximum number of namespaces that will be sharded by datastore key
  # before switching to a strategy where sharding is done lexicographically by
# namespace.
MAX_NAMESPACES_FOR_KEY_SHARD = 10
# Mapreduce parameters.
ENTITY_KIND_PARAM = "entity_kind"
KEYS_ONLY_PARAM = "keys_only"
BATCH_SIZE_PARAM = "batch_size"
KEY_RANGE_PARAM = "key_range"
NAMESPACE_RANGE_PARAM = "namespace_range"
CURRENT_KEY_RANGE_PARAM = "current_key_range"
  # TODO(user): Add support for arbitrary queries. It's not possible to
  # support them without cursors, since right now a query definition can't
  # even be serialized.
def __init__(self,
entity_kind,
key_ranges=None,
ns_range=None,
batch_size=_BATCH_SIZE,
current_key_range=None):
"""Create new AbstractDatastoreInputReader object.
    This is an internal constructor. Use split_query in a concrete class
    instead.
Args:
entity_kind: entity kind as string.
key_ranges: a sequence of key_range.KeyRange instances to process. Only
one of key_ranges or ns_range can be non-None.
ns_range: a namespace_range.NamespaceRange to process. Only one of
key_ranges or ns_range can be non-None.
batch_size: size of read batch as int.
current_key_range: the current key_range.KeyRange being processed.
"""
assert key_ranges is not None or ns_range is not None, (
"must specify one of 'key_ranges' or 'ns_range'")
assert key_ranges is None or ns_range is None, (
"can't specify both 'key_ranges ' and 'ns_range'")
self._entity_kind = entity_kind
# Reverse the KeyRanges so they can be processed in order as a stack of
# work items.
self._key_ranges = key_ranges and list(reversed(key_ranges))
self._ns_range = ns_range
self._batch_size = int(batch_size)
self._current_key_range = current_key_range
def __iter__(self):
"""Iterates over the given KeyRanges or NamespaceRange.
This method iterates over the given KeyRanges or NamespaceRange and sets
the self._current_key_range to the KeyRange currently being processed. It
    then delegates to the _iter_key_range method to yield the actual
results.
Yields:
      Forwards the objects yielded by the subclass's concrete _iter_key_range()
method. The caller must consume the result yielded because self.to_json()
will not include it.
"""
if self._key_ranges is not None:
for o in self._iter_key_ranges():
yield o
elif self._ns_range is not None:
for o in self._iter_ns_range():
yield o
else:
assert False, "self._key_ranges and self._ns_range are both None"
def _iter_key_ranges(self):
"""Iterates over self._key_ranges, delegating to self._iter_key_range()."""
while True:
if self._current_key_range is None:
if self._key_ranges:
self._current_key_range = self._key_ranges.pop()
# The most recently popped key_range may be None, so continue here
          # to find the next key range that's valid.
continue
else:
break
for key, o in self._iter_key_range(
copy.deepcopy(self._current_key_range)):
# The caller must consume yielded values so advancing the KeyRange
# before yielding is safe.
self._current_key_range.advance(key)
yield o
self._current_key_range = None
def _iter_ns_range(self):
"""Iterates over self._ns_range, delegating to self._iter_key_range()."""
while True:
if self._current_key_range is None:
query = self._ns_range.make_datastore_query()
namespace_result = query.Get(1)
if not namespace_result:
break
namespace = namespace_result[0].name() or ""
self._current_key_range = key_range.KeyRange(
namespace=namespace, _app=self._ns_range.app)
yield ALLOW_CHECKPOINT
for key, o in self._iter_key_range(
copy.deepcopy(self._current_key_range)):
# The caller must consume yielded values so advancing the KeyRange
# before yielding is safe.
self._current_key_range.advance(key)
yield o
if (self._ns_range.is_single_namespace or
self._current_key_range.namespace == self._ns_range.namespace_end):
break
self._ns_range = self._ns_range.with_start_after(
self._current_key_range.namespace)
self._current_key_range = None
def _iter_key_range(self, k_range):
"""Yields a db.Key and the value that should be yielded by self.__iter__().
Args:
k_range: The key_range.KeyRange to iterate over.
Yields:
A 2-tuple containing the last db.Key processed and the value that should
be yielded by __iter__. The returned db.Key will be used to determine the
InputReader's current position in self._current_key_range.
"""
raise NotImplementedError("_iter_key_range() not implemented in %s" %
self.__class__)
def __str__(self):
"""Returns the string representation of this InputReader."""
if self._ns_range is None:
return repr(self._key_ranges)
else:
return repr(self._ns_range)
@classmethod
def _choose_split_points(cls, sorted_keys, shard_count):
"""Returns the best split points given a random set of db.Keys."""
assert len(sorted_keys) >= shard_count
index_stride = len(sorted_keys) / float(shard_count)
return [sorted_keys[int(round(index_stride * i))]
for i in range(1, shard_count)]
# TODO(user): use query splitting functionality when it becomes available
# instead.
@classmethod
def _split_input_from_namespace(cls, app, namespace, entity_kind_name,
shard_count):
"""Return KeyRange objects. Helper for _split_input_from_params.
If there are not enough Entities to make all of the given shards, the
returned list of KeyRanges will include Nones. The returned list will
contain KeyRanges ordered lexographically with any Nones appearing at the
end.
"""
raw_entity_kind = util.get_short_name(entity_kind_name)
if shard_count == 1:
      # With one shard we don't need to calculate any split points at all.
return [key_range.KeyRange(namespace=namespace, _app=app)]
    # We use datastore.Query instead of ext.db.Query here, because we can't
    # erase the ordering on a db.Query once it has been set.
ds_query = datastore.Query(kind=raw_entity_kind,
namespace=namespace,
_app=app,
keys_only=True)
ds_query.Order("__scatter__")
random_keys = ds_query.Get(shard_count * cls._OVERSAMPLING_FACTOR)
if not random_keys:
      # There are no entities with the scatter property. We have no idea
# how to split.
return ([key_range.KeyRange(namespace=namespace, _app=app)] +
[None] * (shard_count - 1))
random_keys.sort()
if len(random_keys) >= shard_count:
# We've got a lot of scatter values. Sample them down.
random_keys = cls._choose_split_points(random_keys, shard_count)
key_ranges = []
key_ranges.append(key_range.KeyRange(
key_start=None,
key_end=random_keys[0],
direction=key_range.KeyRange.ASC,
include_start=False,
include_end=False,
namespace=namespace,
_app=app))
for i in range(0, len(random_keys) - 1):
key_ranges.append(key_range.KeyRange(
key_start=random_keys[i],
key_end=random_keys[i+1],
direction=key_range.KeyRange.ASC,
include_start=True,
include_end=False,
namespace=namespace,
_app=app))
key_ranges.append(key_range.KeyRange(
key_start=random_keys[-1],
key_end=None,
direction=key_range.KeyRange.ASC,
include_start=True,
include_end=False,
namespace=namespace,
_app=app))
if len(key_ranges) < shard_count:
      # We need to return as many shards as were requested. Add some Nones.
key_ranges = key_ranges + [None] * (shard_count - len(key_ranges))
return key_ranges
@classmethod
def _split_input_from_params(cls, app, namespaces, entity_kind_name,
params, shard_count):
"""Return input reader objects. Helper for split_input."""
key_ranges = [] # KeyRanges for all namespaces
for namespace in namespaces:
key_ranges.extend(
cls._split_input_from_namespace(app,
namespace,
entity_kind_name,
shard_count))
# Divide the KeyRanges into shard_count shards. The KeyRanges for different
# namespaces might be very different in size so the assignment of KeyRanges
# to shards is done round-robin.
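    # For example (illustrative): 3 namespaces split 4 ways produce 12
    # KeyRanges, and shard i receives ranges i, i+4 and i+8, so every shard
    # touches every namespace.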
shared_ranges = [[] for _ in range(shard_count)]
for i, k_range in enumerate(key_ranges):
shared_ranges[i % shard_count].append(k_range)
batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
return [cls(entity_kind_name,
key_ranges=key_ranges,
ns_range=None,
batch_size=batch_size)
for key_ranges in shared_ranges if key_ranges]
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise BadReaderParamsError("Input reader class mismatch")
params = mapper_spec.params
if cls.ENTITY_KIND_PARAM not in params:
raise BadReaderParamsError("Missing mapper parameter 'entity_kind'")
if cls.BATCH_SIZE_PARAM in params:
try:
batch_size = int(params[cls.BATCH_SIZE_PARAM])
if batch_size < 1:
raise BadReaderParamsError("Bad batch size: %s" % batch_size)
except ValueError, e:
raise BadReaderParamsError("Bad batch size: %s" % e)
if cls.NAMESPACE_PARAM in params:
if not isinstance(params[cls.NAMESPACE_PARAM],
(str, unicode, type(None))):
raise BadReaderParamsError(
"Expected a single namespace string")
if cls.NAMESPACES_PARAM in params:
raise BadReaderParamsError("Multiple namespaces are no longer supported")
@classmethod
def split_input(cls, mapper_spec):
"""Splits query into shards without fetching query results.
    Tries as best it can to split the whole query result set into equal
    shards. Because a perfect split is difficult to achieve, the resulting
    shards' sizes might differ significantly from each other.
Args:
mapper_spec: MapperSpec with params containing 'entity_kind'.
May have 'namespace' in the params as a string containing a single
namespace. If specified then the input reader will only yield values
in the given namespace. If 'namespace' is not given then values from
all namespaces will be yielded. May also have 'batch_size' in the params
to specify the number of entities to process in each batch.
Returns:
A list of InputReader objects. If the query results are empty then the
empty list will be returned. Otherwise, the list will always have a length
equal to number_of_shards but may be padded with Nones if there are too
few results for effective sharding.
"""
params = mapper_spec.params
entity_kind_name = params[cls.ENTITY_KIND_PARAM]
batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
shard_count = mapper_spec.shard_count
namespace = params.get(cls.NAMESPACE_PARAM)
app = params.get(cls._APP_PARAM)
if namespace is None:
# It is difficult to efficiently shard large numbers of namespaces because
# there can be an arbitrary number of them. So the strategy is:
# 1. if there are a small number of namespaces in the datastore then
# generate one KeyRange per namespace per shard and assign each shard a
# KeyRange for every namespace. This should lead to nearly perfect
# sharding.
# 2. if there are a large number of namespaces in the datastore then
# generate one NamespaceRange per worker. This can lead to very bad
# sharding because namespaces can contain very different numbers of
# entities and each NamespaceRange may contain very different numbers
# of namespaces.
namespace_query = datastore.Query("__namespace__",
keys_only=True,
_app=app)
namespace_keys = namespace_query.Get(
limit=cls.MAX_NAMESPACES_FOR_KEY_SHARD+1)
if len(namespace_keys) > cls.MAX_NAMESPACES_FOR_KEY_SHARD:
ns_ranges = namespace_range.NamespaceRange.split(n=shard_count,
contiguous=True,
_app=app)
return [cls(entity_kind_name,
key_ranges=None,
ns_range=ns_range,
batch_size=batch_size)
for ns_range in ns_ranges]
elif not namespace_keys:
return [cls(entity_kind_name,
key_ranges=None,
ns_range=namespace_range.NamespaceRange(),
                    batch_size=batch_size)]
else:
namespaces = [namespace_key.name() or ""
for namespace_key in namespace_keys]
else:
namespaces = [namespace]
return cls._split_input_from_params(
app, namespaces, entity_kind_name, params, shard_count)
def to_json(self):
"""Serializes all the data in this query range into json form.
Returns:
all the data in json-compatible map.
"""
if self._key_ranges is None:
key_ranges_json = None
else:
key_ranges_json = []
for k in self._key_ranges:
if k:
key_ranges_json.append(k.to_json())
else:
key_ranges_json.append(None)
if self._ns_range is None:
namespace_range_json = None
else:
namespace_range_json = self._ns_range.to_json_object()
if self._current_key_range is None:
current_key_range_json = None
else:
current_key_range_json = self._current_key_range.to_json()
json_dict = {self.KEY_RANGE_PARAM: key_ranges_json,
self.NAMESPACE_RANGE_PARAM: namespace_range_json,
self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
self.ENTITY_KIND_PARAM: self._entity_kind,
self.BATCH_SIZE_PARAM: self._batch_size}
return json_dict
@classmethod
def from_json(cls, json):
"""Create new DatastoreInputReader from the json, encoded by to_json.
Args:
json: json map representation of DatastoreInputReader.
Returns:
an instance of DatastoreInputReader with all data deserialized from json.
"""
if json[cls.KEY_RANGE_PARAM] is None:
key_ranges = None
else:
key_ranges = []
for k in json[cls.KEY_RANGE_PARAM]:
if k:
key_ranges.append(key_range.KeyRange.from_json(k))
else:
key_ranges.append(None)
if json[cls.NAMESPACE_RANGE_PARAM] is None:
ns_range = None
else:
ns_range = namespace_range.NamespaceRange.from_json_object(
json[cls.NAMESPACE_RANGE_PARAM])
if json[cls.CURRENT_KEY_RANGE_PARAM] is None:
current_key_range = None
else:
current_key_range = key_range.KeyRange.from_json(
json[cls.CURRENT_KEY_RANGE_PARAM])
return cls(
json[cls.ENTITY_KIND_PARAM],
key_ranges,
ns_range,
json[cls.BATCH_SIZE_PARAM],
current_key_range)
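# The framework checkpoints a reader by round-tripping it through JSON; a
# minimal sketch of that life cycle, assuming an already-constructed reader:
#
#   state = reader.to_json()                      # persisted with shard state
#   reader = reader.__class__.from_json(state)    # later, on resume
#
# Anything needed to resume therefore has to be captured by to_json() above.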
class DatastoreInputReader(AbstractDatastoreInputReader):
"""Represents a range in query results.
DatastoreInputReader yields model instances from the entities in a given key
range. Iterating over DatastoreInputReader changes its range past consumed
entries.
The class shouldn't be instantiated directly. Use the split_input class method
instead.
"""
def _iter_key_range(self, k_range):
cursor = None
while True:
query = k_range.make_ascending_query(
util.for_name(self._entity_kind))
if cursor:
query.with_cursor(cursor)
results = query.fetch(limit=self._batch_size)
if not results:
break
for model_instance in results:
key = model_instance.key()
yield key, model_instance
cursor = query.cursor()
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
super(DatastoreInputReader, cls).validate(mapper_spec)
params = mapper_spec.params
keys_only = util.parse_bool(params.get(cls.KEYS_ONLY_PARAM, False))
if keys_only:
raise BadReaderParamsError("The keys_only parameter is obsolete. "
"Use DatastoreKeyInputReader instead.")
entity_kind_name = params[cls.ENTITY_KIND_PARAM]
# Fail fast if Model cannot be located.
try:
util.for_name(entity_kind_name)
except ImportError, e:
raise BadReaderParamsError("Bad entity kind: %s" % e)
class DatastoreKeyInputReader(AbstractDatastoreInputReader):
"""An input reader which takes a Kind and yields Keys for that kind."""
def _iter_key_range(self, k_range):
raw_entity_kind = util.get_short_name(self._entity_kind)
query = k_range.make_ascending_datastore_query(
raw_entity_kind, keys_only=True)
for key in query.Run(
config=datastore_query.QueryOptions(batch_size=self._batch_size)):
yield key, key
class DatastoreEntityInputReader(AbstractDatastoreInputReader):
"""An input reader which yields low level datastore entities for a kind."""
def _iter_key_range(self, k_range):
raw_entity_kind = util.get_short_name(self._entity_kind)
query = k_range.make_ascending_datastore_query(
raw_entity_kind)
for entity in query.Run(
config=datastore_query.QueryOptions(batch_size=self._batch_size)):
yield entity.key(), entity
class BlobstoreLineInputReader(InputReader):
"""Input reader for a newline delimited blob in Blobstore."""
# TODO(user): Should we set this based on MAX_BLOB_FETCH_SIZE?
_BLOB_BUFFER_SIZE = 64000
# Maximum number of shards to allow.
_MAX_SHARD_COUNT = 256
# Maximum number of blobs to allow.
_MAX_BLOB_KEYS_COUNT = 246
# Mapreduce parameters.
BLOB_KEYS_PARAM = "blob_keys"
  # Serialization parameters.
INITIAL_POSITION_PARAM = "initial_position"
END_POSITION_PARAM = "end_position"
BLOB_KEY_PARAM = "blob_key"
def __init__(self, blob_key, start_position, end_position):
"""Initializes this instance with the given blob key and character range.
    This BlobstoreLineInputReader will read from the first record starting
    strictly after start_position, through the first record ending at or
    after end_position. As an exception, if start_position is 0, then this
    InputReader starts reading at the first record.
Args:
blob_key: the BlobKey that this input reader is processing.
start_position: the position to start reading at.
end_position: a position in the last record to read.
"""
self._blob_key = blob_key
self._blob_reader = blobstore.BlobReader(blob_key,
self._BLOB_BUFFER_SIZE,
start_position)
self._end_position = end_position
self._has_iterated = False
self._read_before_start = bool(start_position)
def next(self):
"""Returns the next input from as an (offset, line) tuple."""
self._has_iterated = True
if self._read_before_start:
self._blob_reader.readline()
self._read_before_start = False
start_position = self._blob_reader.tell()
if start_position > self._end_position:
raise StopIteration()
line = self._blob_reader.readline()
if not line:
raise StopIteration()
return start_position, line.rstrip("\n")
def to_json(self):
"""Returns an json-compatible input shard spec for remaining inputs."""
new_pos = self._blob_reader.tell()
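    # Subtracting one parks the saved position on the newline that ended the
    # last line returned. On resume, __init__ sets _read_before_start, so the
    # first readline() consumes just that newline, leaving the reader at the
    # start of the first unread record.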
if self._has_iterated:
new_pos -= 1
return {self.BLOB_KEY_PARAM: self._blob_key,
self.INITIAL_POSITION_PARAM: new_pos,
self.END_POSITION_PARAM: self._end_position}
def __str__(self):
"""Returns the string representation of this BlobstoreLineInputReader."""
return "blobstore.BlobKey(%r):[%d, %d]" % (
self._blob_key, self._blob_reader.tell(), self._end_position)
@classmethod
def from_json(cls, json):
"""Instantiates an instance of this InputReader for the given shard spec."""
return cls(json[cls.BLOB_KEY_PARAM],
json[cls.INITIAL_POSITION_PARAM],
json[cls.END_POSITION_PARAM])
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise BadReaderParamsError("Mapper input reader class mismatch")
params = mapper_spec.params
if cls.BLOB_KEYS_PARAM not in params:
raise BadReaderParamsError("Must specify 'blob_keys' for mapper input")
blob_keys = params[cls.BLOB_KEYS_PARAM]
if isinstance(blob_keys, basestring):
# This is a mechanism to allow multiple blob keys (which do not contain
# commas) in a single string. It may go away.
blob_keys = blob_keys.split(",")
if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
if not blob_keys:
raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
for blob_key in blob_keys:
blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
if not blob_info:
raise BadReaderParamsError("Could not find blobinfo for key %s" %
blob_key)
@classmethod
def split_input(cls, mapper_spec):
"""Returns a list of shard_count input_spec_shards for input_spec.
Args:
mapper_spec: The mapper specification to split from. Must contain
'blob_keys' parameter with one or more blob keys.
Returns:
A list of BlobstoreInputReaders corresponding to the specified shards.
"""
params = mapper_spec.params
blob_keys = params[cls.BLOB_KEYS_PARAM]
if isinstance(blob_keys, basestring):
# This is a mechanism to allow multiple blob keys (which do not contain
# commas) in a single string. It may go away.
blob_keys = blob_keys.split(",")
blob_sizes = {}
for blob_key in blob_keys:
blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
blob_sizes[blob_key] = blob_info.size
shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
shards_per_blob = shard_count // len(blob_keys)
if shards_per_blob == 0:
shards_per_blob = 1
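    # For example (illustrative): a single 1000-byte blob with shard_count 4
    # gives shards_per_blob = 4 and byte ranges [0, 250), [250, 500),
    # [500, 750) and [750, 1000]; each reader then snaps to record boundaries
    # as described in __init__.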
chunks = []
for blob_key, blob_size in blob_sizes.items():
blob_chunk_size = blob_size // shards_per_blob
for i in xrange(shards_per_blob - 1):
chunks.append(BlobstoreLineInputReader.from_json(
{cls.BLOB_KEY_PARAM: blob_key,
cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
chunks.append(BlobstoreLineInputReader.from_json(
{cls.BLOB_KEY_PARAM: blob_key,
cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
cls.END_POSITION_PARAM: blob_size}))
return chunks
class BlobstoreZipInputReader(InputReader):
"""Input reader for files from a zip archive stored in the Blobstore.
  Each instance of the reader will read the TOC from the end of the zip
  file, and then only those contained files for which it is responsible.
"""
# Maximum number of shards to allow.
_MAX_SHARD_COUNT = 256
# Mapreduce parameters.
BLOB_KEY_PARAM = "blob_key"
START_INDEX_PARAM = "start_index"
END_INDEX_PARAM = "end_index"
def __init__(self, blob_key, start_index, end_index,
_reader=blobstore.BlobReader):
"""Initializes this instance with the given blob key and file range.
This BlobstoreZipInputReader will read from the file with index start_index
up to but not including the file with index end_index.
Args:
blob_key: the BlobKey that this input reader is processing.
start_index: the index of the first file to read.
end_index: the index of the first file that will not be read.
_reader: a callable that returns a file-like object for reading blobs.
Used for dependency injection.
"""
self._blob_key = blob_key
self._start_index = start_index
self._end_index = end_index
self._reader = _reader
self._zip = None
self._entries = None
def next(self):
"""Returns the next input from this input reader as (ZipInfo, opener) tuple.
Returns:
The next input from this input reader, in the form of a 2-tuple.
The first element of the tuple is a zipfile.ZipInfo object.
The second element of the tuple is a zero-argument function that, when
called, returns the complete body of the file.
"""
if not self._zip:
self._zip = zipfile.ZipFile(self._reader(self._blob_key))
# Get a list of entries, reversed so we can pop entries off in order
self._entries = self._zip.infolist()[self._start_index:self._end_index]
self._entries.reverse()
if not self._entries:
raise StopIteration()
entry = self._entries.pop()
self._start_index += 1
return (entry, lambda: self._read(entry))
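  # A mapper consuming this reader receives (zipinfo, opener) pairs; a
  # hypothetical handler might look like:
  #
  #   def zip_file_mapper((zipinfo, opener)):
  #     body = opener()   # decompresses zipinfo.filename and counts bytes
  #
  # The opener is deferred so files that are never consumed are never read.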
def _read(self, entry):
"""Read entry content.
Args:
entry: zip file entry as zipfile.ZipInfo.
Returns:
Entry content as string.
"""
start_time = time.time()
content = self._zip.read(entry.filename)
ctx = context.get()
if ctx:
operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
operation.counters.Increment(
COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
return content
@classmethod
def from_json(cls, json):
"""Creates an instance of the InputReader for the given input shard state.
Args:
json: The InputReader state as a dict-like object.
Returns:
An instance of the InputReader configured using the values of json.
"""
return cls(json[cls.BLOB_KEY_PARAM],
json[cls.START_INDEX_PARAM],
json[cls.END_INDEX_PARAM])
def to_json(self):
"""Returns an input shard state for the remaining inputs.
Returns:
A json-izable version of the remaining InputReader.
"""
return {self.BLOB_KEY_PARAM: self._blob_key,
self.START_INDEX_PARAM: self._start_index,
self.END_INDEX_PARAM: self._end_index}
def __str__(self):
"""Returns the string representation of this BlobstoreZipInputReader."""
return "blobstore.BlobKey(%r):[%d, %d]" % (
self._blob_key, self._start_index, self._end_index)
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise BadReaderParamsError("Mapper input reader class mismatch")
params = mapper_spec.params
if cls.BLOB_KEY_PARAM not in params:
raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
blob_key = params[cls.BLOB_KEY_PARAM]
blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
if not blob_info:
raise BadReaderParamsError("Could not find blobinfo for key %s" %
blob_key)
@classmethod
def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
"""Returns a list of input shard states for the input spec.
Args:
mapper_spec: The MapperSpec for this InputReader. Must contain
'blob_key' parameter with one blob key.
_reader: a callable that returns a file-like object for reading blobs.
Used for dependency injection.
Returns:
A list of InputReaders spanning files within the zip.
"""
params = mapper_spec.params
blob_key = params[cls.BLOB_KEY_PARAM]
zip_input = zipfile.ZipFile(_reader(blob_key))
files = zip_input.infolist()
total_size = sum(x.file_size for x in files)
num_shards = min(mapper_spec.shard_count, cls._MAX_SHARD_COUNT)
size_per_shard = total_size // num_shards
# Break the list of files into sublists, each of approximately
# size_per_shard bytes.
shard_start_indexes = [0]
current_shard_size = 0
for i, fileinfo in enumerate(files):
current_shard_size += fileinfo.file_size
if current_shard_size >= size_per_shard:
shard_start_indexes.append(i + 1)
current_shard_size = 0
if shard_start_indexes[-1] != len(files):
shard_start_indexes.append(len(files))
return [cls(blob_key, start_index, end_index, _reader)
for start_index, end_index
in zip(shard_start_indexes, shard_start_indexes[1:])]
class BlobstoreZipLineInputReader(InputReader):
"""Input reader for newline delimited files in zip archives from Blobstore.
  This has the same external interface as the BlobstoreLineInputReader, in
  that it takes a list of blobs as its input and yields lines to the mapper.
  However, the blobs themselves are expected to be zip archives of
  line-delimited files instead of the files themselves.
  This is useful because many line-delimited files benefit greatly from
  compression.
"""
# Maximum number of shards to allow.
_MAX_SHARD_COUNT = 256
# Maximum number of blobs to allow.
_MAX_BLOB_KEYS_COUNT = 246
# Mapreduce parameters.
BLOB_KEYS_PARAM = "blob_keys"
# Serialization parameters.
BLOB_KEY_PARAM = "blob_key"
START_FILE_INDEX_PARAM = "start_file_index"
END_FILE_INDEX_PARAM = "end_file_index"
OFFSET_PARAM = "offset"
def __init__(self, blob_key, start_file_index, end_file_index, offset,
_reader=blobstore.BlobReader):
"""Initializes this instance with the given blob key and file range.
This BlobstoreZipLineInputReader will read from the file with index
start_file_index up to but not including the file with index end_file_index.
    It will return lines starting at offset within file[start_file_index].
Args:
blob_key: the BlobKey that this input reader is processing.
start_file_index: the index of the first file to read within the zip.
end_file_index: the index of the first file that will not be read.
offset: the byte offset within blob_key.zip[start_file_index] to start
reading. The reader will continue to the end of the file.
_reader: a callable that returns a file-like object for reading blobs.
Used for dependency injection.
"""
self._blob_key = blob_key
self._start_file_index = start_file_index
self._end_file_index = end_file_index
self._initial_offset = offset
self._reader = _reader
self._zip = None
self._entries = None
self._filestream = None
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise BadReaderParamsError("Mapper input reader class mismatch")
params = mapper_spec.params
if cls.BLOB_KEYS_PARAM not in params:
raise BadReaderParamsError("Must specify 'blob_key' for mapper input")
blob_keys = params[cls.BLOB_KEYS_PARAM]
if isinstance(blob_keys, basestring):
# This is a mechanism to allow multiple blob keys (which do not contain
# commas) in a single string. It may go away.
blob_keys = blob_keys.split(",")
if len(blob_keys) > cls._MAX_BLOB_KEYS_COUNT:
raise BadReaderParamsError("Too many 'blob_keys' for mapper input")
if not blob_keys:
raise BadReaderParamsError("No 'blob_keys' specified for mapper input")
for blob_key in blob_keys:
blob_info = blobstore.BlobInfo.get(blobstore.BlobKey(blob_key))
if not blob_info:
raise BadReaderParamsError("Could not find blobinfo for key %s" %
blob_key)
@classmethod
def split_input(cls, mapper_spec, _reader=blobstore.BlobReader):
"""Returns a list of input readers for the input spec.
Args:
mapper_spec: The MapperSpec for this InputReader. Must contain
'blob_keys' parameter with one or more blob keys.
_reader: a callable that returns a file-like object for reading blobs.
Used for dependency injection.
Returns:
A list of InputReaders spanning the subfiles within the blobs.
There will be at least one reader per blob, but it will otherwise
attempt to keep the expanded size even.
"""
params = mapper_spec.params
blob_keys = params[cls.BLOB_KEYS_PARAM]
if isinstance(blob_keys, basestring):
# This is a mechanism to allow multiple blob keys (which do not contain
# commas) in a single string. It may go away.
blob_keys = blob_keys.split(",")
blob_files = {}
total_size = 0
for blob_key in blob_keys:
zip_input = zipfile.ZipFile(_reader(blob_key))
blob_files[blob_key] = zip_input.infolist()
total_size += sum(x.file_size for x in blob_files[blob_key])
shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
# We can break on both blob key and file-within-zip boundaries.
# A shard will span at minimum a single blob key, but may only
# handle a few files within a blob.
size_per_shard = total_size // shard_count
readers = []
for blob_key in blob_keys:
files = blob_files[blob_key]
current_shard_size = 0
start_file_index = 0
next_file_index = 0
for fileinfo in files:
next_file_index += 1
current_shard_size += fileinfo.file_size
if current_shard_size >= size_per_shard:
readers.append(cls(blob_key, start_file_index, next_file_index, 0,
_reader))
current_shard_size = 0
start_file_index = next_file_index
if current_shard_size != 0:
readers.append(cls(blob_key, start_file_index, next_file_index, 0,
_reader))
return readers
def next(self):
"""Returns the next line from this input reader as (lineinfo, line) tuple.
Returns:
The next input from this input reader, in the form of a 2-tuple.
The first element of the tuple describes the source, it is itself
a tuple (blobkey, filenumber, byteoffset).
The second element of the tuple is the line found at that offset.
"""
if not self._filestream:
if not self._zip:
self._zip = zipfile.ZipFile(self._reader(self._blob_key))
# Get a list of entries, reversed so we can pop entries off in order
self._entries = self._zip.infolist()[self._start_file_index:
self._end_file_index]
self._entries.reverse()
if not self._entries:
raise StopIteration()
entry = self._entries.pop()
value = self._zip.read(entry.filename)
self._filestream = StringIO.StringIO(value)
if self._initial_offset:
self._filestream.seek(self._initial_offset)
self._filestream.readline()
start_position = self._filestream.tell()
line = self._filestream.readline()
if not line:
# Done with this file in the zip. Move on to the next file.
self._filestream.close()
self._filestream = None
self._start_file_index += 1
self._initial_offset = 0
return self.next()
return ((self._blob_key, self._start_file_index, start_position),
line.rstrip("\n"))
def _next_offset(self):
"""Return the offset of the next line to read."""
if self._filestream:
offset = self._filestream.tell()
if offset:
offset -= 1
else:
offset = self._initial_offset
return offset
def to_json(self):
"""Returns an input shard state for the remaining inputs.
Returns:
A json-izable version of the remaining InputReader.
"""
return {self.BLOB_KEY_PARAM: self._blob_key,
self.START_FILE_INDEX_PARAM: self._start_file_index,
self.END_FILE_INDEX_PARAM: self._end_file_index,
self.OFFSET_PARAM: self._next_offset()}
@classmethod
def from_json(cls, json, _reader=blobstore.BlobReader):
"""Creates an instance of the InputReader for the given input shard state.
Args:
json: The InputReader state as a dict-like object.
_reader: For dependency injection.
Returns:
An instance of the InputReader configured using the values of json.
"""
return cls(json[cls.BLOB_KEY_PARAM],
json[cls.START_FILE_INDEX_PARAM],
json[cls.END_FILE_INDEX_PARAM],
json[cls.OFFSET_PARAM],
_reader)
def __str__(self):
"""Returns the string representation of this reader.
Returns:
string blobkey:[start file num, end file num]:current offset.
"""
return "blobstore.BlobKey(%r):[%d, %d]:%d" % (
self._blob_key, self._start_file_index, self._end_file_index,
self._next_offset())
class ConsistentKeyReader(DatastoreKeyInputReader):
"""A key reader which reads consistent data from datastore.
  Datastore might have entities which were written but are, for some time,
  not visible through queries. Typically these entities can be read only
  inside a transaction until they are 'applied'.
  This reader reads all keys, even those that are not yet visible. It might
  take significant time to start yielding data because it first has to apply
  all modifications created before its start.
"""
START_TIME_US_PARAM = "start_time_us"
UNAPPLIED_LOG_FILTER = "__unapplied_log_timestamp_us__ <"
DUMMY_KIND = "DUMMY_KIND"
DUMMY_ID = 106275677020293L
UNAPPLIED_QUERY_DEADLINE = 270 # Max supported by datastore.
  def _get_unapplied_jobs_across_namespaces(self,
                                            namespace_start,
                                            namespace_end,
                                            app):
filters = {"__key__ >=": db.Key.from_path("__namespace__",
namespace_start or 1,
_app=app),
"__key__ <=": db.Key.from_path("__namespace__",
namespace_end or 1,
_app=app),
self.UNAPPLIED_LOG_FILTER: self.start_time_us}
unapplied_query = datastore.Query(filters=filters, keys_only=True, _app=app)
return unapplied_query.Get(
limit=self._batch_size,
config=datastore_rpc.Configuration(
deadline=self.UNAPPLIED_QUERY_DEADLINE))
def _iter_ns_range(self):
while True:
      unapplied_jobs = self._get_unapplied_jobs_across_namespaces(
          self._ns_range.namespace_start,
          self._ns_range.namespace_end,
          self._ns_range.app)
if not unapplied_jobs:
break
self._apply_jobs(unapplied_jobs)
for o in super(ConsistentKeyReader, self)._iter_ns_range():
yield o
def _iter_key_range(self, k_range):
assert hasattr(self, "start_time_us"), "start_time_us property was not set"
if self._ns_range is None:
      # _iter_ns_range will already have dealt with unapplied jobs, so only
# handle the case where it would not have been called.
self._apply_key_range(k_range)
for o in super(ConsistentKeyReader, self)._iter_key_range(k_range):
yield o
def _apply_key_range(self, k_range):
"""Apply all jobs in the given KeyRange."""
    # The strategy used here will not work if the entire key range cannot be
    # applied before the task times out, because the results of incremental
    # work are not checkpointed. It also assumes that the entire key range
    # can be queried without timing out, which may not be the case.
# See b/5201059.
apply_range = copy.deepcopy(k_range)
while True:
# Creates an unapplied query and fetches unapplied jobs in the result
# range. self.split() ensures that the generated KeyRanges cover the
# entire possible key range.
unapplied_query = self._make_unapplied_query(apply_range)
unapplied_jobs = unapplied_query.Get(
limit=self._batch_size,
config=datastore_rpc.Configuration(
deadline=self.UNAPPLIED_QUERY_DEADLINE))
if not unapplied_jobs:
break
self._apply_jobs(unapplied_jobs)
# Avoid requerying parts of the key range that have already been
# applied.
apply_range.advance(unapplied_jobs[-1])
def _make_unapplied_query(self, k_range):
"""Returns a datastore.Query that finds the unapplied keys in k_range."""
unapplied_query = k_range.make_ascending_datastore_query(
kind=None, keys_only=True)
unapplied_query[
ConsistentKeyReader.UNAPPLIED_LOG_FILTER] = self.start_time_us
return unapplied_query
def _apply_jobs(self, unapplied_jobs):
"""Apply all jobs implied by the given keys."""
# There were some unapplied jobs. Roll them forward.
keys_to_apply = []
for key in unapplied_jobs:
# To apply the entity group we need to read something from it.
# We use dummy kind and id because we don't actually need any data.
path = key.to_path() + [ConsistentKeyReader.DUMMY_KIND,
ConsistentKeyReader.DUMMY_ID]
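      # For example, an unapplied-job key with path ('MyKind', 42) yields the
      # dummy child key ('MyKind', 42, 'DUMMY_KIND', 106275677020293L); the
      # batch get below forces the datastore to roll that group forward.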
keys_to_apply.append(
db.Key.from_path(_app=key.app(), namespace=key.namespace(), *path))
db.get(keys_to_apply, config=datastore_rpc.Configuration(
deadline=self.UNAPPLIED_QUERY_DEADLINE,
read_policy=datastore_rpc.Configuration.APPLY_ALL_JOBS_CONSISTENCY))
@classmethod
def _split_input_from_namespace(cls,
app,
namespace,
entity_kind_name,
shard_count):
key_ranges = super(ConsistentKeyReader, cls)._split_input_from_namespace(
app, namespace, entity_kind_name, shard_count)
assert len(key_ranges) == shard_count
# The KeyRanges calculated by the base class may not include keys for
# entities that have unapplied jobs. So use an open key range for the first
# and last KeyRanges to ensure that they will be processed.
try:
last_key_range_index = key_ranges.index(None) - 1
except ValueError:
last_key_range_index = shard_count - 1
if last_key_range_index != -1:
key_ranges[0].key_start = None
key_ranges[0].include_start = False
key_ranges[last_key_range_index].key_end = None
key_ranges[last_key_range_index].include_end = False
return key_ranges
@classmethod
def _split_input_from_params(cls, app, namespaces, entity_kind_name,
params, shard_count):
readers = super(ConsistentKeyReader, cls)._split_input_from_params(
app,
namespaces,
entity_kind_name,
params,
shard_count)
# We always produce at least one namespace range because:
# a) there might be unapplied entities
# b) it simplifies mapper code
    if not readers:
      batch_size = int(params.get(cls.BATCH_SIZE_PARAM, cls._BATCH_SIZE))
      readers = [cls(entity_kind_name,
                     key_ranges=None,
                     ns_range=namespace_range.NamespaceRange(),
                     batch_size=batch_size)]
return readers
@classmethod
def split_input(cls, mapper_spec):
"""Splits input into key ranges."""
readers = super(ConsistentKeyReader, cls).split_input(mapper_spec)
start_time_us = mapper_spec.params.get(
cls.START_TIME_US_PARAM, long(time.time() * 1e6))
for reader in readers:
reader.start_time_us = start_time_us
return readers
def to_json(self):
"""Serializes all the data in this reader into json form.
Returns:
all the data in json-compatible map.
"""
    json_dict = super(ConsistentKeyReader, self).to_json()
json_dict[self.START_TIME_US_PARAM] = self.start_time_us
return json_dict
@classmethod
def from_json(cls, json):
"""Create new ConsistentKeyReader from the json, encoded by to_json.
Args:
json: json map representation of ConsistentKeyReader.
Returns:
an instance of ConsistentKeyReader with all data deserialized from json.
"""
reader = super(ConsistentKeyReader, cls).from_json(json)
reader.start_time_us = json[cls.START_TIME_US_PARAM]
return reader
# TODO(user): This reader always produces only one shard, because
# namespace entities use a mix of ids and names, and KeyRange-based
# splitting doesn't work satisfactorily in this case.
# It's possible to implement specific splitting functionality for the reader
# instead of reusing the generic one. Meanwhile, one shard is enough for our
# applications.
class NamespaceInputReader(InputReader):
"""An input reader to iterate over namespaces.
  This reader yields namespace names as strings.
It will always produce only one shard.
"""
NAMESPACE_RANGE_PARAM = "namespace_range"
BATCH_SIZE_PARAM = "batch_size"
_BATCH_SIZE = 10
  def __init__(self, ns_range, batch_size=_BATCH_SIZE):
self.ns_range = ns_range
self._batch_size = batch_size
def to_json(self):
"""Serializes all the data in this query range into json form.
Returns:
all the data in json-compatible map.
"""
return {self.NAMESPACE_RANGE_PARAM: self.ns_range.to_json_object(),
self.BATCH_SIZE_PARAM: self._batch_size}
@classmethod
def from_json(cls, json):
"""Create new DatastoreInputReader from the json, encoded by to_json.
Args:
json: json map representation of DatastoreInputReader.
Returns:
an instance of DatastoreInputReader with all data deserialized from json.
"""
return cls(
namespace_range.NamespaceRange.from_json_object(
json[cls.NAMESPACE_RANGE_PARAM]),
json[cls.BATCH_SIZE_PARAM])
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise BadReaderParamsError("Input reader class mismatch")
params = mapper_spec.params
if cls.BATCH_SIZE_PARAM in params:
try:
batch_size = int(params[cls.BATCH_SIZE_PARAM])
if batch_size < 1:
raise BadReaderParamsError("Bad batch size: %s" % batch_size)
except ValueError, e:
raise BadReaderParamsError("Bad batch size: %s" % e)
@classmethod
def split_input(cls, mapper_spec):
"""Returns a list of input readers for the input spec.
Args:
mapper_spec: The MapperSpec for this InputReader.
Returns:
A list of InputReaders.
"""
batch_size = int(mapper_spec.params.get(cls.BATCH_SIZE_PARAM,
cls._BATCH_SIZE))
shard_count = mapper_spec.shard_count
namespace_ranges = namespace_range.NamespaceRange.split(shard_count,
contiguous=True)
return [NamespaceInputReader(ns_range, batch_size)
for ns_range in namespace_ranges]
def __iter__(self):
while True:
keys = self.ns_range.make_datastore_query().Get(limit=self._batch_size)
if not keys:
break
for key in keys:
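        # Advance the range before yielding, mirroring the KeyRange readers
        # above: a checkpoint taken after the yield must not re-deliver this
        # namespace.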
namespace = metadata.Namespace.key_to_namespace(key)
self.ns_range = self.ns_range.with_start_after(namespace)
yield namespace
def __str__(self):
return repr(self.ns_range)
class RecordsReader(InputReader):
"""Reader to read a list of Files API file in records format.
The number of input shards can be specified by the SHARDS_PARAM
mapper parameter. Input files cannot be split, so there will be at most
one shard per file. Also the number of shards will not be reduced based on
the number of input files, so shards in always equals shards out.
"""
FILE_PARAM = "file"
FILES_PARAM = "files"
def __init__(self, filenames, position):
"""Constructor.
Args:
filenames: list of filenames.
position: file position to start reading from as int.
"""
self._filenames = filenames
if self._filenames:
self._reader = records.RecordsReader(
files.BufferedFile(self._filenames[0]))
self._reader.seek(position)
else:
self._reader = None
def __iter__(self):
"""Iterate over records in file.
Yields records as strings.
"""
ctx = context.get()
while self._reader:
try:
start_time = time.time()
record = self._reader.read()
if ctx:
operation.counters.Increment(
COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
yield record
except EOFError:
self._filenames.pop(0)
if not self._filenames:
self._reader = None
else:
self._reader = records.RecordsReader(
files.BufferedFile(self._filenames[0]))
@classmethod
def from_json(cls, json):
"""Creates an instance of the InputReader for the given input shard state.
Args:
json: The InputReader state as a dict-like object.
Returns:
An instance of the InputReader configured using the values of json.
"""
return cls(json["filenames"], json["position"])
def to_json(self):
"""Returns an input shard state for the remaining inputs.
Returns:
A json-izable version of the remaining InputReader.
"""
result = {
"filenames": self._filenames,
"position": 0,
}
if self._reader:
result["position"] = self._reader.tell()
return result
@classmethod
def split_input(cls, mapper_spec):
"""Returns a list of input readers for the input spec.
Args:
mapper_spec: The MapperSpec for this InputReader.
Returns:
A list of InputReaders.
"""
params = mapper_spec.params
shard_count = mapper_spec.shard_count
if cls.FILES_PARAM in params:
filenames = params[cls.FILES_PARAM]
if isinstance(filenames, basestring):
filenames = filenames.split(",")
else:
filenames = [params[cls.FILE_PARAM]]
batch_list = [[] for _ in xrange(shard_count)]
for index, filename in enumerate(filenames):
# Simplest round robin so we don't have any short shards.
batch_list[index % shard_count].append(filenames[index])
# Sort from most shards to least shards so the short shard is last.
batch_list.sort(reverse=True, key=lambda x: len(x))
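    # For example (illustrative): five files across three shards round-robin
    # into batches of sizes 2, 2 and 1, and the reverse sort keeps the short
    # batch last.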
return [cls(batch, 0) for batch in batch_list]
@classmethod
def validate(cls, mapper_spec):
"""Validates mapper spec and all mapper parameters.
Args:
mapper_spec: The MapperSpec for this InputReader.
Raises:
BadReaderParamsError: required parameters are missing or invalid.
"""
if mapper_spec.input_reader_class() != cls:
raise errors.BadReaderParamsError("Input reader class mismatch")
params = mapper_spec.params
if (cls.FILES_PARAM not in params and
cls.FILE_PARAM not in params):
raise BadReaderParamsError(
"Must specify '%s' or '%s' parameter for mapper input" %
(cls.FILES_PARAM, cls.FILE_PARAM))
def __str__(self):
position = 0
if self._reader:
position = self._reader.tell()
return "%s:%s" % (self._filenames, position)
class LogInputReader(InputReader):
"""Input reader for a time range of logs via the Logs Reader API.
The number of input shards may be specified by the SHARDS_PARAM mapper
parameter. A starting and ending time (in seconds since the Unix epoch) are
required to generate time ranges over which to shard the input.
"""
# Parameters directly mapping to those available via logservice.fetch().
START_TIME_PARAM = "start_time"
END_TIME_PARAM = "end_time"
MINIMUM_LOG_LEVEL_PARAM = "minimum_log_level"
INCLUDE_INCOMPLETE_PARAM = "include_incomplete"
INCLUDE_APP_LOGS_PARAM = "include_app_logs"
VERSION_IDS_PARAM = "version_ids"
# Semi-hidden parameters used only internally or for privileged applications.
_OFFSET_PARAM = "offset"
_PROTOTYPE_REQUEST_PARAM = "prototype_request"
_PARAMS = frozenset([START_TIME_PARAM, END_TIME_PARAM, _OFFSET_PARAM,
MINIMUM_LOG_LEVEL_PARAM, INCLUDE_INCOMPLETE_PARAM,
INCLUDE_APP_LOGS_PARAM, VERSION_IDS_PARAM,
_PROTOTYPE_REQUEST_PARAM])
_KWARGS = frozenset([_OFFSET_PARAM, _PROTOTYPE_REQUEST_PARAM])
def __init__(self,
start_time=None,
end_time=None,
minimum_log_level=None,
include_incomplete=False,
include_app_logs=False,
version_ids=None,
**kwargs):
"""Constructor.
Args:
start_time: The earliest request completion or last-update time of logs
that should be mapped over, in seconds since the Unix epoch.
end_time: The latest request completion or last-update time that logs
should be mapped over, in seconds since the Unix epoch.
      minimum_log_level: An application log level which serves as a filter on
        the requests mapped over; requests with no application log at or
        above the specified level will be omitted, even if include_app_logs
        is False.
include_incomplete: Whether or not to include requests that have started
but not yet finished, as a boolean. Defaults to False.
include_app_logs: Whether or not to include application level logs in the
mapped logs, as a boolean. Defaults to False.
version_ids: A list of version ids whose logs should be mapped against.
"""
InputReader.__init__(self)
# The rule for __params is that its contents will always be suitable as
# input to logservice.fetch().
self.__params = dict(kwargs)
if start_time is not None:
self.__params[self.START_TIME_PARAM] = start_time
if end_time is not None:
self.__params[self.END_TIME_PARAM] = end_time
if minimum_log_level is not None:
self.__params[self.MINIMUM_LOG_LEVEL_PARAM] = minimum_log_level
if include_incomplete is not None:
self.__params[self.INCLUDE_INCOMPLETE_PARAM] = include_incomplete
if include_app_logs is not None:
self.__params[self.INCLUDE_APP_LOGS_PARAM] = include_app_logs
if version_ids:
self.__params[self.VERSION_IDS_PARAM] = version_ids
# Any submitted prototype_request will be in encoded form.
if self._PROTOTYPE_REQUEST_PARAM in self.__params:
prototype_request = log_service_pb.LogReadRequest(
self.__params[self._PROTOTYPE_REQUEST_PARAM])
self.__params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request
@staticmethod
def __kwargs(args):
"""Return a new dictionary with all keys converted to type 'str'."""
return dict((str(name), value) for name, value in args.iteritems())
def __iter__(self):
"""Iterates over logs in a given range of time.
Yields:
A RequestLog containing all the information for a single request.
"""
for log in logservice.fetch(**self.__params):
self.__params[self._OFFSET_PARAM] = log.offset
yield log
@classmethod
def from_json(cls, json):
"""Creates an instance of the InputReader for the given input shard's state.
Args:
json: The InputReader state as a dict-like object.
Returns:
An instance of the InputReader configured using the given JSON parameters.
"""
params = cls.__kwargs(json)
# Strip out unrecognized parameters, as introduced by b/5960884.
params = dict((k, v) for k, v in params.iteritems() if k in cls._PARAMS)
# This is not symmetric with to_json() wrt. PROTOTYPE_REQUEST_PARAM because
# the constructor parameters need to be JSON-encodable, so the decoding
# needs to happen there anyways.
if cls._OFFSET_PARAM in params:
params[cls._OFFSET_PARAM] = base64.b64decode(params[cls._OFFSET_PARAM])
return cls(**params)
def to_json(self):
"""Returns an input shard state for the remaining inputs.
Returns:
A JSON serializable version of the remaining input to read.
"""
params = dict(self.__params) # Shallow copy.
if self._PROTOTYPE_REQUEST_PARAM in params:
prototype_request = params[self._PROTOTYPE_REQUEST_PARAM]
params[self._PROTOTYPE_REQUEST_PARAM] = prototype_request.Encode()
if self._OFFSET_PARAM in params:
params[self._OFFSET_PARAM] = base64.b64encode(params[self._OFFSET_PARAM])
return params
@classmethod
def split_input(cls, mapper_spec):
"""Returns a list of input readers for the given input specification.
Args:
mapper_spec: The MapperSpec for this InputReader.
Returns:
A list of InputReaders.
"""
params = cls.__kwargs(mapper_spec.params)
shard_count = mapper_spec.shard_count
# Pick out the overall start and end times and time step per shard.
start_time = params[cls.START_TIME_PARAM]
end_time = params[cls.END_TIME_PARAM]
seconds_per_shard = (end_time - start_time) / shard_count
# Create a LogInputReader for each shard, modulating the params as we go.
shards = []
for _ in xrange(shard_count - 1):
params[cls.END_TIME_PARAM] = (params[cls.START_TIME_PARAM] +
seconds_per_shard)
shards.append(LogInputReader(**params))
params[cls.START_TIME_PARAM] = params[cls.END_TIME_PARAM]
# Create a final shard to complete the time range.
params[cls.END_TIME_PARAM] = end_time
return shards + [LogInputReader(**params)]
@classmethod
def validate(cls, mapper_spec):
"""Validates the mapper's specification and all necessary parameters.
Args:
mapper_spec: The MapperSpec to be used with this InputReader.
Raises:
BadReaderParamsError: If the user fails to specify both a starting time
and an ending time, or if the starting time is later than the ending
time.
"""
if mapper_spec.input_reader_class() != cls:
raise errors.BadReaderParamsError("Input reader class mismatch")
params = cls.__kwargs(mapper_spec.params)
params_diff = set(params.keys()) - cls._PARAMS
if params_diff:
raise errors.BadReaderParamsError("Invalid mapper parameters: %s" %
",".join(params_diff))
if cls.VERSION_IDS_PARAM not in params:
raise errors.BadReaderParamsError("Must specify a list of version ids "
"for mapper input")
if (cls.START_TIME_PARAM not in params or
params[cls.START_TIME_PARAM] is None):
raise errors.BadReaderParamsError("Must specify a starting time for "
"mapper input")
if cls.END_TIME_PARAM not in params or params[cls.END_TIME_PARAM] is None:
params[cls.END_TIME_PARAM] = time.time()
if params[cls.START_TIME_PARAM] >= params[cls.END_TIME_PARAM]:
raise errors.BadReaderParamsError("The starting time cannot be later "
"than or the same as the ending time.")
if cls._PROTOTYPE_REQUEST_PARAM in params:
try:
params[cls._PROTOTYPE_REQUEST_PARAM] = log_service_pb.LogReadRequest(
params[cls._PROTOTYPE_REQUEST_PARAM])
except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
raise errors.BadReaderParamsError("The prototype request must be "
"parseable as a LogReadRequest.")
    # Pass the parameters to logservice.fetch() to verify any underlying
    # constraints on types or values. This only constructs an iterator; it
    # doesn't trigger any requests for actual log records.
try:
logservice.fetch(**params)
except logservice.InvalidArgumentError, e:
raise errors.BadReaderParamsError("One or more parameters are not valid "
"inputs to logservice.fetch(): %s" % e)
def __str__(self):
"""Returns the string representation of this LogInputReader."""
params = []
for key, value in self.__params.iteritems():
      if key in (self._PROTOTYPE_REQUEST_PARAM, self._OFFSET_PARAM):
        params.append("%s='%s'" % (key, value))
else:
params.append("%s=%s" % (key, value))
return "LogInputReader(%s)" % ", ".join(params)