Package cherrypy :: Package test :: Module test_iterator
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.test_iterator

  1  import cherrypy 
  2  from cherrypy._cpcompat import unicodestr 
class IteratorBase(object):
    """Common base for the response-body iterables used by this test module.

    Holds a class-level ``created`` counter, adjusted through ``incr`` and
    ``decr``, and the chunk of text the iterables hand out.
    """

    # Running total maintained by incr()/decr().  Because both methods
    # rebind the attribute on ``cls``, a subclass gets its own counter the
    # first time one of them is called on it.
    created = 0

    # The text repeated 256 times.
    datachunk = u'butternut squash' * 256

    @classmethod
    def incr(cls):
        """Add one to the ``created`` counter of ``cls``."""
        cls.created = cls.created + 1

    @classmethod
    def decr(cls):
        """Subtract one from the ``created`` counter of ``cls``."""
        cls.created = cls.created - 1
16
class OurGenerator(IteratorBase):
    """Iterable whose ``__iter__`` is a generator yielding 1024 data chunks."""

    def __iter__(self):
        """Yield ``self.datachunk`` 1024 times.

        The class counter is incremented when the generator body starts
        running and decremented in the ``finally`` clause, so it is also
        released when the generator is closed before being exhausted.
        """
        self.incr()
        try:
            remaining = 1024
            while remaining > 0:
                yield self.datachunk
                remaining -= 1
        finally:
            self.decr()
26
class OurIterator(IteratorBase):
    """Hand-written iterator that returns ``datachunk`` 1024 times.

    The class counter is incremented on the first ``__next__`` call and
    released at most once, either by an explicit ``decrement()`` call or
    when the object is finalised via ``__del__``.
    """

    started = False     # True once the first item has been requested
    closed_off = False  # True once the counter has been released
    count = 0           # number of __next__ calls made so far

    def increment(self):
        """Register this iterator in the class counter."""
        self.incr()

    def decrement(self):
        """Release this iterator from the class counter, at most once.

        Nothing happens unless the iterator has actually been started:
        an instance that never called ``increment()`` must not lower the
        counter, otherwise creating and discarding an un-iterated
        instance would push ``created`` below zero.
        """
        if self.started and not self.closed_off:
            self.closed_off = True
            self.decr()

    def __iter__(self):
        """Return the iterator itself."""
        return self

    def __next__(self):
        """Return the next data chunk.

        Raises:
            StopIteration: once more than 1024 items have been requested.
        """
        if not self.started:
            self.started = True
            self.increment()
        self.count += 1
        if self.count > 1024:
            raise StopIteration
        return self.datachunk

    # Python 2 spelling of the iterator protocol method.
    next = __next__

    def __del__(self):
        """Release the counter when the object is finalised."""
        self.decrement()
57
class OurClosableIterator(OurIterator):
    """Iterator variant exposing a ``close()`` that takes no arguments."""

    def close(self):
        """Release the counter by delegating to ``decrement()``."""
        self.decrement()
62
class OurNotClosableIterator(OurIterator):
    """Iterator variant whose ``close`` cannot be called without arguments."""

    # A close() that demands an extra argument can't be invoked as a
    # plain no-argument close.
    def close(self, somearg):
        """Release the counter; ``somearg`` is required but unused."""
        self.decrement()
68
class OurUnclosableIterator(OurIterator):
    """Iterator variant whose ``close`` attribute is a string, not a method."""

    # Deliberately a plain string: not callable!
    close = 'close'
