| # Copyright 2014 the Melange authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Tasks related to syncing shipment tracking data.""" |
| |
| import csv |
| import datetime |
| import json |
| import logging |
| |
| from django.conf.urls import url as django_url |
| |
| from google.appengine.api import taskqueue |
| from google.appengine.ext import db |
| from google.appengine.ext import ndb |
| from google.appengine.runtime import DeadlineExceededError |
| |
| from melange.logic import profile as profile_logic |
| from summerofcode.models.shipment import StudentShipment |
| from summerofcode.models.shipment_tracking import ShipmentInfo |
| from summerofcode.request import links |
| |
| from soc.logic import dicts |
| from soc.tasks import responses |
| from soc.tasks.helper import timekeeper |
| from soc.modules.gsoc.views.helper import url_names |
| |
# strftime/strptime-style format for shipment dates: day/month/year.
# NOTE(review): appears unused in this module; confirm whether other modules
# import it before removing.
DATE_SHIPPED_FORMAT = '%d/%m/%Y'
| |
| |
| class _ColumnNotFoundError(Exception): |
| """Error to be raised when an expected column is not found in the row.""" |
| |
| |
| class _ParamNotFoundError(Exception): |
| """Error to be raised when an expected parameter is not found in the row.""" |
| |
| |
def _getShipmentInfo(program_key, shipment_info_id):
  """Looks up a ShipmentInfo entity by id under the given program.

  Args:
    program_key: db.Key of the program. It is converted to an ndb.Key
      because ShipmentInfo is looked up through ndb with the program as
      its parent.
    shipment_info_id: numeric id of the ShipmentInfo entity.

  Returns:
    Whatever ShipmentInfo.get_by_id returns for that id and parent.
  """
  parent_key = ndb.Key.from_old_key(program_key)
  shipment_info = ShipmentInfo.get_by_id(shipment_info_id, parent=parent_key)
  return shipment_info
| |
| |
| def _findColumnIndexes(first_row, expected_columns): |
| """Find column indexes in the first row for expected columns. |
| |
| Args: |
| first_row: List for raw header row data of the sheet. |
| expected_columns: List of expected columns in the first row. All |
| elements of the list are expected to be found in the |
| first row. |
| Returns: |
| Returns a dictionairy containing the indexes with the expected columns |
| as the keys, with as value the index in the row data. |
| """ |
| column_indexes = {} |
| |
| for column_name in expected_columns: |
| try: |
| column_index = first_row.index(column_name) |
| except ValueError: |
| raise _ColumnNotFoundError( |
| '%s not found in %s' % (column_name, first_row)) |
| column_indexes[column_name] = column_index |
| |
| return column_indexes |
| |
| |
| def _getRowData(row, column_indexes): |
| """Converts a list of row data into a dictionary with the same data.""" |
| return {column_name: row[column_index] |
| for column_name, column_index in column_indexes.iteritems()} |
| |
| |
def _paramsFromPostOrGet(request, fields):
  """Extracts the requested fields from the request's POST and GET params.

  The two dicts are combined with dicts.merge (see that helper for which
  one wins on conflicts).

  Args:
    request: the Django request of the task.
    fields: names of the parameters to extract, in the order in which
      their values should be returned.

  Returns:
    A list of parameter values in the same order as fields.

  Raises:
    _ParamNotFoundError: if any of fields is missing from the request.
  """
  params = dicts.merge(request.POST, request.GET)

  missing = [field for field in fields if field not in params]
  if missing:
    # Log only the parameter names: values can hold entire sheet contents
    # (usernames, tracking numbers), which are large and should not be
    # copied into the logs.
    logging.error(
        "missing %s in params: '%s'", ', '.join(missing), sorted(params))
    raise _ParamNotFoundError()

  return [params[field] for field in fields]
