blob: 01644731b754da8edd2d5b0a400df817e32ac1e4 [file] [log] [blame]
# Copyright 2014 the Melange authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tasks related to syncing shipment tracking data."""
import csv
import datetime
import json
import logging
from django.conf.urls import url as django_url
from google.appengine.api import taskqueue
from google.appengine.ext import db
from google.appengine.ext import ndb
from google.appengine.runtime import DeadlineExceededError
from melange.logic import profile as profile_logic
from summerofcode.models.shipment import StudentShipment
from summerofcode.models.shipment_tracking import ShipmentInfo
from summerofcode.request import links
from soc.logic import dicts
from soc.tasks import responses
from soc.tasks.helper import timekeeper
from soc.modules.gsoc.views.helper import url_names
# Format string with day/month/year directives, presumably for the date a
# shipment was sent. NOTE(review): not referenced in the visible part of this
# module — confirm it is used elsewhere before relying on or removing it.
DATE_SHIPPED_FORMAT = '%d/%m/%Y'
class _ColumnNotFoundError(Exception):
  """Signals that a required column is absent from a sheet's header row."""
class _ParamNotFoundError(Exception):
  """Raised when an expected parameter is missing from the request params."""
def _getShipmentInfo(program_key, shipment_info_id):
  """Looks up a ShipmentInfo entity by id under the given program.

  Args:
    program_key: old-style (db) key of the program that is the parent of
      the entity; it is converted to an ndb key for the lookup.
    shipment_info_id: id of the ShipmentInfo entity.

  Returns:
    Whatever ShipmentInfo.get_by_id returns for that id and parent.
  """
  parent_key = ndb.Key.from_old_key(program_key)
  shipment_info = ShipmentInfo.get_by_id(shipment_info_id, parent=parent_key)
  return shipment_info
def _findColumnIndexes(first_row, expected_columns):
  """Maps each expected column name to its position in the header row.

  Args:
    first_row: List for raw header row data of the sheet.
    expected_columns: List of column names that must all be present in
      first_row.

  Returns:
    A dictionary keyed by column name whose value is the index of the first
    occurrence of that name in first_row.

  Raises:
    _ColumnNotFoundError: if any of expected_columns is absent from
      first_row.
  """
  indexes_by_name = {}
  for name in expected_columns:
    if name not in first_row:
      raise _ColumnNotFoundError('%s not found in %s' % (name, first_row))
    indexes_by_name[name] = first_row.index(name)
  return indexes_by_name
def _getRowData(row, column_indexes):
  """Converts a list of row data into a dictionary with the same data.

  Args:
    row: list of cell values of one sheet row.
    column_indexes: dictionary mapping column names to their index in row.

  Returns:
    A dictionary mapping each column name in column_indexes to the value at
    its index in row.
  """
  # items() (rather than the Python 2-only iteritems()) works on both
  # Python 2 and Python 3; the mapping is tiny, so the copy is negligible.
  return {column_name: row[column_index]
          for column_name, column_index in column_indexes.items()}
def _paramsFromPostOrGet(request, fields):
  """Extracts the values of the given fields from the request params.

  The POST and GET dicts of the request are merged with dicts.merge before
  the fields are looked up.

  Args:
    request: the incoming request whose POST and GET dicts are read.
    fields: names of the params to extract, in the desired order.

  Returns:
    A list with the value of each field, in the same order as fields.

  Raises:
    _ParamNotFoundError: if any of the fields is missing from the params.
  """
  params = dicts.merge(request.POST, request.GET)
  result = []
  for field in fields:
    if field not in params:
      # Log only the param names: values such as the sheet content can be
      # large and contain student data.
      logging.error(
          "missing %s in params: '%s'", field, sorted(params.keys()))
      raise _ParamNotFoundError(field)
    result.append(params[field])
  return result
