1 import threading
2 import time
3 import unittest
4
5 import cherrypy
6 from cherrypy._cpcompat import get_daemon, set
7 from cherrypy.process import wspbus
8
9
# Template for the record each test listener stores: listener index (%d),
# channel name (%s) and the argument the listener was called with (%s).
10 msg = "Listener %d on channel %s: %s."
11
12
14
# NOTE(review): the enclosing `def` line is missing from this listing. Call
# sites use self.get_listener(channel, index), so `channel` and `index` are
# presumably parameters of that enclosing method -- confirm in the full file.
# Closure that appends one formatted record to self.responses per call;
# `arg` is None when the listener is called without an argument.
16 def listener(arg=None):
17 self.responses.append(msg % (index, channel, arg))
# Hand the closure back to the caller.
18 return listener
19
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks the order in which listeners are called on every channel found in
# b.listeners of a new Bus.
21 b = wspbus.Bus()
22
23 self.responses, expected = [], []
24
# Subscribe four listeners per channel; listener indices 0..3 get the
# priorities 100, 50, 0 and 51 respectively.
25 for channel in b.listeners:
26 for index, priority in enumerate([100, 50, 0, 51]):
27 b.subscribe(channel,
28 self.get_listener(channel, index), priority)
29
# Publish once without and once with an argument. The expected index order
# (2, 1, 3, 0) is the listeners sorted by ascending priority value
# (0, 50, 51, 100).
30 for channel in b.listeners:
31 b.publish(channel)
32 expected.extend([msg % (i, channel, None) for i in (2, 1, 3, 0)])
33 b.publish(channel, arg=79347)
34 expected.extend([msg % (i, channel, 79347) for i in (2, 1, 3, 0)])
35
# The recorded responses must match the expected sequence exactly, in order.
36 self.assertEqual(self.responses, expected)
37
# NOTE(review): the `def` line of this test body is missing from the listing.
# Same ordering check as for the bus's own channels, but on channel names
# chosen by the test.
39 b = wspbus.Bus()
40
41 self.responses, expected = [], []
42
43 custom_listeners = ('hugh', 'louis', 'dewey')
# Listener indices 0..3 are subscribed with priorities None, 10, 60 and 40.
44 for channel in custom_listeners:
45 for index, priority in enumerate([None, 10, 60, 40]):
46 b.subscribe(channel,
47 self.get_listener(channel, index), priority)
48
# Expected index order (1, 3, 0, 2) corresponds to priorities 10, 40, None,
# 60: the listener subscribed with None runs between 40 and 60.
# NOTE(review): presumably subscribe() substitutes a default priority in that
# range when given None -- confirm against wspbus.Bus.subscribe.
49 for channel in custom_listeners:
50 b.publish(channel, 'ah so')
51 expected.extend([msg % (i, channel, 'ah so')
52 for i in (1, 3, 0, 2)])
53 b.publish(channel)
54 expected.extend([msg % (i, channel, None) for i in (1, 3, 0, 2)])
55
56 self.assertEqual(self.responses, expected)
57
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks what publish() does when one of the subscribed listeners cannot
# accept the published argument.
59 b = wspbus.Bus()
60
61 self.responses, expected = [], []
# Every channel of the bus except 'log' takes part in this check.
62 channels = [c for c in b.listeners if c != 'log']
63
64 for channel in channels:
# A well-behaved listener (index 1) that records what it receives.
65 b.subscribe(channel, self.get_listener(channel, 1))
66
# A listener that accepts no arguments, while publish() below passes 123.
# NOTE(review): indentation is lost in this listing; the assertions below
# expect a failure on every channel, so this call presumably sits inside the
# loop above -- confirm in the full file.
67 b.subscribe(channel, lambda: None, priority=20)
68
# publish() is expected to raise ChannelFailures on each channel, and the
# well-behaved listener's record for that publish is still expected.
69 for channel in channels:
70 self.assertRaises(wspbus.ChannelFailures, b.publish, channel, 123)
71 expected.append(msg % (1, channel, 123))
72
73 self.assertEqual(self.responses, expected)
74
75
77
# NOTE(review): the enclosing `def` line is missing from this listing; tests
# call self.log(b), so `bus` is presumably that method's parameter -- confirm.
# Start with an empty list of captured log messages.
79 self._log_entries = []
80
# Listener that stores the message text only; `level` is accepted but unused.
81 def logit(msg, level):
82 self._log_entries.append(msg)
# Register the collector on the bus's 'log' channel.
83 bus.subscribe('log', logit)
84
# NOTE(review): the enclosing `def` line is missing from this listing; tests
# call self.assertLog([...]), so `entries` is presumably its parameter.
# Assert that the captured log messages equal `entries` exactly, in order.
86 self.assertEqual(self._log_entries, entries)
87
# NOTE(review): the enclosing `def` line is missing from this listing. Call
# sites use self.get_listener(channel, index), so `channel` and `index` are
# presumably parameters of that enclosing method -- confirm in the full file.
# Closure that appends one formatted record to self.responses per call;
# `arg` is None when the listener is called without an argument.
89 def listener(arg=None):
90 self.responses.append(msg % (index, channel, arg))
# Hand the closure back to the caller.
91 return listener
92
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks the effects of Bus.start(): listeners called, state, log output.
94 b = wspbus.Bus()
95 self.log(b)
96
97 self.responses = []
98 num = 3
# Register `num` listeners on the 'start' channel.
99 for index in range(num):
100 b.subscribe('start', self.get_listener('start', index))
101
102 b.start()
103 try:
104
# Every 'start' listener must have recorded a call with no argument. The
# comparison uses sets, so the call order is not checked here.
105 self.assertEqual(
106 set(self.responses),
107 set([msg % (i, 'start', None) for i in range(num)]))
108
109
# The bus must report the STARTED state.
110 self.assertEqual(b.state, b.states.STARTED)
111
# The captured log must consist of exactly these two messages.
112 self.assertLog(['Bus STARTING', 'Bus STARTED'])
113 finally:
114
# The bus was started above, so exit it even when an assertion failed.
115 b.exit()
116
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks the effects of Bus.stop(): listeners called, state, log output.
118 b = wspbus.Bus()
119 self.log(b)
120
121 self.responses = []
122 num = 3
# Register `num` listeners on the 'stop' channel.
123 for index in range(num):
124 b.subscribe('stop', self.get_listener('stop', index))
125
126 b.stop()
127
128
# Every 'stop' listener must have recorded a call with no argument
# (set comparison, so call order is not checked).
129 self.assertEqual(set(self.responses),
130 set([msg % (i, 'stop', None) for i in range(num)]))
131
# The bus must report the STOPPED state.
132 self.assertEqual(b.state, b.states.STOPPED)
133
# The captured log must consist of exactly these two messages.
134 self.assertLog(['Bus STOPPING', 'Bus STOPPED'])
135
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks the effects of Bus.graceful(): listeners called and log output.
137 b = wspbus.Bus()
138 self.log(b)
139
140 self.responses = []
141 num = 3
# Register `num` listeners on the 'graceful' channel.
142 for index in range(num):
143 b.subscribe('graceful', self.get_listener('graceful', index))
144
145 b.graceful()
146
147
# Every 'graceful' listener must have recorded a call with no argument
# (set comparison, so call order is not checked).
148 self.assertEqual(
149 set(self.responses),
150 set([msg % (i, 'graceful', None) for i in range(num)]))
151
# The captured log must consist of exactly this one message.
152 self.assertLog(['Bus graceful'])
153
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks the effects of Bus.exit(): listeners called, state, log output.
155 b = wspbus.Bus()
156 self.log(b)
157
158 self.responses = []
159 num = 3
# Register `num` listeners on both the 'stop' and the 'exit' channel.
160 for index in range(num):
161 b.subscribe('stop', self.get_listener('stop', index))
162 b.subscribe('exit', self.get_listener('exit', index))
163
164 b.exit()
165
166
167
# A single exit() call is expected to reach the 'stop' listeners as well as
# the 'exit' listeners (set comparison, so call order is not checked).
168 self.assertEqual(set(self.responses),
169 set([msg % (i, 'stop', None) for i in range(num)] +
170 [msg % (i, 'exit', None) for i in range(num)]))
171
# The bus must report the EXITING state after exit() returns.
172 self.assertEqual(b.state, b.states.EXITING)
173
# The captured log must consist of exactly these four messages, in order.
174 self.assertLog(
175 ['Bus STOPPING', 'Bus STOPPED', 'Bus EXITING', 'Bus EXITED'])
176
183
# NOTE(review): the `def` line of this test body and the definitions of `b`
# and `f` are missing from the listing; `f` is used below as a thread target
# that receives the method name -- confirm its body in the full file.
# For each (method name, acceptable states) pair: run f(method) on a new
# thread, then block in b.wait() on the listed states.
184 for method, states in [('start', [b.states.STARTED]),
185 ('stop', [b.states.STOPPED]),
186 ('start',
187 [b.states.STARTING, b.states.STARTED]),
188 ('exit', [b.states.EXITING]),
189 ]:
190 threading.Thread(target=f, args=(method,)).start()
191 b.wait(states)
192
193
# Once wait() has returned, the bus state must be one of the awaited states.
194 if b.state not in states:
195 self.fail("State %r not in %r" % (b.state, states))
196
204
# NOTE(review): the `def` line of this test body and the definitions of `b`
# and `f` are missing from the listing -- confirm them in the full file.
# Thread target that does nothing except sleep for 0.4 seconds.
205 def g():
206 time.sleep(0.4)
207 threading.Thread(target=f).start()
208 threading.Thread(target=g).start()
# Count the non-daemon threads; three are expected at this point
# (presumably the main thread plus the two started just above).
209 threads = [t for t in threading.enumerate() if not get_daemon(t)]
210 self.assertEqual(len(threads), 3)
211
212 b.block()
213
214
# After block() returns the bus must report the EXITING state.
215 self.assertEqual(b.state, b.states.EXITING)
216
217
# Only one non-daemon thread is expected to remain once block() has returned.
218 threads = [t for t in threading.enumerate() if not get_daemon(t)]
219 self.assertEqual(len(threads), 1)
220
221
# The final captured log entry is left out of the comparison ([:-1]).
# NOTE(review): the reason is not visible in this listing -- presumably its
# text is not predictable; confirm in the full file.
222 self.assertEqual(self._log_entries[:-1],
223 ['Bus STOPPING', 'Bus STOPPED',
224 'Bus EXITING', 'Bus EXITED',
225 'Waiting for child threads to terminate...'])
226
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks Bus.start_with_callback(): the callback's arguments and its order
# relative to a 'start' listener.
228 b = wspbus.Bus()
229 self.log(b)
230 try:
# Shared record of which callables ran, in the order they ran.
231 events = []
232
# Callback under test: records the positional and keyword arguments it got.
233 def f(*args, **kwargs):
234 events.append(("f", args, kwargs))
235
# 'start' listener: records a marker when it is called.
236 def g():
237 events.append("g")
238 b.subscribe("start", g)
239 b.start_with_callback(f, (1, 3, 5), {"foo": "bar"})
240
# NOTE(review): presumably gives the callback time to run before the
# assertions below -- confirm against Bus.start_with_callback.
241 time.sleep(0.2)
242
243
# The bus must report the STARTED state.
244 self.assertEqual(b.state, b.states.STARTED)
245
# The 'start' listener must have run first, then the callback with exactly
# the arguments that were passed to start_with_callback().
246 self.assertEqual(events, ["g", ("f", (1, 3, 5), {"foo": "bar"})])
247 finally:
# Always exit the bus, even when an assertion above failed.
248 b.exit()
249
# NOTE(review): the `def` line of this test body is missing from the listing.
# Checks Bus.log(): plain messages and a message logged with traceback=True.
251 b = wspbus.Bus()
252 self.log(b)
# Nothing has been logged yet.
253 self.assertLog([])
254
255
# Log three identical messages and one different one; the captured entries
# must equal the messages logged so far. Note that the loop variable reuses
# the name `msg`, which is also the template string at the top of the file.
256 expected = []
257 for msg in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]:
258 b.log(msg)
259 expected.append(msg)
260 self.assertLog(expected)
261
262
# Referencing the undefined name `foo` raises NameError, so b.log() is called
# with traceback=True while that exception is being handled.
263 try:
264 foo
265 except NameError:
266 b.log("You are lost and gone forever", traceback=True)
# The newest captured entry must mention both "Traceback" and "NameError".
267 lastmsg = self._log_entries[-1]
268 if "Traceback" not in lastmsg or "NameError" not in lastmsg:
269 self.fail("Last log message %r did not contain "
270 "the expected traceback." % lastmsg)
271 else:
# Reached only when the `try` body raised nothing at all.
272 self.fail("NameError was not raised as expected.")
273
274
# Run the tests through unittest's command-line runner when this file is
# executed as a script rather than imported.
275 if __name__ == "__main__":
276 unittest.main()
277