| |
| |
| #Expected columns for USA and international student sheets |
| EXPECTED_COLUMNS = ['username', 'tracking'] |
| |
| def _parseSheetContent(sheet_type, sheet_content): |
| """Extracts the that should be passed to continueShipmentTracking. |
| |
| Args: |
| sheet_type: 'usa' or 'intl' |
| sheet_content: sheet content data as a string |
| |
| Returns: |
| A tuple of column_indexes (a list of values) and sheet_rows |
| (a list of lists of values). |
| """ |
| iterable_sheet_content = sheet_content.splitlines() |
| sheet_rows = [row for row in csv.reader(iterable_sheet_content)] |
| |
| if sheet_type == 'usa': |
| column_indexes = _findColumnIndexes( |
| sheet_rows[0], EXPECTED_COLUMNS) |
| |
| elif sheet_type == 'intl': |
| column_indexes = _findColumnIndexes( |
| sheet_rows[0], EXPECTED_COLUMNS) |
| |
| return column_indexes, sheet_rows[1:] |
| |
| |
| def _setShipmentInfoStatusToError(shipment_info): |
| """Fallback function that sets shipment info status to 'error'. |
| """ |
| shipment_info.status = 'error' |
| shipment_info.put() |
| |
| |
def _startShipmentSync(
    program_key, shipment_info, sheet_content_json, sheet_type):
  """Parses an uploaded sheet and enqueues the first continuation task.

  Args:
    program_key: db.Key of the program the task is running for.
    shipment_info: the ShipmentInfo entity the task is running for.
    sheet_content_json: JSON-encoded string holding the sheet's CSV text.
    sheet_type: 'usa' or 'intl'.

  Returns:
    The response produced by responses.terminateTask().

  Raises:
    _ColumnNotFoundError: if the sheet lacks a required column.
  """
  sheet_content = json.loads(sheet_content_json)
  column_indexes, sheet_rows = _parseSheetContent(sheet_type, sheet_content)
  params = {
      'program_key': str(program_key),
      'shipment_info_id': shipment_info.key.id(),
      'column_indexes': json.dumps(column_indexes),
      'sheet_rows': json.dumps(sheet_rows),
  }

  task_continue_url = links.SOC_LINKER.site(
      url_names.GSOC_SHIPMENT_TASK_CONTINUE)

  def update_shipment_txn():
    shipment_info.status = 'syncing'
    shipment_info.put()
    # transactional=True: the task is enqueued only if the status update
    # commits, so the two can never get out of step.
    taskqueue.add(url=task_continue_url, params=params, transactional=True)

  # shipment_info is an ndb entity (it exposes .key as a property), so an
  # ndb transaction is required; db.run_in_transaction does not cover ndb
  # writes.
  ndb.transaction(update_shipment_txn)
  return responses.terminateTask()
| |
| |
def _updateShipmentDataForStudent(shipment_info, profile, tracking):
  """Stores a tracking number on the student's StudentShipment entity.

  The StudentShipment for shipment_info is looked up among the profile's
  descendants; a new one (child of the profile) is created if none exists.

  Args:
    shipment_info: the ShipmentInfo entity being synced.
    profile: the Profile of the student.
    tracking: the tracking number that should be set.

  Returns:
    The saved StudentShipment object.
  """
  query = StudentShipment.query(
      StudentShipment.shipment_info == shipment_info.key,
      ancestor=profile.key)
  student_shipment = query.get()

  if not student_shipment:
    student_shipment = StudentShipment(
        shipment_info=shipment_info.key, parent=profile.key)

  student_shipment.tracking = tracking
  student_shipment.put()
  return student_shipment
| |
| |
def _continueShipmentSyncRow(column_indexes, row, program_key, shipment_info):
  """Syncs the tracking number from one sheet row to the student's record.

  Rows with an empty username, or whose username does not resolve to a
  student profile of the program, are logged and skipped.

  Args:
    column_indexes: dict mapping column name ('username', 'tracking') to
      its position in row.
    row: list of cell values for one row of the spreadsheet.
    program_key: db.Key of the program the sync is being done for.
    shipment_info: the shipment info object that task is running for.
  """
  # Pad short rows with empty cells so every referenced position exists.
  # The required length comes from the largest column index, not from the
  # number of columns looked up (the columns need not be the first ones).
  required_length = max(column_indexes.values()) + 1 if column_indexes else 0
  if len(row) < required_length:
    # Build a new list rather than extending the caller's row in place.
    row = row + [''] * (required_length - len(row))
  data = _getRowData(row, column_indexes)
  username = data['username']

  if not username:
    logging.warning('Skipping shipment row without a username: %s', row)
    return

  profile = profile_logic.getProfileForUsername(username, program_key)

  if not profile:
    # id_or_name(): program keys may be name-based, where id() is None.
    logging.error(
        "Profile with username '%s' for program '%s' is not found",
        username, program_key.id_or_name())
    return

  if not profile.is_student:
    logging.error("Profile with username '%s' is not a student", username)
    return

  _updateShipmentDataForStudent(shipment_info, profile, data['tracking'])
| |
| |
def _continueShipmentSync(
    timer, program_key, shipment_info, column_indexes_json, sheet_rows_json):
  """Syncs a chunk of sheet rows, re-enqueueing itself if time runs out.

  When every row has been processed, _finishSync is called once for the
  sheet. When the timer's deadline is hit first, the unprocessed rows are
  handed to a new continuation task and _finishSync is left to that task.

  Args:
    timer: timekeeper.Timekeeper bounding how long this task may run.
    program_key: db.Key of the program the sync is being done for.
    shipment_info: the shipment info object that task is running for.
    column_indexes_json: JSON-encoded dict mapping column name to index.
    sheet_rows_json: JSON-encoded list of rows (lists of cell values) that
      still have to be synced.

  Returns:
    The response produced by responses.terminateTask().
  """
  column_indexes = json.loads(column_indexes_json)
  sheet_rows = json.loads(sheet_rows_json)

  # Rows still to sync. timer.iterate presumably reports this count
  # (including the row in progress) given the sheet_rows[-remain:] slice
  # below. Initialize it so a deadline hit before the first row re-enqueues
  # every row instead of failing with an unbound name.
  remain = len(sheet_rows)
  try:
    for remain, row in timer.iterate(sheet_rows):
      _continueShipmentSyncRow(column_indexes, row, program_key, shipment_info)
  except DeadlineExceededError:
    if remain:
      params = {
          'program_key': str(program_key),
          'sheet_rows': json.dumps(sheet_rows[-remain:]),
          # Must be JSON-encoded: the continuation task json.loads it.
          'column_indexes': json.dumps(column_indexes),
          'shipment_info_id': shipment_info.key.id(),
      }
      task_continue_url = links.SOC_LINKER.site(
          url_names.GSOC_SHIPMENT_TASK_CONTINUE)
      taskqueue.add(url=task_continue_url, params=params)
      # The sheet is not finished yet; the final continuation task will
      # call _finishSync, so it must not advance the status here.
      return responses.terminateTask()

  _finishSync(shipment_info)
  return responses.terminateTask()
