from Testing import ZopeTestCase from Products.CMFPlone.tests import PloneTestCase from Products.CMFCore.utils import getToolByName from Products.Archetypes.tests import ArchetypesTestCase from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import noSecurityManager from Products.PloneSMSCommunicator.pyXIAM import SMSsubmitRequest, getAllTextFromTag from xml.dom.minidom import * from StringIO import StringIO import SimpleXiamServer import BaseHTTPServer import thread ZopeTestCase.installProduct('PloneSMSCommunicator') tests=[] class SmSHandler(SimpleXiamServer.SimpleXiamHandler): server_url = None def testCommunicator(self): cl = int(self.headers['content-length']) doc = parseString(self.rfile.read(cl)) self.assertEqual(getAllTextFromTag(doc, 'from')[0], '+380979312198') self.assertEqual(getAllTextFromTag(doc, 'to')[0], '+380979987348') self.assertEqual(getAllTextFromTag(doc, 'content')[0], 'hello how are you?') def do_POST(self): SimpleXiamServer.SimpleXiamHandler.do_POST(self) self.testCommunicator() class TestPloneSMSCommunicator(ArchetypesTestCase.ArcheSiteTestCase, ZopeTestCase.Functional): httpd = BaseHTTPServer.HTTPServer(("", 10010), SmSHandler) def createManager(self, portal): acl_users = portal.acl_users return acl_users._doAddUser('PortalManager', '', ['Manager'], (), (), ) def startServer(self): print 'serving at port', 10010 self.httpd.serve_forever() def installProducts(self, portal): #create manager user self.createManager(portal) user = portal.acl_users.getUserById('PortalManager') #login as manager newSecurityManager(None, user) #install products qi = getToolByName(portal, 'portal_quickinstaller') qi.installProduct('PloneSMSCommunicator') #log out noSecurityManager() def afterSetUp(self): portal = self.portal self.installProducts(portal) def test_sendRequest(self): portal = self.portal #start new thread th = thread.start_new_thread(self.startServer, ()) communicator = portal.portal_smsCommunicator communicator.setLogFlag(False) communicator.setServer('http://localhost:10010/') communicator.send_Request(originator = '+380979312198', destination = ['+380979987348'], body = 'hello how are you?') #destroy thread del th def test_Response(self): portal = self.portal communicator = portal.portal_smsCommunicator communicator.setLogFlag(False) response = """ +380979987348 """ out = StringIO() out.write(response) communicator = portal.portal_smsCommunicator data = self.publish(portal.id+'/portal_smsCommunicator/Response', 'mgr:mgrpw', env=None, extra=None, request_method='POST', stdin = out) response = data.getBody() self.assertEqual(response, """\n\nXML contained your response messages""") tests.append(TestPloneSMSCommunicator) def test_suite(): from unittest import TestSuite, makeSuite suite = TestSuite() suite.addTest(makeSuite(TestPloneSMSCommunicator)) return suite if __name__ == '__main__': framework()