# Header columns that both the USA and the international student sheets must
# contain; _parseSheetContent looks them up in the first row of the sheet.
EXPECTED_COLUMNS = ['username', 'tracking']
def _parseSheetContent(sheet_type, sheet_content):
  """Parses CSV sheet content into what the continue task needs.

  Args:
    sheet_type: 'usa' or 'intl'
    sheet_content: sheet content data as a string

  Returns:
    A tuple of column_indexes (a dictionary mapping each of EXPECTED_COLUMNS
    to its index in the header row) and sheet_rows (a list of lists of
    values, excluding the header row).

  Raises:
    ValueError: if sheet_type is neither 'usa' nor 'intl'.
    _ColumnNotFoundError: if the sheet is empty or its header row lacks one
      of EXPECTED_COLUMNS.
  """
  # Both sheet types currently share one layout. The type is still
  # validated so an unexpected value fails with a clear error instead of an
  # UnboundLocalError.
  if sheet_type not in ('usa', 'intl'):
    raise ValueError('unknown sheet type: %r' % (sheet_type,))

  sheet_rows = list(csv.reader(sheet_content.splitlines()))
  if not sheet_rows:
    # Without a header row none of the expected columns can be found.
    raise _ColumnNotFoundError(
        '%s not found in empty sheet content' % (EXPECTED_COLUMNS,))

  column_indexes = _findColumnIndexes(sheet_rows[0], EXPECTED_COLUMNS)
  return column_indexes, sheet_rows[1:]
def _setShipmentInfoStatusToError(shipment_info):
  """Marks the shipment info as failed and saves it.

  Used as the fallback when a sync task cannot complete.

  Args:
    shipment_info: the shipment info object whose status is changed.
  """
  failed_status = 'error'
  shipment_info.status = failed_status
  shipment_info.put()
def _startShipmentSync(
    program_key, shipment_info, sheet_content_json, sheet_type):
  """Kicks off the sync of one shipment sheet.

  The sheet is parsed here. Its header indexes and data rows are handed,
  JSON-encoded, to the continue task, which does the per-row work. The
  shipment info is marked 'syncing' at the same time.

  Args:
    program_key: the key of the program which task is running for.
    shipment_info: the shipment info object that task is running for.
    sheet_content_json: sheet content data in JSON format.
    sheet_type: 'usa' or 'intl'

  Returns:
    The response produced by responses.terminateTask().
  """
  sheet_content = json.loads(sheet_content_json)
  indexes, data_rows = _parseSheetContent(sheet_type, sheet_content)

  task_params = {
      'program_key': str(program_key),
      'shipment_info_id': shipment_info.key.id(),
      'column_indexes': json.dumps(indexes),
      'sheet_rows': json.dumps(data_rows),
  }
  continue_url = links.SOC_LINKER.site(url_names.GSOC_SHIPMENT_TASK_CONTINUE)

  def _markSyncingAndEnqueue():
    shipment_info.status = 'syncing'
    shipment_info.put()
    # NOTE(review): shipment_info is an ndb entity, and the task is added
    # without transactional=True, so this db transaction probably does not
    # make the put and the enqueue atomic — confirm.
    taskqueue.add(url=continue_url, params=task_params)

  db.run_in_transaction(_markSyncingAndEnqueue)
  return responses.terminateTask()
def _updateShipmentDataForStudent(shipment_info, profile, tracking):
  """Sets the tracking number on the student's StudentShipment entity.

  The StudentShipment for shipment_info is looked up under the profile and
  created if it does not exist yet. The tracking number is then stored on it
  and the entity is saved.

  Args:
    shipment_info: The shipment info that is being synced.
    profile: the Profile of the student
    tracking: the tracking number that should be set

  Returns:
    The saved StudentShipment object.
  """
  query = StudentShipment.query(
      StudentShipment.shipment_info == shipment_info.key,
      ancestor=profile.key)
  student_shipment = query.get()
  if not student_shipment:
    student_shipment = StudentShipment(
        shipment_info=shipment_info.key, parent=profile.key)
  student_shipment.tracking = tracking
  student_shipment.put()
  # Previously documented but not returned; existing callers ignore it.
  return student_shipment
