| # Copyright 2008 the Melange authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Common testing utilities.""" |
| |
| import cgi |
| import hashlib |
| import os |
| import datetime |
| import httplib |
| import StringIO |
| import unittest |
| |
| # TODO(daniel): gaetestbed is deprecated; it should not be used. |
| import gaetestbed |
| from mox import stubout |
| |
| from google.appengine.api import datastore |
| from google.appengine.datastore import datastore_stub_util |
| from google.appengine.ext import blobstore |
| from google.appengine.ext import db |
| from google.appengine.ext import testbed |
| |
| from django.test import client |
| from django.test import TestCase |
| |
| from soc.logic.helper import xsrfutil |
| from soc.middleware import xsrf as xsrf_middleware |
| from soc.modules import callback |
| from soc.views import template |
| |
| from tests import profile_utils |
| from tests import program_utils |
| from tests import timeline_utils |
| |
| |
| # key of request argument associated with testbed object |
| TESTBED_ARG_KEY = 'TESTBED' |
| |
| class MockRequest(object): |
| """Shared dummy request object to mock common aspects of a request. |
| |
| Before using the object, start should be called, when done (and |
| before calling start on a new request), end should be called. |
| """ |
| |
| def __init__(self, path=None, method='GET'): |
| """Creates a new empty request object. |
| |
| self.REQUEST, self.GET and self.POST are set to an empty |
| dictionary, and path to the value specified. |
| """ |
| |
| self.REQUEST = {} |
| self.GET = {} |
| self.POST = {} |
| self.META = {} |
| self.path = path |
| self.method = method |
| |
| def get_full_path(self): |
| # TODO: if needed add GET params |
| return self.path |
| |
| def start(self): |
| """Readies the core for a new request. |
| """ |
| |
| core = callback.getCore() |
| core.startNewRequest(self) |
| |
| def end(self): |
| """Finishes up the current request. |
| """ |
| |
| core = callback.getCore() |
| core.endRequest(self, False) |
| |
| |
| def get_general_raw(args_names): |
| """Gets a general_raw function object. |
| """ |
| |
| def general_raw(*args, **kwargs): |
| """Sends a raw information. |
| |
| That is the parameters passed to the return function that is mentioned |
| in corresponding stubout.Set |
| """ |
| |
| num_args = len(args) |
| result = kwargs.copy() |
| for i, name in enumerate(args_names): |
| if i < num_args: |
| result[name] = args[i] |
| else: |
| result[name] = None |
| if len(args_names) < num_args: |
| result['__args__'] = args[num_args:] |
| return result |
| return general_raw |
| |
| |
| class StuboutHelper(object): |
| """Utilities for view test. |
| """ |
| |
| def __init__(self): |
| """Creates a new ViewTest object. |
| """ |
| |
| #Creates a StubOutForTesting object |
| self.stubout = stubout.StubOutForTesting() |
| |
| def tearDown(self): |
| """Tear down the stubs that were set up. |
| """ |
| |
| self.stubout.UnsetAll() |
| |
| def stuboutBase(self): |
| """Applies basic stubout replacements. |
| """ |
| pass |
| |
| def stuboutElement(self, parent, child_name, args_names): |
| """Applies a specific stubout replacement. |
| |
| Replaces child_name's old definition with the new definition which has |
| a list of arguments (args_names), in the context of the given parent. |
| """ |
| |
| self.stubout.Set(parent, child_name, get_general_raw(args_names)) |
| |
| |
| class NonFailingFakePayload(object): |
| """Extension of Django FakePayload class that includes seek and readline |
| methods. |
| """ |
| |
| def __init__(self, content): |
| self.__content = StringIO.StringIO(content) |
| self.__len = len(content) |
| |
| def read(self, num_bytes=None): |
| if num_bytes is None: |
| num_bytes = self.__len or 1 |
| assert self.__len >= num_bytes, \ |
| "Cannot read more than the available bytes from the HTTP incoming data." |
| content = self.__content.read(num_bytes) |
| self.__len -= num_bytes |
| return content |
| |
| def seek(self, pos, mode=0): |
| return self.__content.seek(pos, mode) |
| |
| def readline(self, length=None): |
| return self.__content.readline(length) |
| |
| |
| class SoCTestCase(unittest.TestCase): |
| """Base test case to be subclassed. |
| |
| Common data are seeded and common helpers are created to make testing easier. |
| """ |
| |
| def programType(self): |
| """Returns a string identifying the type of the program being tested. |
| |
| Extending classes must override this abstract method. |
| |
| Returns: |
| A string such as "gsoc" or "gci" identifying the type of program being |
| tested. |
| """ |
| raise NotImplementedError() |
| |
| def init(self): |
| """Performs test setup. |
| |
| Sets the following attributes: |
| dev_test: True iff DEV_TEST is in environment (in parent) |
| """ |
| self.dev_test = 'DEV_TEST' in os.environ |
| |
| self.testbed = testbed.Testbed() |
| self.testbed.activate() |
| |
| self.policy = datastore_stub_util.PseudoRandomHRConsistencyPolicy( |
| probability=1) |
| self.testbed.init_datastore_v3_stub(consistency_policy=self.policy) |
| |
| self.testbed.init_blobstore_stub() |
| |
| @staticmethod |
| def use_hr_schema(func): |
| """Wrapper that makes the specified function to be called in HR datastore |
| schema environment. |
| |
| It is important to mention that only the tested function is executed in |
| this environment. In particular, setUp() and tearDown() function are |
| run in the regular schema that is similar to Master/Slave. |
| """ |
| |
| def wrapper(self): |
| """Wrapper that sets the probability to zero and calls the wrapped |
| function. |
| |
| Args: |
| self: an instance of SoCTestCase that invoked the wrapped function |
| """ |
| self.policy.SetProbability(0) |
| func(self) |
| self.policy.SetProbability(1) |
| |
| return wrapper |
| |
| def assertSameEntity(self, expected_entity, actual_entity, msg=''): |
| """App Engine entities comparison. |
| |
| It asserts that both expected_entity and actual_entity represent |
| the same entity. The check is performed by comparing keys of the |
| two specified entities. |
| |
| Args: |
| expected_entity: the expected entity |
| actual_entity: the actual entity |
| msg: the optional message |
| """ |
| if not isinstance(expected_entity, db.Model): |
| raise TypeError('expected_entity has wrong type: %s' % |
| type(expected_entity)) |
| |
| if not isinstance(actual_entity, db.Model): |
| raise TypeError('actual_entity has wrong type: %s' % |
| type(actual_entity)) |
| |
| self.assertEqual(expected_entity.key(), actual_entity.key(), msg) |
| |
| def createBlob(self, path, content=None): |
| """Puts new Blob upload for the specified path and optional content. |
| |
| Args: |
| path: path of the file to upload. |
| content: content of the file. |
| |
| Returns: |
| blobstore.BlobKey of the created blob. |
| """ |
| filename = os.path.basename(path) |
| content = content or 'fake content' |
| # for some reason CreateBlob function returns datastore_types.Key instead |
| # of datastore_types.BlobKey, whereas the latter type is required for |
| # BlobReferenceProperty to work. |
| self.testbed.get_stub('blobstore').CreateBlob(filename, content) |
| return blobstore.BlobKey(filename) |
| |
| |
| # TODO(nathaniel): Drop "gsoc" attribute in favor of "program". |
| class GSoCTestCase(SoCTestCase): |
| """GSoCTestCase for GSoC tests. |
| |
| Common data are seeded and common helpers are created to make testing easier. |
| |
| Attributes: |
| gsoc: A GSoCProgram. |
| program: The same GSoCProgram as "gsoc". |
| org: A GSoCOrganization. |
| org_app: An OrgAppSurvey. |
| sponsor: A Sponsor. |
| site: A Site. |
| profile_helper: A GSoCProfileHelper. |
| program_helper: A GSoCProgramHelper. |
| timeline_helper: A GSoCTimelineHelper. |
| """ |
| |
| def programType(self): |
| """See SoCTestCase.programType for specification.""" |
| return 'gsoc' |
| |
| def init(self): |
| """Performs test set-up by seeding data and setting attributes.""" |
| super(GSoCTestCase, self).init() |
| self.program_helper = program_utils.GSoCProgramHelper() |
| self.sponsor = self.program_helper.createSponsor() |
| self.gsoc = self.program = self.program_helper.createProgram() |
| self.site = self.program_helper.createSite() |
| self.org = self.program_helper.createOrg() |
| self.org_app = self.program_helper.createOrgApp() |
| self.timeline_helper = timeline_utils.GSoCTimelineHelper( |
| self.gsoc.timeline, self.org_app) |
| self.profile_helper = profile_utils.GSoCProfileHelper( |
| self.gsoc, self.dev_test) |
| |
| |
| # TODO(nathaniel): Drop "gci" attribute in favor of "program". |
| class GCITestCase(SoCTestCase): |
| """GCITestCase for GCI tests. |
| |
| Common data are seeded and common helpers are created to make testing easier. |
| |
| Attributes: |
| gci: A GCIProgram. |
| program: The same GCIProgram as "gci". |
| site: A Site. |
| sponsor: A Sponsor. |
| org: A GCIOrganization. |
| org_app: An OrgAppSurvey. |
| profile_helper: A GCIProfileHelper. |
| program_helper: A GCIProgramHelper. |
| timeline_helper: A GCITimelineHelper. |
| """ |
| |
| def programType(self): |
| """See SoCTestCase.programType for specification.""" |
| return 'gci' |
| |
| def init(self): |
| """Performs test set-up by seeding data and setting attributes.""" |
| super(GCITestCase, self).init() |
| self.program_helper = program_utils.GCIProgramHelper() |
| self.sponsor = self.program_helper.createSponsor() |
| self.gci = self.program = self.program_helper.createProgram() |
| self.site = self.program_helper.createSite() |
| self.org = self.program_helper.createOrg() |
| self.org_app = self.program_helper.createOrgApp() |
| self.timeline_helper = timeline_utils.GCITimelineHelper( |
| self.gci.timeline, self.org_app) |
| self.profile_helper = profile_utils.GCIProfileHelper( |
| self.gci, self.dev_test) |
| |
| |
| class DjangoTestCase(TestCase): |
| """Class extending Django TestCase in order to extend its functions. |
| |
| As well as remove the functions which are not supported by Google App Engine, |
| e.g. database flush and fixtures loading without the assistance of Google |
| App Engine Helper for Django. |
| """ |
| |
| _request_id = 0 |
| |
| def _pre_setup(self): |
| """Performs any pre-test setup. |
| """ |
| client.FakePayload = NonFailingFakePayload |
| |
| def _post_teardown(self): |
| """ Performs any post-test cleanup.""" |
| # presence of this function is required by Django test runner |
| pass |
| |
| def createOrg(self, override={}): |
| """Creates an organization for the defined properties. |
| """ |
| pass |
| |
| def seed(self, model, properties): |
| """Returns a instance of model, seeded with properties. |
| """ |
| from soc.modules.seeder.logic.seeder import logic as seeder_logic |
| return seeder_logic.seed(model, properties, recurse=False) |
| |
| def seedProperties(self, model, properties): |
| """Returns seeded properties for the specified model. |
| """ |
| from soc.modules.seeder.logic.seeder import logic as seeder_logic |
| return seeder_logic.seed_properties(model, properties, recurse=False) |
| |
| def gen_request_id(self): |
| """Generate a request id. |
| """ |
| os.environ['REQUEST_ID_HASH'] = hashlib.sha1(str( |
| DjangoTestCase._request_id)).hexdigest()[:8].upper() |
| DjangoTestCase._request_id += 1 |
| |
| def get(self, url): |
| """Performs a get to the specified url. |
| """ |
| self.gen_request_id() |
| response = self.client.get(url) |
| return response |
| |
| def post(self, url, postdata={}): |
| """Performs a post to the specified url with postdata. |
| |
| Takes care of setting the xsrf_token. |
| """ |
| self.gen_request_id() |
| postdata['xsrf_token'] = self.getXsrfToken(url, site=self.site) |
| |
| extra = {TESTBED_ARG_KEY: self.testbed} |
| response = self.client.post(url, postdata, **extra) |
| postdata.pop('xsrf_token') |
| return response |
| |
| def modelPost(self, url, model, properties): |
| """Performs a post to the specified url after seeding for model. |
| |
| Calls post(). |
| """ |
| properties = self.seedProperties(model, properties) |
| response = self.post(url, properties) |
| return response, properties |
| |
| def buttonPost(self, url, button_name, postdata=None): |
| """Performs a post to url simulating that button_name is clicked. |
| |
| Calls post(). |
| """ |
| combined_postdata = {button_name: ''} |
| if postdata: |
| combined_postdata.update(postdata) |
| url = '%s?button' % url |
| response = self.post(url, combined_postdata) |
| return response |
| |
| def createDocumentForPrefix(self, prefix, override={}): |
| """Creates a document for the specified properties. |
| """ |
| from soc.models.document import Document |
| from soc.modules.seeder.logic.providers.string import ( |
| DocumentKeyNameProvider) |
| properties = { |
| 'modified_by': self.profile_helper.user, |
| 'author': self.profile_helper.user, |
| 'home_for': None, |
| 'prefix': prefix, |
| 'scope': self.program, |
| 'read_access': 'public', |
| 'key_name': DocumentKeyNameProvider(), |
| } |
| properties.update(override) |
| return self.seed(Document, properties) |
| |
| @classmethod |
| def getXsrfToken(cls, path=None, method='POST', data={}, site=None, **extra): |
| """Returns an XSRF token for request context. |
| |
| It is signed by Melange XSRF middleware. |
| Add this token to POST data in order to pass the validation check of |
| Melange XSRF middleware for HTTP POST. |
| """ |
| |
| """ |
| request = HttpRequest() |
| request.path = path |
| request.method = method |
| """ |
| # request is currently not used in _getSecretKey |
| class SiteContainingRequest(object): |
| def __init__(self, site): |
| if site: |
| self.site = site |
| request = SiteContainingRequest(site) |
| # TODO(nathaniel): module API violation. |
| key = xsrf_middleware._GetSecretKey(request) |
| user_id = xsrfutil._getCurrentUserId() |
| xsrf_token = xsrfutil._generateToken(key, user_id) |
| return xsrf_token |
| |
| def getJsonResponse(self, url): |
| """Returns the list reponse for the specified url and index. |
| """ |
| return self.client.get(url + '?fmt=json&marker=1') |
| |
| def getListResponse(self, url, idx, start=None, limit=None): |
| """Returns the list reponse for the specified url and index. |
| """ |
| url = [url,'?fmt=json&marker=1&&idx=', str(idx)] |
| if limit: |
| url += ["&limit=", str(limit)] |
| if start: |
| url += ['&start=', start] |
| return self.client.get(''.join(url)) |
| |
| def getListData(self, url, idx): |
| """Returns all data from a list view. |
| """ |
| result = [] |
| start = '' |
| |
| i = 0 |
| |
| while start != 'done': |
| i += 1 |
| response = self.getListResponse(url, idx, start, 1000) |
| data = response.context['data'][start] |
| self.assertIsJsonResponse(response) |
| result += data |
| start = response.context['next'] |
| |
| return result |
| |
| def assertRenderAll(self, response): |
| """Calls render on all objects that are renderable. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| for contexts in response.context or []: |
| for context in contexts: |
| values = context.values() if isinstance(context, dict) else [context] |
| for value in values: |
| try: |
| iterable = iter(value) |
| except TypeError: |
| iterable = [value] |
| for i in iterable: |
| # make it easier to debug render failures |
| if isinstance(i, template.Template): |
| i.render() |
| |
| def assertErrorTemplatesUsed(self, response): |
| """Assert that all the error templates were used. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| self.assertNotEqual(response.status_code, httplib.OK) |
| # TODO(SRabbelier): update this when we use an error template again |
| # self.assertTemplateUsed(response, 'soc/error.html') |
| |
| def assertResponseCode(self, response, status_code): |
| """Asserts that the response status is status_code. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| status_code: expected status code of the response. |
| """ |
| # first ensure that no render failures occurred |
| self.assertRenderAll(response) |
| |
| if response.status_code != status_code: |
| verbose_codes = [ |
| httplib.FOUND, |
| ] |
| message_codes = [ |
| httplib.FORBIDDEN, httplib.BAD_REQUEST, httplib.NOT_FOUND, |
| ] |
| url_codes = [httplib.NOT_FOUND] |
| |
| if response.status_code in verbose_codes: |
| print response |
| |
| if response.context and response.status_code in message_codes: |
| try: |
| print response.context['message'] |
| except KeyError: |
| pass |
| |
| if response.status_code in url_codes: |
| print response.request['PATH_INFO'] |
| |
| self.assertEqual(status_code, response.status_code) |
| |
| def assertResponseOK(self, response): |
| """Asserts that the response status is OK. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| self.assertResponseCode(response, httplib.OK) |
| |
| def assertResponseRedirect(self, response, url=None): |
| """Asserts that the response status is FOUND. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| url: expected URL to which the response should redirect. |
| """ |
| self.assertResponseCode(response, httplib.FOUND) |
| if url: |
| url = "http://testserver" + url |
| self.assertEqual(url, response["Location"]) |
| |
| def assertResponseForbidden(self, response): |
| """Asserts that the response status is FORBIDDEN. |
| |
| Does not raise an error if dev_test is set. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| if self.dev_test: |
| return |
| self.assertResponseCode(response, httplib.FORBIDDEN) |
| |
| def assertResponseBadRequest(self, response): |
| """Asserts that the response status is BAD_REQUEST. |
| """ |
| self.assertResponseCode(response, httplib.BAD_REQUEST) |
| |
| def assertResponseNotFound(self, response): |
| """Asserts that the response status is NOT_FOUND. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| |
| self.assertResponseCode(response, httplib.NOT_FOUND) |
| |
| def assertIsJsonResponse(self, response): |
| """Asserts that all the templates from the base view were used. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| self.assertResponseOK(response) |
| self.assertEqual('application/json', response['Content-Type']) |
| self.assertTemplateUsed(response, 'json_marker.html') |
| |
| def assertPropertiesEqual(self, properties, entity): |
| """Asserts that all properties are set on the specified entity. |
| |
| Reference properties are compared by their key. |
| Any date/time objects are ignored. |
| """ |
| errors = [] |
| |
| for key, value in properties.iteritems(): |
| if key == 'key_name': |
| prop = entity.key().name() |
| elif key == 'parent': |
| prop = entity.parent() |
| else: |
| prop = getattr(entity, key) |
| |
| if isinstance(value, db.Model) or isinstance(prop, db.Model): |
| value = repr(value.key()) if value else value |
| prop = repr(prop.key()) if prop else prop |
| |
| if isinstance(value, datetime.date) or isinstance(value, datetime.time): |
| continue |
| |
| msg = "property %s: '%r' != '%r'" % (key, value, prop) |
| |
| try: |
| self.assertEqual(value, prop, msg=msg) |
| except AssertionError: |
| errors.append(msg) |
| |
| if errors: |
| self.fail("\n".join(errors)) |
| |
| |
| class GSoCDjangoTestCase(DjangoTestCase, GSoCTestCase): |
| """DjangoTestCase specifically for GSoC view tests. |
| """ |
| |
| def init(self): |
| """Performs test setup. |
| """ |
| # Initialize instances in the parent first |
| super(GSoCDjangoTestCase, self).init() |
| |
| def createOrg(self, override={}): |
| """Creates an organization for the defined properties. |
| """ |
| from soc.modules.gsoc.models.organization import GSoCOrganization |
| |
| properties = {'scope': self.gsoc, 'status': 'active', |
| 'scoring_disabled': False, 'max_score': 5, |
| 'home': None, 'program': self.gsoc} |
| properties.update(override) |
| return self.seed(GSoCOrganization, properties) |
| |
| def createDocument(self, override={}): |
| return self.createDocumentForPrefix('gsoc_program', override) |
| |
| def assertGSoCTemplatesUsed(self, response): |
| """Asserts that all the templates from the base view were used. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| self.assertTemplateUsed(response, 'modules/gsoc/base.html') |
| self.assertTemplateUsed(response, 'modules/gsoc/footer.html') |
| self.assertTemplateUsed(response, 'modules/gsoc/header.html') |
| self.assertTemplateUsed(response, 'modules/gsoc/mainmenu.html') |
| |
| |
| class GCIDjangoTestCase(DjangoTestCase, GCITestCase): |
| """DjangoTestCase specifically for GCI view tests. |
| """ |
| |
| def init(self): |
| """Performs test setup. |
| """ |
| # Initialize instances in the parent first |
| super(GCIDjangoTestCase, self).init() |
| super(GCIDjangoTestCase, self).init() |
| |
| def assertGCITemplatesUsed(self, response): |
| """Asserts that all the templates from the base view were used. |
| |
| Args: |
| response: Django's http.HttpResponse object. |
| """ |
| self.assertResponseOK(response) |
| for contexts in response.context: |
| for context in contexts: |
| for value in context.values(): |
| # make it easier to debug render failures |
| if isinstance(value, template.Template): |
| value.render() |
| self.assertTemplateUsed(response, 'modules/gci/base.html') |
| self.assertTemplateUsed(response, 'modules/gci/_footer.html') |
| self.assertTemplateUsed(response, 'modules/gci/_header.html') |
| self.assertTemplateUsed(response, 'modules/gci/_mainmenu.html') |
| |
| def createDocument(self, override={}): |
| return self.createDocumentForPrefix('gci_program', override) |
| |
| |
| class MailTestCase(gaetestbed.mail.MailTestCase, SoCTestCase): |
| """Class extending gaetestbed.mail.MailTestCase to extend its functions. |
| |
| Difference: |
| * Subclass unittest.TestCase so that all its subclasses need not subclass |
| unittest.TestCase in their code. |
| * Override assertEmailSent method. |
| """ |
| |
| def get_sent_messages(self, to=None, cc=None, bcc=None, sender=None, |
| subject=None, body=None, html=None): |
| """Override gaetestbed.mail.MailTestCase.get_sent_messages method. |
| |
| Difference: |
| * It checks cc and bcc as well. |
| """ |
| messages = super(MailTestCase, self).get_sent_messages(to, sender, subject, |
| body, html) |
| if cc: |
| messages = [m for m in messages if cc in m.cc_list()] |
| if bcc: |
| messages = [m for m in messages if bcc in m.bcc_list()] |
| return messages |
| |
| def assertEmailSent(self, to=None, cc=None, bcc=None, sender=None, |
| subject=None, body=None, html=None, n=None, fullbody=False): |
| """Override gaetestbed.mail.MailTestCase.assertEmailSent method. |
| |
| Difference: |
| * It runs all mail tasks first. |
| * It prints out all sent messages to facilitate debug in case of failure. |
| * It accepts an optional argument n which is used to assert exactly n |
| messages satisfying the criteria are sent out. |
| * Clips textbody to the first 50 characters, unless fullbody is True. |
| """ |
| |
| # Run all mail tasks first so that all mails will be sent out |
| self.runTasks(url = '/tasks/mail/send_mail', queue_names = ['mail']) |
| messages = self.get_sent_messages( |
| to = to, |
| cc = cc, |
| bcc = bcc, |
| sender = sender, |
| subject = subject, |
| body = body, |
| html = html, |
| ) |
| failed = False |
| if not messages: |
| failed = True |
| failure_message = "Expected e-mail message sent. No messages sent" |
| details = self._get_email_detail_string(to, sender, subject, body, html) |
| if details: |
| failure_message += ' with %s.' % details |
| else: |
| failure_message += '.' |
| elif n: |
| actual_n = len(messages) |
| if n != actual_n: |
| failed = True |
| failure_message = ("Expected e-mail message sent." |
| "Expected %d messages sent" % n) |
| details = self._get_email_detail_string(to, sender, subject, body, html) |
| if details: |
| failure_message += ' with %s;' % details |
| else: |
| failure_message += ';' |
| failure_message += ' but actually %d.' % actual_n |
| # If failed, raise error and display all messages sent |
| if failed: |
| all_messages = self.get_sent_messages() |
| failure_message += '\nAll messages sent: ' |
| if all_messages: |
| failure_message += '\n' |
| for message in all_messages: |
| if not fullbody: |
| message.set_textbody(message.textbody()[:50]) |
| message.set_htmlbody(message.htmlbody()[:50]) |
| failure_message += str(message) |
| else: |
| failure_message += 'None' |
| self.fail(failure_message) |
| |
| def runTasks(self, url=None, name=None, queue_names=None): |
| """Run tasks with specified URL and name in specified task queues. |
| |
| Args: |
| url: URL associated with the task to run. |
| name: name of the task. |
| queue_names: names of task queues in which the tasks should be executed. |
| """ |
| task_queue_test_case = gaetestbed.taskqueue.TaskQueueTestCase() |
| # Get all tasks with specified url and name in specified task queues |
| tasks = task_queue_test_case.get_tasks( |
| url=url, name=name, queue_names=queue_names) |
| for task in tasks: |
| postdata = task['params'] |
| xsrf_token = GSoCDjangoTestCase.getXsrfToken(url, data=postdata) |
| postdata.update(xsrf_token=xsrf_token) |
| # Run the task with Django test client |
| self.post(url, postdata) |
| |
| |
| class TaskQueueTestCase(gaetestbed.taskqueue.TaskQueueTestCase, |
| unittest.TestCase): |
| """Class extending gaetestbed.taskqueue.TaskQueueTestCase. |
| |
| Difference: |
| * Subclass unittest.TestCase so that all its subclasses need not subclass |
| unittest.TestCase in their code. |
| """ |
| |
| def setUp(self): |
| """Sets up gaetestbed.taskqueue.TaskQueueTestCase. |
| """ |
| |
| super(TaskQueueTestCase, self).setUp() |
| |
| |
| class FakeBlobstoreMiddleware(object): |
| """Middleware class to pre-process file uploads so that they are accessible |
| by test functions. |
| |
| Actual files cannot be handled properly by the regular, as Django does not |
| recognize them as Blob uploads, as in production or on devappserver they |
| are first processed by AppEngine. |
| |
| This class manually finds file uploads by checking if there is 'filename' |
| element in their Content-Disposition header. Mock BlobInfo instances |
| are added to the processed request so that actual views see them. |
| """ |
| |
| def process_request(self, request): |
| """Processes request by handling all possible file uploads. |
| |
| It implements a hook for Django middleware as described in its |
| documentation. |
| |
| Args: |
| request: A django.http.HttpRequest. |
| """ |
| request.file_uploads = {} |
| |
| # we only care about POST and which has form data with file. |
| if request.method == 'POST' and ( |
| 'multipart/form-data' in request.META.get('CONTENT_TYPE', '')): |
| |
| testbed = request.META[TESTBED_ARG_KEY] |
| wsgi_input = request.META['wsgi.input'] |
| wsgi_input.seek(0) |
| |
| fields = cgi.FieldStorage(wsgi_input, environ=request.META) |
| |
| for key in fields: |
| field = fields[key] |
| if isinstance(field, cgi.FieldStorage): |
| if ('content-disposition' in field.headers and |
| 'filename' in field.disposition_options): |
| |
| # create a mock blob info and assign it to request data |
| filename = field.disposition_options['filename'] |
| blob_info = testbed.get_stub('blobstore').CreateBlob( |
| filename, 'fake content') |
| |
| # set other properties of blob info |
| blob_info['filename'] = filename |
| blob_info['content_type'] = field.headers['content-type'] |
| datastore.Put(blob_info) |
| |
| # set request data |
| request.file_uploads[key] = blob_info |
| request.POST[key] = filename |
| |
| # format blob info for Django by adding the name property. |
| blob_info.name = field.disposition_options['filename'] |
| blob_info.size = blob_info['size'] |