| |
| |
| def _finishSync(shipment_info): |
| """Finishes one round of shipment tracking sync. |
| """ |
| shipment_info.last_sync_time = datetime.datetime.now() |
| |
| if shipment_info.status == 'syncing': |
| shipment_info.status = 'half-complete' |
| |
| elif shipment_info.status == 'half-complete': |
| shipment_info.status = 'idle' |
| |
| shipment_info.put() |
| |
| |
# Time limit handed to timekeeper.Timekeeper for one continuation task.
SHIPMENT_SYNC_TIMEOUT = 20000  # 20 seconds


class ShipmentSyncTask(object):
  """Task handlers that sync shipment tracking data from uploaded sheets."""

  def __init__(self, *args, **kwargs):
    super(ShipmentSyncTask, self).__init__()

    self._shipment_info = None

  def djangoURLPatterns(self):
    """Returns the URL patterns for the tasks in this module."""
    patterns = [
        django_url(r'^tasks/gsoc/shipment_tracking/sync/start$',
                   self.startShipmentSync,
                   name=url_names.GSOC_SHIPMENT_TASK_START),
        django_url(r'^tasks/gsoc/shipment_tracking/sync/continue$',
                   self.continueShipmentSync,
                   name=url_names.GSOC_SHIPMENT_TASK_CONTINUE),
    ]
    return patterns

  def _loadTaskTargets(self, program_key_str, shipment_info_id):
    """Resolves the program key and ShipmentInfo named by the task params.

    Args:
      program_key_str: string form of the program's db.Key.
      shipment_info_id: string form of the ShipmentInfo's numeric id.

    Returns:
      A (program_key, shipment_info) tuple, or None (after logging) when
      the params are malformed or no such ShipmentInfo exists. In those
      cases there is no entity whose status could be set to 'error'.
    """
    try:
      program_key = db.Key(program_key_str)
      shipment_info_id = int(shipment_info_id)
    except (db.BadKeyError, ValueError):
      logging.error(
          "Malformed shipment task params: program_key='%s', "
          "shipment_info_id='%s'", program_key_str, shipment_info_id)
      return None

    shipment_info = _getShipmentInfo(program_key, shipment_info_id)
    if shipment_info is None:
      logging.error(
          "ShipmentInfo '%s' for program '%s' is not found",
          shipment_info_id, program_key_str)
      return None

    return program_key, shipment_info

  def startShipmentSync(self, request, *args, **kwargs):
    """Runs _startShipmentSync, marking the shipment info 'error' on failure.

    A sheet with a missing column terminates the task; any other exception
    is re-raised after the status has been set to 'error'.
    """
    try:
      params = _paramsFromPostOrGet(request, [
          'program_key', 'shipment_info_id', 'sheet_content', 'sheet_type'])
    except _ParamNotFoundError:
      return responses.terminateTask()

    program_key_str, shipment_info_id, sheet_content, sheet_type = params
    targets = self._loadTaskTargets(program_key_str, shipment_info_id)
    if targets is None:
      return responses.terminateTask()
    program_key, shipment_info = targets

    try:
      return _startShipmentSync(
          program_key, shipment_info, sheet_content, sheet_type)
    except _ColumnNotFoundError:
      logging.exception('Shipment sheet is missing a required column')
      _setShipmentInfoStatusToError(shipment_info)
      return responses.terminateTask()
    except Exception:
      _setShipmentInfoStatusToError(shipment_info)
      raise

  def continueShipmentSync(self, request, *args, **kwargs):
    """Runs _continueShipmentSync, marking the shipment info 'error' on failure.

    Any exception is re-raised after the status has been set to 'error'.
    """
    try:
      params = _paramsFromPostOrGet(request, [
          'program_key', 'shipment_info_id', 'column_indexes', 'sheet_rows'])
    except _ParamNotFoundError:
      return responses.terminateTask()

    program_key_str, shipment_info_id, column_indexes, sheet_rows = params
    targets = self._loadTaskTargets(program_key_str, shipment_info_id)
    if targets is None:
      return responses.terminateTask()
    program_key, shipment_info = targets
    timer = timekeeper.Timekeeper(SHIPMENT_SYNC_TIMEOUT)

    try:
      return _continueShipmentSync(
          timer, program_key, shipment_info, column_indexes, sheet_rows)
    except Exception:
      _setShipmentInfoStatusToError(shipment_info)
      raise