def _continueShipmentSyncRow(column_indexes, row, program_key, shipment_info):
  """Performs the shipment sync for one row of data.

  Rows are skipped, with an error logged, in two cases: no profile exists
  for the username in the program, or the profile is not a student.

  Args:
    column_indexes: dictionary mapping the 'username' and 'tracking' column
      names to their index in the row.
    row: one row from the spreadsheet, as a list of cell values.
    program_key: db.Key of the program which sync is being done for.
    shipment_info: the shipment info object that task is running for.
  """
  # Rows can be shorter than the header (e.g. trailing empty cells omitted).
  # Pad up to the highest referenced index: the number of expected columns
  # is not enough, because they need not be the first columns of the sheet.
  required_length = (max(column_indexes.values()) + 1) if column_indexes else 0
  if len(row) < required_length:
    # Pad a copy so the caller's row list is left untouched.
    row = row + [''] * (required_length - len(row))

  data = _getRowData(row, column_indexes)
  username = data['username']
  profile = profile_logic.getProfileForUsername(username, program_key)
  if not profile:
    # id_or_name(): a db.Key built from a key name has id() == None.
    logging.error(
        "Profile with username '%s' for program '%s' is not found",
        username, program_key.id_or_name())
    return
  if not profile.is_student:
    logging.error("Profile with username '%s' is not a student", username)
    return
  _updateShipmentDataForStudent(shipment_info, profile, data['tracking'])
def _continueShipmentSync(
    timer, program_key, shipment_info, column_indexes_json, sheet_rows_json):
  """Continue syncing shipment data.

  Rows are processed until a DeadlineExceededError interrupts the loop.
  - If rows are left at that point, they are handed to a new continue task
    and this round is left unfinished.
  - Otherwise the round is finished with _finishSync.

  Args:
    timer: timekeeper.Timekeeper whose iterate() drives the row loop.
    program_key: db.Key of the program which sync is being done for.
    shipment_info: the shipment info object that task is running for.
    column_indexes_json: column indexes for specific columns in JSON format.
    sheet_rows_json: spreadsheet rows (header excluded) in JSON format.

  Returns:
    The response produced by responses.terminateTask().
  """
  column_indexes = json.loads(column_indexes_json)
  sheet_rows = json.loads(sheet_rows_json)

  # If the deadline hits before the first row is yielded, nothing has been
  # processed yet. Without this default, `remain` would be unbound then.
  remain = len(sheet_rows)
  try:
    for remain, row in timer.iterate(sheet_rows):
      _continueShipmentSyncRow(column_indexes, row, program_key, shipment_info)
  except DeadlineExceededError:
    # NOTE(review): remain is assumed to count the rows not yet done, as the
    # slice below relies on — confirm against Timekeeper.iterate.
    if remain:
      params = {
          'program_key': str(program_key),
          'sheet_rows': json.dumps(sheet_rows[-remain:]),
          # The continue handler json.loads this value, so it must be sent
          # as JSON rather than as the dict itself.
          'column_indexes': json.dumps(column_indexes),
          'shipment_info_id': shipment_info.key.id(),
      }
      task_continue_url = links.SOC_LINKER.site(
          url_names.GSOC_SHIPMENT_TASK_CONTINUE)
      taskqueue.add(url=task_continue_url, params=params)
      # This round is not over yet; the continuation task finishes it.
      return responses.terminateTask()

  _finishSync(shipment_info)
  return responses.terminateTask()
def _finishSync(shipment_info):
  """Finishes one round of shipment tracking sync.

  Records the sync time and advances the status, then saves the entity:
  - 'syncing' becomes 'half-complete';
  - 'half-complete' becomes 'idle';
  - any other status is left unchanged.

  Args:
    shipment_info: the shipment info object the sync round ran for.
  """
  # Naive UTC: ndb DateTimeProperty (presumably the type of last_sync_time)
  # stores naive datetimes as UTC. now() only agrees with that when the
  # process runs in the UTC timezone.
  shipment_info.last_sync_time = datetime.datetime.utcnow()
  if shipment_info.status == 'syncing':
    shipment_info.status = 'half-complete'
  elif shipment_info.status == 'half-complete':
    shipment_info.status = 'idle'
  shipment_info.put()
