1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
From 402059e4a46a764632eba8a669f5b012f173ee7b Mon Sep 17 00:00:00 2001
From: Aymeric Augustin <aymeric.augustin@m4x.org>
Date: Tue, 1 May 2018 17:05:05 +0200
Subject: [PATCH] Fix behavior of recv() in the CLOSING state.
The behavior wasn't tested correctly: in some test cases, the connection
had already moved to the CLOSED state, where the close code and reason
are already known.
Refactor half_close_connection_{local,remote} to allow multiple runs of
the event loop while remaining in the CLOSING state. Refactor affected
tests accordingly.
I verified that all tests in the CLOSING state were behaving is intended
by inserting debug statements in recv/send/ping/pong and running:
$ PYTHONASYNCIODEBUG=1 python -m unittest -v websockets.test_protocol.{Client,Server}Tests.test_{recv,send,ping,pong}_on_closing_connection_{local,remote}
Fix #317, #327, #350, #357.
Signed-off-by: Joseph Kogut <joseph.kogut@gmail.com>
---
websockets/protocol.py | 10 ++---
websockets/test_protocol.py | 78 +++++++++++++++++++++++++++++--------
2 files changed, 66 insertions(+), 22 deletions(-)
diff --git a/websockets/protocol.py b/websockets/protocol.py
index f8121a1..7583fe9 100644
--- a/websockets/protocol.py
+++ b/websockets/protocol.py
@@ -303,7 +303,7 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):
# Don't yield from self.ensure_open() here because messages could be
# received before the closing frame even if the connection is closing.
- # Wait for a message until the connection is closed
+ # Wait for a message until the connection is closed.
next_message = asyncio_ensure_future(
self.messages.get(), loop=self.loop)
try:
@@ -315,15 +315,15 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):
next_message.cancel()
raise
- # Now there's no need to yield from self.ensure_open(). Either a
- # message was received or the connection was closed.
-
if next_message in done:
return next_message.result()
else:
next_message.cancel()
if not self.legacy_recv:
- raise ConnectionClosed(self.close_code, self.close_reason)
+ assert self.state in [State.CLOSING, State.CLOSED]
+ # Wait until the connection is closed to raise
+ # ConnectionClosed with the correct code and reason.
+ yield from self.ensure_open()
@asyncio.coroutine
def send(self, data):
diff --git a/websockets/test_protocol.py b/websockets/test_protocol.py
index 70348fb..bfd4e3b 100644
--- a/websockets/test_protocol.py
+++ b/websockets/test_protocol.py
@@ -105,7 +105,7 @@ class CommonTests:
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
- def make_drain_slow(self, delay=3 * MS):
+ def make_drain_slow(self, delay=MS):
# Process connection_made in order to initialize self.protocol.writer.
self.run_loop_once()
@@ -174,6 +174,8 @@ class CommonTests:
# Empty the outgoing data stream so we can make assertions later on.
self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)
+ assert self.protocol.state is State.CLOSED
+
def half_close_connection_local(self, code=1000, reason='close'):
"""
Start a closing handshake but do not complete it.
@@ -181,31 +183,56 @@ class CommonTests:
The main difference with `close_connection` is that the connection is
left in the CLOSING state until the event loop runs again.
+ The current implementation returns a task that must be awaited or
+ cancelled, else asyncio complains about destroying a pending task.
+
"""
close_frame_data = serialize_close(code, reason)
- # Trigger the closing handshake from the local side.
- self.ensure_future(self.protocol.close(code, reason))
+ # Trigger the closing handshake from the local endpoint.
+ close_task = self.ensure_future(self.protocol.close(code, reason))
self.run_loop_once() # wait_for executes
self.run_loop_once() # write_frame executes
# Empty the outgoing data stream so we can make assertions later on.
self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)
- # Prepare the response to the closing handshake from the remote side.
- self.loop.call_soon(
- self.receive_frame, Frame(True, OP_CLOSE, close_frame_data))
- self.loop.call_soon(self.receive_eof_if_client)
+
+ assert self.protocol.state is State.CLOSING
+
+ # Complete the closing sequence at 1ms intervals so the test can run
+ # at each point even it goes back to the event loop several times.
+ self.loop.call_later(
+ MS, self.receive_frame, Frame(True, OP_CLOSE, close_frame_data))
+ self.loop.call_later(2 * MS, self.receive_eof_if_client)
+
+ # This task must be awaited or cancelled by the caller.
+ return close_task
def half_close_connection_remote(self, code=1000, reason='close'):
"""
- Receive a closing handshake.
+ Receive a closing handshake but do not complete it.
The main difference with `close_connection` is that the connection is
left in the CLOSING state until the event loop runs again.
"""
+ # On the server side, websockets completes the closing handshake and
+ # closes the TCP connection immediately. Yield to the event loop after
+ # sending the close frame to run the test while the connection is in
+ # the CLOSING state.
+ if not self.protocol.is_client:
+ self.make_drain_slow()
+
close_frame_data = serialize_close(code, reason)
- # Trigger the closing handshake from the remote side.
+ # Trigger the closing handshake from the remote endpoint.
self.receive_frame(Frame(True, OP_CLOSE, close_frame_data))
- self.receive_eof_if_client()
+ self.run_loop_once() # read_frame executes
+ # Empty the outgoing data stream so we can make assertions later on.
+ self.assertOneFrameSent(True, OP_CLOSE, close_frame_data)
+
+ assert self.protocol.state is State.CLOSING
+
+ # Complete the closing sequence at 1ms intervals so the test can run
+ # at each point even it goes back to the event loop several times.
+ self.loop.call_later(2 * MS, self.receive_eof_if_client)
def process_invalid_frames(self):
"""
@@ -335,11 +362,13 @@ class CommonTests:
self.assertEqual(data, b'tea')
def test_recv_on_closing_connection_local(self):
- self.half_close_connection_local()
+ close_task = self.half_close_connection_local()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.recv())
+ self.loop.run_until_complete(close_task) # cleanup
+
def test_recv_on_closing_connection_remote(self):
self.half_close_connection_remote()
@@ -421,24 +450,29 @@ class CommonTests:
self.assertNoFrameSent()
def test_send_on_closing_connection_local(self):
- self.half_close_connection_local()
+ close_task = self.half_close_connection_local()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.send('foobar'))
+
self.assertNoFrameSent()
+ self.loop.run_until_complete(close_task) # cleanup
+
def test_send_on_closing_connection_remote(self):
self.half_close_connection_remote()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.send('foobar'))
- self.assertOneFrameSent(True, OP_CLOSE, serialize_close(1000, 'close'))
+
+ self.assertNoFrameSent()
def test_send_on_closed_connection(self):
self.close_connection()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.send('foobar'))
+
self.assertNoFrameSent()
# Test the ping coroutine.
@@ -466,24 +500,29 @@ class CommonTests:
self.assertNoFrameSent()
def test_ping_on_closing_connection_local(self):
- self.half_close_connection_local()
+ close_task = self.half_close_connection_local()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.ping())
+
self.assertNoFrameSent()
+ self.loop.run_until_complete(close_task) # cleanup
+
def test_ping_on_closing_connection_remote(self):
self.half_close_connection_remote()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.ping())
- self.assertOneFrameSent(True, OP_CLOSE, serialize_close(1000, 'close'))
+
+ self.assertNoFrameSent()
def test_ping_on_closed_connection(self):
self.close_connection()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.ping())
+
self.assertNoFrameSent()
# Test the pong coroutine.
@@ -506,24 +545,29 @@ class CommonTests:
self.assertNoFrameSent()
def test_pong_on_closing_connection_local(self):
- self.half_close_connection_local()
+ close_task = self.half_close_connection_local()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.pong())
+
self.assertNoFrameSent()
+ self.loop.run_until_complete(close_task) # cleanup
+
def test_pong_on_closing_connection_remote(self):
self.half_close_connection_remote()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.pong())
- self.assertOneFrameSent(True, OP_CLOSE, serialize_close(1000, 'close'))
+
+ self.assertNoFrameSent()
def test_pong_on_closed_connection(self):
self.close_connection()
with self.assertRaises(ConnectionClosed):
self.loop.run_until_complete(self.protocol.pong())
+
self.assertNoFrameSent()
# Test the protocol's logic for acknowledging pings with pongs.
--
2.17.0
|