diff options
Diffstat (limited to 'lldb/third_party/Python/module/pexpect-4.6/tests/test_socket.py')
-rw-r--r-- | lldb/third_party/Python/module/pexpect-4.6/tests/test_socket.py | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/lldb/third_party/Python/module/pexpect-4.6/tests/test_socket.py b/lldb/third_party/Python/module/pexpect-4.6/tests/test_socket.py new file mode 100644 index 00000000000..21648f4a0f6 --- /dev/null +++ b/lldb/third_party/Python/module/pexpect-4.6/tests/test_socket.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +''' +PEXPECT LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier <noah@noah.org> + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' +import pexpect +from pexpect import fdpexpect +import unittest +from . import PexpectTestCase +import multiprocessing +import os +import signal +import socket +import time +import errno + + +class SocketServerError(Exception): + pass + + +class ExpectTestCase(PexpectTestCase.PexpectTestCase): + + def setUp(self): + print(self.id()) + PexpectTestCase.PexpectTestCase.setUp(self) + self.host = '127.0.0.1' + self.port = 49152 + 10000 + self.motd = b"""\ +------------------------------------------------------------------------------ +* Welcome to the SOCKET UNIT TEST code! * +------------------------------------------------------------------------------ +* * +* This unit test code is our best effort at testing the ability of the * +* pexpect library to handle sockets. We need some text to test buffer size * +* handling. * +* * +* A page is 1024 bytes or 1K. 80 x 24 = 1920. So a standard terminal window * +* contains more than one page. We actually want more than a page for our * +* tests. * +* * +* This is the twelfth line, and we need 24. So we need a few more paragraphs.* +* We can keep them short and just put lines between them. * +* * +* The 80 x 24 terminal size comes from the ancient past when computers were * +* only able to display text in cuneiform writing. * +* * +* The cunieform writing system used the edge of a reed to make marks on clay * +* tablets. * +* * +* It was the forerunner of the style of handwriting used by doctors to write * +* prescriptions. Thus the name: pre (before) script (writing) ion (charged * +* particle). * +------------------------------------------------------------------------------ +""".replace(b'\n', b'\n\r') + b"\r\n" + self.prompt1 = b'Press Return to continue:' + self.prompt2 = b'Rate this unit test>' + self.prompt3 = b'Press X to exit:' + self.enter = b'\r\n' + self.exit = b'X\r\n' + self.server_up = multiprocessing.Event() + self.server_process = multiprocessing.Process(target=self.socket_server, args=(self.server_up,)) + self.server_process.daemon = True + self.server_process.start() + counter = 0 + while not self.server_up.is_set(): + time.sleep(0.250) + counter += 1 + if counter > (10 / 0.250): + raise SocketServerError("Could not start socket server") + + def tearDown(self): + os.kill(self.server_process.pid, signal.SIGINT) + self.server_process.join(timeout=5.0) + PexpectTestCase.PexpectTestCase.tearDown(self) + + def socket_server(self, server_up): + sock = None + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((self.host, self.port)) + sock.listen(5) + server_up.set() + while True: + (conn, addr) = sock.accept() + conn.send(self.motd) + conn.send(self.prompt1) + result = conn.recv(1024) + if result != self.enter: + break + conn.send(self.prompt2) + result = conn.recv(1024) + if result != self.enter: + break + conn.send(self.prompt3) + result = conn.recv(1024) + if result.startswith(self.exit[0]): + conn.shutdown(socket.SHUT_RDWR) + conn.close() + except KeyboardInterrupt: + pass + if sock is not None: + try: + sock.shutdown(socket.SHUT_RDWR) + sock.close() + except socket.error: + pass + exit(0) + + def socket_fn(self, timed_out, all_read): + result = 0 + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock, timeout=10) + # Get all data from server + session.read_nonblocking(size=4096) + all_read.set() + # This read should timeout + session.read_nonblocking(size=4096) + except pexpect.TIMEOUT: + timed_out.set() + result = errno.ETIMEDOUT + exit(result) + + def test_socket(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10) + session.expect(self.prompt1) + self.assertEqual(session.before, self.motd) + session.send(self.enter) + session.expect(self.prompt2) + session.send(self.enter) + session.expect(self.prompt3) + session.send(self.exit) + session.expect(pexpect.EOF) + self.assertEqual(session.before, b'') + + def test_socket_with_write(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10) + session.expect(self.prompt1) + self.assertEqual(session.before, self.motd) + session.write(self.enter) + session.expect(self.prompt2) + session.write(self.enter) + session.expect(self.prompt3) + session.write(self.exit) + session.expect(pexpect.EOF) + self.assertEqual(session.before, b'') + + def test_not_int(self): + with self.assertRaises(pexpect.ExceptionPexpect): + session = fdpexpect.fdspawn('bogus', timeout=10) + + def test_not_file_descriptor(self): + with self.assertRaises(pexpect.ExceptionPexpect): + session = fdpexpect.fdspawn(-1, timeout=10) + + def test_timeout(self): + with self.assertRaises(pexpect.TIMEOUT): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock, timeout=10) + session.expect(b'Bogus response') + + def test_interrupt(self): + timed_out = multiprocessing.Event() + all_read = multiprocessing.Event() + test_proc = multiprocessing.Process(target=self.socket_fn, args=(timed_out, all_read)) + test_proc.daemon = True + test_proc.start() + while not all_read.is_set(): + time.sleep(1.0) + os.kill(test_proc.pid, signal.SIGWINCH) + while not timed_out.is_set(): + time.sleep(1.0) + test_proc.join(timeout=5.0) + self.assertEqual(test_proc.exitcode, errno.ETIMEDOUT) + + def test_multiple_interrupts(self): + timed_out = multiprocessing.Event() + all_read = multiprocessing.Event() + test_proc = multiprocessing.Process(target=self.socket_fn, args=(timed_out, all_read)) + test_proc.daemon = True + test_proc.start() + while not all_read.is_set(): + time.sleep(1.0) + while not timed_out.is_set(): + os.kill(test_proc.pid, signal.SIGWINCH) + time.sleep(1.0) + test_proc.join(timeout=5.0) + self.assertEqual(test_proc.exitcode, errno.ETIMEDOUT) + + def test_maxread(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10) + session.maxread = 1100 + session.expect(self.prompt1) + self.assertEqual(session.before, self.motd) + session.send(self.enter) + session.expect(self.prompt2) + session.send(self.enter) + session.expect(self.prompt3) + session.send(self.exit) + session.expect(pexpect.EOF) + self.assertEqual(session.before, b'') + + def test_fd_isalive(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10) + assert session.isalive() + sock.close() + assert not session.isalive(), "Should not be alive after close()" + + def test_fd_isalive_poll(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True) + assert session.isalive() + sock.close() + assert not session.isalive(), "Should not be alive after close()" + + def test_fd_isatty(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10) + assert not session.isatty() + session.close() + + def test_fd_isatty_poll(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock.fileno(), timeout=10, use_poll=True) + assert not session.isatty() + session.close() + + def test_fileobj(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host, self.port)) + session = fdpexpect.fdspawn(sock, timeout=10) # Should get the fileno from the socket + session.expect(self.prompt1) + session.close() + assert not session.isalive() + session.close() # Smoketest - should be able to call this again + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(ExpectTestCase, 'test') |