#!/usr/bin/env python
"""Datastore models used by the Google App Engine Pipeline API."""
from google.appengine.ext import db
from google.appengine.ext import blobstore
# Relative imports
from mapreduce.lib import simplejson
class _PipelineRecord(db.Model):
  """Represents a Pipeline.

  Properties:
    class_path: Path of the Python class to use for this pipeline.
    root_pipeline: The root of the whole workflow; set to itself if this
      pipeline is its own root.
    fanned_out: List of child _PipelineRecords that were started when this
      generator pipeline moved from WAITING to RUN.
    start_time: For pipelines with no start _BarrierRecord, when this pipeline
      was enqueued to run immediately.
    finalized_time: When this pipeline moved from WAITING or RUN to DONE.
    params: Serialized parameter dictionary.
    status: The current status of the pipeline.
    current_attempt: The current attempt (starting at 0) to run.
    max_attempts: Maximum number of attempts (starting at 0) to run.
    next_retry_time: ETA of the next retry attempt.
    retry_message: Why the last attempt failed; None or empty if no message.

  Root pipeline properties:
    is_root_pipeline: This is a root pipeline.
    abort_message: Why the whole pipeline was aborted; only saved on
      root pipelines.
    abort_requested: If an abort signal has been requested for this root
      pipeline; only saved on root pipelines.
  """

  # Pipeline statuses
  WAITING = 'waiting'
  RUN = 'run'
  DONE = 'done'
  ABORTED = 'aborted'

  class_path = db.StringProperty()
  # NOTE(review): the argument list of this call was truncated in the copy
  # under review; 'child_pipelines_set' is the presumed back-reference name --
  # confirm against the pristine file.
  root_pipeline = db.SelfReferenceProperty(
      collection_name='child_pipelines_set')
  fanned_out = db.ListProperty(db.Key, indexed=False)
  start_time = db.DateTimeProperty(indexed=False)
  finalized_time = db.DateTimeProperty(indexed=False)

  # One of these two will be set, depending on the size of the params.
  # The text variant is stored under the datastore property name 'params'.
  params_text = db.TextProperty(name='params')
  params_blob = blobstore.BlobReferenceProperty(indexed=False)

  # NOTE(review): the trailing argument(s) of this call were truncated;
  # default=WAITING is the presumed completion -- confirm.
  status = db.StringProperty(choices=(WAITING, RUN, DONE, ABORTED),
                             default=WAITING)

  # Retry behavior
  current_attempt = db.IntegerProperty(default=0, indexed=False)
  max_attempts = db.IntegerProperty(default=1, indexed=False)
  next_retry_time = db.DateTimeProperty(indexed=False)
  retry_message = db.TextProperty()

  # Root pipeline properties
  is_root_pipeline = db.BooleanProperty()
  abort_message = db.TextProperty()
  abort_requested = db.BooleanProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for pipeline records."""
    return '_AE_Pipeline_Record'

  @property
  def params(self):
    """Returns the dictionary of parameters for this Pipeline.

    The JSON payload is read from params_blob when that is set, otherwise
    from params_text. The decoded value is cached on the instance, so the
    payload is only parsed once per entity object.
    """
    if hasattr(self, '_params_decoded'):
      return self._params_decoded

    if self.params_blob is not None:
      # NOTE(review): right-hand side was truncated in the copy under review;
      # reading the whole blob through its reader is the presumed original.
      value_encoded = self.params_blob.open().read()
    else:
      value_encoded = self.params_text

    value = simplejson.loads(value_encoded)
    if isinstance(value, dict):
      kwargs = value.get('kwargs')
      if kwargs:
        # Python only allows non-unicode strings as keyword arguments, and
        # JSON decoding yields unicode keys, so coerce every key to str.
        value['kwargs'] = dict(
            (str(arg_key), arg_value)
            for arg_key, arg_value in kwargs.iteritems())

    self._params_decoded = value
    return self._params_decoded
class _SlotRecord(db.Model):
  """Represents an output slot.

  Properties:
    root_pipeline: The root of the workflow.
    filler: The pipeline that filled this slot.
    value: Serialized value for this slot.
    status: The current status of the slot.
    fill_time: When the slot was filled by the filler.
  """

  # Slot statuses
  FILLED = 'filled'
  WAITING = 'waiting'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  # A second reference to _PipelineRecord needs its own collection_name,
  # otherwise both back-references would claim the same default attribute.
  # NOTE(review): the argument was truncated in the copy under review;
  # 'filled_slots_set' is the presumed name -- confirm.
  filler = db.ReferenceProperty(_PipelineRecord,
                                collection_name='filled_slots_set')

  # One of these two will be set, depending on the size of the value.
  # The text variant is stored under the datastore property name 'value'.
  value_text = db.TextProperty(name='value')
  value_blob = blobstore.BlobReferenceProperty(indexed=False)

  # NOTE(review): the trailing argument was truncated; indexed=False is the
  # presumed completion -- confirm.
  status = db.StringProperty(choices=(FILLED, WAITING), default=WAITING,
                             indexed=False)
  fill_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for slot records."""
    return '_AE_Pipeline_Slot'

  @property
  def value(self):
    """Returns the value of this Slot.

    The JSON payload is read from value_blob when that is set, otherwise
    from value_text. The decoded value is cached on the instance.
    """
    if hasattr(self, '_value_decoded'):
      return self._value_decoded

    if self.value_blob is not None:
      # NOTE(review): right-hand side was truncated in the copy under review;
      # reading the whole blob through its reader is the presumed original.
      encoded_value = self.value_blob.open().read()
    else:
      encoded_value = self.value_text

    self._value_decoded = simplejson.loads(encoded_value)
    return self._value_decoded