71 72 from cherrypy.test import helper
class IteratorTest(helper.CPWebCase):
    """Checks how iterator response bodies are consumed and released.

    Each handler returns an instance of one of this module's iterator
    classes; the ``created`` counter of that class is then read back over
    HTTP to see whether the instance has been released.
    """

    @staticmethod
    def setup_server():
        """Mount a root application exposing the iterator classes by name."""

        class Root(object):

            # NOTE(review): ``clsname`` comes straight from the URL and is
            # looked up in ``globals()``, so any module-level name can be
            # reached.  Tolerable only because this server exists solely
            # for the test run.

            @cherrypy.expose
            def count(self, clsname):
                """Return the ``created`` counter of the named class."""
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return unicodestr(globals()[clsname].created)

            @cherrypy.expose
            def getall(self, clsname):
                """Return a fresh instance of the named class as the body."""
                cherrypy.response.headers['Content-Type'] = 'text/plain'
                return globals()[clsname]()

            @cherrypy.expose
            def stream(self, clsname):
                """Same body as ``getall``, with response streaming on."""
                return self.getall(clsname)
            stream._cp_config = {'response.stream': True}

        cherrypy.tree.mount(Root())

    def test_iterator(self):
        """Exercise every iterator class without and with streaming.

        1. Every class counter starts at zero.
        2. Non-streamed: a Content-Length header is present with the full
           body size, and the counter is back at zero once the response
           headers have arrived.
        3. Streamed: the counter is one while the response is partially
           read; after the client connection is closed, the classes
           listed in ``closables`` must return to zero.
        """
        import random
        import time

        if cherrypy.server.protocol_version != "HTTP/1.1":
            return self.skip()

        self.PROTOCOL = "HTTP/1.1"

        # Check the counts of all the classes, they should be zero.
        closables = ['OurClosableIterator', 'OurGenerator']
        unclosables = ['OurUnclosableIterator', 'OurNotClosableIterator']
        all_classes = closables + unclosables

        random.shuffle(all_classes)

        for clsname in all_classes:
            self.getPage("/count/" + clsname)
            self.assertStatus(200)
            self.assertBody('0')

        # We should also be able to read the entire content body
        # successfully, though we don't need to, we just want to
        # check the header.
        for clsname in all_classes:
            itr_conn = self.get_conn()
            try:
                itr_conn.putrequest("GET", "/getall/" + clsname)
                itr_conn.endheaders()
                response = itr_conn.getresponse()
                self.assertEqual(response.status, 200)
                headers = response.getheaders()
                for header_name, header_value in headers:
                    if header_name.lower() == 'content-length':
                        # 1024 items, each 'butternut squash' (16
                        # characters) repeated 256 times.
                        expected = unicodestr(1024 * 16 * 256)
                        assert header_value == expected, header_value
                        break
                else:
                    raise AssertionError('No Content-Length header found')

                # As the response should be fully consumed by CherryPy
                # before sending back, the count should still be at zero
                # by the time the response has been sent.
                self.getPage("/count/" + clsname)
                self.assertStatus(200)
                self.assertBody('0')
            finally:
                # Close this connection so it is not leaked, whether or
                # not the checks above passed.
                itr_conn.close()

        # Now we do the same check with streaming - some classes will
        # be automatically closed, while others cannot.
        stream_counts = {}
        for clsname in all_classes:
            itr_conn = self.get_conn()
            itr_conn.putrequest("GET", "/stream/" + clsname)
            itr_conn.endheaders()
            response = itr_conn.getresponse()
            self.assertEqual(response.status, 200)
            response.fp.read(65536)

            # Let's check the count - this should always be one.
            self.getPage("/count/" + clsname)
            self.assertBody('1')

            # Now if we close the connection, the count should go back
            # to zero.
            itr_conn.close()
            self.getPage("/count/" + clsname)

            # If this is a response which should be easily closed, then
            # we will test to see if the value has gone back down to
            # zero.
            if clsname in closables:

                # Sometimes we try to get the answer too quickly - we
                # will wait for 100 ms before asking again if we didn't
                # get the answer we wanted.  The body is compared as a
                # number so the check gives the same answer whether
                # ``self.body`` is text or bytes.
                if int(self.body) != 0:
                    time.sleep(0.1)
                    self.getPage("/count/" + clsname)

            stream_counts[clsname] = int(self.body)

        # Check that we closed off the classes which should provide
        # easy mechanisms for doing so.
        for clsname in closables:
            assert stream_counts[clsname] == 0, (
                'did not close off stream response correctly, expected '
                'count of zero for %s: %s' % (clsname, stream_counts)
            )
182