# Time budget handed to timekeeper.Timekeeper by continueShipmentSync; the
# value is in milliseconds.
SHIPMENT_SYNC_TIMEOUT = 20000 # 20 seconds
class ShipmentSyncTask(object):
  """Task handler syncing shipments tracking data."""

  def __init__(self, *args, **kwargs):
    """Initializes the handler; any arguments are accepted and ignored."""
    super(ShipmentSyncTask, self).__init__()
    # Not read anywhere in this class; kept for compatibility.
    self._shipment_info = None

  def djangoURLPatterns(self):
    """Returns the URL patterns for the tasks in this module."""
    patterns = [
        django_url(r'^tasks/gsoc/shipment_tracking/sync/start$',
                   self.startShipmentSync,
                   name=url_names.GSOC_SHIPMENT_TASK_START),
        django_url(r'^tasks/gsoc/shipment_tracking/sync/continue$',
                   self.continueShipmentSync,
                   name=url_names.GSOC_SHIPMENT_TASK_CONTINUE),
    ]
    return patterns

  def _loadShipmentInfo(self, program_key_str, shipment_info_id):
    """Resolves the program key and the ShipmentInfo a task runs for.

    Args:
      program_key_str: string form of the program's db.Key.
      shipment_info_id: id of the ShipmentInfo, as received in the params.

    Returns:
      A (program_key, shipment_info) tuple. shipment_info is None, and an
      error is logged, when no such entity exists.
    """
    program_key = db.Key(program_key_str)
    shipment_info = _getShipmentInfo(program_key, int(shipment_info_id))
    if shipment_info is None:
      logging.error(
          "ShipmentInfo with id '%s' for program '%s' is not found",
          shipment_info_id, program_key_str)
    return program_key, shipment_info

  def startShipmentSync(self, request, *args, **kwargs):
    """Runs _startShipmentSync while presuming an error for the fallback.

    Missing params, a missing ShipmentInfo and missing sheet columns
    terminate the task. Any other exception marks the shipment info as
    'error' and is re-raised.
    """
    try:
      params = _paramsFromPostOrGet(request, [
          'program_key', 'shipment_info_id', 'sheet_content', 'sheet_type'])
    except _ParamNotFoundError:
      return responses.terminateTask()

    program_key_str, shipment_info_id, sheet_content, sheet_type = params
    program_key, shipment_info = self._loadShipmentInfo(
        program_key_str, shipment_info_id)
    if shipment_info is None:
      # Re-running cannot make the entity appear, and there is no entity to
      # mark as failed, so drop the task.
      return responses.terminateTask()

    try:
      return _startShipmentSync(
          program_key, shipment_info, sheet_content, sheet_type)
    except _ColumnNotFoundError:
      _setShipmentInfoStatusToError(shipment_info)
      return responses.terminateTask()
    except Exception:
      _setShipmentInfoStatusToError(shipment_info)
      raise

  def continueShipmentSync(self, request, *args, **kwargs):
    """Runs _continueShipmentSync while presuming an error for the fallback.

    Missing params and a missing ShipmentInfo terminate the task. Any
    exception from the sync marks the shipment info as 'error' and is
    re-raised.
    """
    try:
      params = _paramsFromPostOrGet(request, [
          'program_key', 'shipment_info_id', 'column_indexes', 'sheet_rows'])
    except _ParamNotFoundError:
      return responses.terminateTask()

    program_key_str, shipment_info_id, column_indexes, sheet_rows = params
    program_key, shipment_info = self._loadShipmentInfo(
        program_key_str, shipment_info_id)
    if shipment_info is None:
      # Re-running cannot make the entity appear, and there is no entity to
      # mark as failed, so drop the task.
      return responses.terminateTask()

    timer = timekeeper.Timekeeper(SHIPMENT_SYNC_TIMEOUT)
    try:
      return _continueShipmentSync(
          timer, program_key, shipment_info, column_indexes, sheet_rows)
    except Exception:
      _setShipmentInfoStatusToError(shipment_info)
      raise