class _BarrierRecord(db.Model):
  """Represents a barrier.

  Properties:
    root_pipeline: The root of the workflow.
    target: The pipeline to run when the barrier fires.
    blocking_slots: The slots that must be filled before this barrier fires.
    trigger_time: When this barrier fired.
    status: The current status of the barrier.
  """

  # Barrier statuses
  FIRED = 'fired'
  WAITING = 'waiting'

  # Barrier trigger reasons (used as key names)
  START = 'start'
  FINALIZE = 'finalize'
  ABORT = 'abort'

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  # A second reference to _PipelineRecord needs its own collection_name,
  # otherwise both back-references would claim the same default attribute.
  # NOTE(review): the argument was truncated in the copy under review;
  # 'called_barrier_set' is the presumed name -- confirm.
  target = db.ReferenceProperty(_PipelineRecord,
                                collection_name='called_barrier_set')
  blocking_slots = db.ListProperty(db.Key)
  trigger_time = db.DateTimeProperty(indexed=False)
  # NOTE(review): the trailing argument was truncated; indexed=False is the
  # presumed completion -- confirm.
  status = db.StringProperty(choices=(FIRED, WAITING), default=WAITING,
                             indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for barrier records."""
    return '_AE_Pipeline_Barrier'
class _StatusRecord(db.Model):
  """Represents the current status of a pipeline.

  Properties:
    root_pipeline: The root of the workflow this status belongs to.
    message: The textual message to show.
    console_url: URL to iframe as the primary console for this pipeline.
    link_names: Human display names for status links.
    link_urls: URLs corresponding to human names for status links.
    status_time: When the status was written.
  """

  root_pipeline = db.ReferenceProperty(_PipelineRecord)
  message = db.TextProperty()
  console_url = db.TextProperty()
  # Parallel lists: per the descriptions above, each entry of link_urls
  # corresponds to a display name in link_names.
  link_names = db.ListProperty(db.Text, indexed=False)
  link_urls = db.ListProperty(db.Text, indexed=False)
  status_time = db.DateTimeProperty(indexed=False)

  @classmethod
  def kind(cls):
    """Returns the datastore kind name used for status records."""
    return '_AE_Pipeline_Status'