Viewing file: test_poller.py (16.3 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
import errno
import os
import select
import sys
import unittest

from supervisor.poller import SelectPoller, PollPoller, KQueuePoller
from supervisor.poller import implements_poll, implements_kqueue
from supervisor.tests.base import DummyOptions
from supervisor.tests.base import Mock
# this base class is used instead of unittest.TestCase to hide # a TestCase subclass from test runner when the implementation is # not available SkipTestCase = object
class BasePollerTests(unittest.TestCase):
    """Checks on BasePoller itself.

    The register/unregister/poll hooks are expected to raise
    NotImplementedError, and the daemonize hooks are expected to
    return None.
    """

    def _makeOne(self, options):
        # Imported here rather than at module level, as in the original.
        from supervisor.poller import BasePoller
        return BasePoller(options)

    def test_register_readable(self):
        base = self._makeOne(None)
        with self.assertRaises(NotImplementedError):
            base.register_readable(None)

    def test_register_writable(self):
        base = self._makeOne(None)
        with self.assertRaises(NotImplementedError):
            base.register_writable(None)

    def test_unregister_readable(self):
        base = self._makeOne(None)
        with self.assertRaises(NotImplementedError):
            base.unregister_readable(None)

    def test_unregister_writable(self):
        base = self._makeOne(None)
        with self.assertRaises(NotImplementedError):
            base.unregister_writable(None)

    def test_poll(self):
        base = self._makeOne(None)
        with self.assertRaises(NotImplementedError):
            base.poll(None)

    def test_before_daemonize(self):
        base = self._makeOne(None)
        outcome = base.before_daemonize()
        self.assertEqual(outcome, None)

    def test_after_daemonize(self):
        base = self._makeOne(None)
        outcome = base.after_daemonize()
        self.assertEqual(outcome, None)
class SelectPollerTests(unittest.TestCase):
    """Tests for SelectPoller.

    The poll tests replace ``poller._select`` with a DummySelect so no
    real file descriptors are polled.  Registered descriptors are
    compared with ``sorted(...)`` throughout so that the assertions do
    not depend on the iteration order of ``readables``/``writables``.
    """

    def _makeOne(self, options):
        return SelectPoller(options)

    def test_register_readable(self):
        poller = self._makeOne(DummyOptions())
        poller.register_readable(6)
        poller.register_readable(7)
        self.assertEqual(sorted(poller.readables), [6, 7])

    def test_register_writable(self):
        poller = self._makeOne(DummyOptions())
        poller.register_writable(6)
        poller.register_writable(7)
        self.assertEqual(sorted(poller.writables), [6, 7])

    def test_unregister_readable(self):
        poller = self._makeOne(DummyOptions())
        poller.register_readable(6)
        poller.register_readable(7)
        poller.register_writable(8)
        poller.register_writable(9)
        poller.unregister_readable(6)
        poller.unregister_readable(9)    # only writable: must stay writable
        poller.unregister_readable(100)  # not registered, ignore error
        # sorted() keeps this consistent with the register tests above
        self.assertEqual(sorted(poller.readables), [7])
        self.assertEqual(sorted(poller.writables), [8, 9])

    def test_unregister_writable(self):
        poller = self._makeOne(DummyOptions())
        poller.register_readable(6)
        poller.register_readable(7)
        poller.register_writable(8)
        poller.register_writable(6)
        poller.unregister_writable(7)    # only readable: must stay readable
        poller.unregister_writable(6)
        poller.unregister_writable(100)  # not registered, ignore error
        self.assertEqual(sorted(poller.readables), [6, 7])
        self.assertEqual(sorted(poller.writables), [8])

    def test_poll_returns_readables_and_writables(self):
        _select = DummySelect(result={'readables': [6],
                                      'writables': [8]})
        poller = self._makeOne(DummyOptions())
        poller._select = _select
        poller.register_readable(6)
        poller.register_readable(7)
        poller.register_writable(8)
        readables, writables = poller.poll(1)
        self.assertEqual(readables, [6])
        self.assertEqual(writables, [8])

    def test_poll_ignores_eintr(self):
        _select = DummySelect(error=errno.EINTR)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._select = _select
        poller.register_readable(6)
        poller.poll(1)
        self.assertEqual(options.logger.data[0],
                         'EINTR encountered in poll')

    def test_poll_ignores_ebadf(self):
        _select = DummySelect(error=errno.EBADF)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._select = _select
        poller.register_readable(6)
        poller.poll(1)
        self.assertEqual(options.logger.data[0],
                         'EBADF encountered in poll')
        # after EBADF nothing is expected to remain registered
        self.assertEqual(list(poller.readables), [])
        self.assertEqual(list(poller.writables), [])

    def test_poll_uncaught_exception(self):
        _select = DummySelect(error=errno.EPERM)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._select = _select
        poller.register_readable(6)
        self.assertRaises(select.error, poller.poll, 1)
# Run the kqueue tests only where kqueue is implemented; otherwise derive
# from SkipTestCase so the class below is hidden from the test runner.
KQueuePollerTestsBase = (
    unittest.TestCase if implements_kqueue() else SkipTestCase
)
class KQueuePollerTests(KQueuePollerTestsBase):
    """Tests for KQueuePoller.

    Most tests replace ``poller._kqueue`` with a DummyKQueue (or a Mock)
    and inspect the kevents handed to it.  The one test that ends up
    holding a real ``select.kqueue`` registers a cleanup so it is closed.
    """

    def _makeOne(self, options):
        return KQueuePoller(options)

    def test_register_readable(self):
        kqueue = DummyKQueue()
        poller = self._makeOne(DummyOptions())
        poller._kqueue = kqueue
        poller.register_readable(6)
        self.assertEqual(list(poller.readables), [6])
        self.assertEqual(len(kqueue.registered_kevents), 1)
        self.assertReadEventAdded(kqueue.registered_kevents[0], 6)

    def test_register_writable(self):
        kqueue = DummyKQueue()
        poller = self._makeOne(DummyOptions())
        poller._kqueue = kqueue
        poller.register_writable(7)
        self.assertEqual(list(poller.writables), [7])
        self.assertEqual(len(kqueue.registered_kevents), 1)
        self.assertWriteEventAdded(kqueue.registered_kevents[0], 7)

    def test_unregister_readable(self):
        kqueue = DummyKQueue()
        poller = self._makeOne(DummyOptions())
        poller._kqueue = kqueue
        poller.register_writable(7)
        poller.register_readable(8)
        poller.unregister_readable(7)
        poller.unregister_readable(8)
        poller.unregister_readable(100)  # not registered, ignore error
        self.assertEqual(list(poller.writables), [7])
        self.assertEqual(list(poller.readables), [])
        self.assertWriteEventAdded(kqueue.registered_kevents[0], 7)
        self.assertReadEventAdded(kqueue.registered_kevents[1], 8)
        self.assertReadEventDeleted(kqueue.registered_kevents[2], 7)
        self.assertReadEventDeleted(kqueue.registered_kevents[3], 8)

    def test_unregister_writable(self):
        kqueue = DummyKQueue()
        poller = self._makeOne(DummyOptions())
        poller._kqueue = kqueue
        poller.register_writable(7)
        poller.register_readable(8)
        poller.unregister_writable(7)
        poller.unregister_writable(8)
        poller.unregister_writable(100)  # not registered, ignore error
        self.assertEqual(list(poller.writables), [])
        self.assertEqual(list(poller.readables), [8])
        self.assertWriteEventAdded(kqueue.registered_kevents[0], 7)
        self.assertReadEventAdded(kqueue.registered_kevents[1], 8)
        self.assertWriteEventDeleted(kqueue.registered_kevents[2], 7)
        self.assertWriteEventDeleted(kqueue.registered_kevents[3], 8)

    def test_poll_returns_readables_and_writables(self):
        kqueue = DummyKQueue(result=[(6, select.KQ_FILTER_READ),
                                     (7, select.KQ_FILTER_READ),
                                     (8, select.KQ_FILTER_WRITE)])
        poller = self._makeOne(DummyOptions())
        poller._kqueue = kqueue
        poller.register_readable(6)
        poller.register_readable(7)
        poller.register_writable(8)
        readables, writables = poller.poll(1000)
        self.assertEqual(readables, [6, 7])
        self.assertEqual(writables, [8])

    def test_poll_ignores_eintr(self):
        kqueue = DummyKQueue(raise_errno_poll=errno.EINTR)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = kqueue
        poller.register_readable(6)
        poller.poll(1000)
        self.assertEqual(options.logger.data[0],
                         'EINTR encountered in poll')

    def test_register_readable_and_writable_ignores_ebadf(self):
        _kqueue = DummyKQueue(raise_errno_register=errno.EBADF)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = _kqueue
        poller.register_readable(6)
        poller.register_writable(7)
        self.assertEqual(
            options.logger.data[0],
            'EBADF encountered in kqueue. Invalid file descriptor 6')
        self.assertEqual(
            options.logger.data[1],
            'EBADF encountered in kqueue. Invalid file descriptor 7')

    def test_register_uncaught_exception(self):
        _kqueue = DummyKQueue(raise_errno_register=errno.ENOMEM)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = _kqueue
        self.assertRaises(OSError, poller.register_readable, 5)

    def test_poll_uncaught_exception(self):
        kqueue = DummyKQueue(raise_errno_poll=errno.EINVAL)
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = kqueue
        poller.register_readable(6)
        self.assertRaises(OSError, poller.poll, 1000)

    def test_before_daemonize_closes_kqueue(self):
        mock_kqueue = Mock()
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = mock_kqueue
        poller.before_daemonize()
        mock_kqueue.close.assert_called_once_with()
        self.assertEqual(poller._kqueue, None)

    def test_after_daemonize_restores_kqueue(self):
        options = DummyOptions()
        poller = self._makeOne(options)
        poller.readables = [1]
        poller.writables = [3]
        # Mock out registration so only the re-registration calls are seen
        poller.register_readable = Mock()
        poller.register_writable = Mock()
        poller.after_daemonize()
        # The poller now holds a real kqueue; close it when the test ends
        # so the descriptor is not left open.
        self.addCleanup(poller.close)
        self.assertTrue(isinstance(poller._kqueue, select.kqueue))
        poller.register_readable.assert_called_with(1)
        poller.register_writable.assert_called_with(3)

    def test_close_closes_kqueue(self):
        mock_kqueue = Mock()
        options = DummyOptions()
        poller = self._makeOne(options)
        poller._kqueue = mock_kqueue
        poller.close()
        mock_kqueue.close.assert_called_once_with()
        self.assertEqual(poller._kqueue, None)

    # -- assertion helpers -------------------------------------------------

    def assertReadEventAdded(self, kevent, fd):
        self.assertKevent(kevent, fd, select.KQ_FILTER_READ,
                          select.KQ_EV_ADD)

    def assertWriteEventAdded(self, kevent, fd):
        self.assertKevent(kevent, fd, select.KQ_FILTER_WRITE,
                          select.KQ_EV_ADD)

    def assertReadEventDeleted(self, kevent, fd):
        self.assertKevent(kevent, fd, select.KQ_FILTER_READ,
                          select.KQ_EV_DELETE)

    def assertWriteEventDeleted(self, kevent, fd):
        self.assertKevent(kevent, fd, select.KQ_FILTER_WRITE,
                          select.KQ_EV_DELETE)

    def assertKevent(self, kevent, ident, filter, flags):
        """Assert that ``kevent`` has the given ident, filter and flags."""
        self.assertEqual(kevent.ident, ident)
        self.assertEqual(kevent.filter, filter)
        self.assertEqual(kevent.flags, flags)
# Run the poll() tests only where poll is implemented; otherwise derive
# from SkipTestCase so the class below is hidden from the test runner.
PollerPollTestsBase = (
    unittest.TestCase if implements_poll() else SkipTestCase
)
class PollerPollTests(PollerPollTestsBase):
    """Tests for PollPoller, with ``poller._poller`` replaced by a
    DummySelectPoll that records registrations and returns canned events.
    """

    def _makeOne(self, options):
        return PollPoller(options)

    def test_register_readable(self):
        fake_poll = DummySelectPoll()
        poller = self._makeOne(DummyOptions())
        poller._poller = fake_poll
        for fd in (6, 7):
            poller.register_readable(fd)
        self.assertEqual(fake_poll.registered_as_readable, [6, 7])

    def test_register_writable(self):
        fake_poll = DummySelectPoll()
        poller = self._makeOne(DummyOptions())
        poller._poller = fake_poll
        poller.register_writable(6)
        self.assertEqual(fake_poll.registered_as_writable, [6])

    def test_poll_returns_readables_and_writables(self):
        canned_events = [
            (6, select.POLLIN),
            (7, select.POLLPRI),
            (8, select.POLLOUT),
            (9, select.POLLHUP),
        ]
        fake_poll = DummySelectPoll(result=canned_events)
        poller = self._makeOne(DummyOptions())
        poller._poller = fake_poll
        poller.register_readable(6)
        poller.register_readable(7)
        poller.register_writable(8)
        poller.register_readable(9)
        ready_to_read, ready_to_write = poller.poll(1000)
        self.assertEqual(ready_to_read, [6, 7, 9])
        self.assertEqual(ready_to_write, [8])

    def test_poll_ignores_eintr(self):
        fake_poll = DummySelectPoll(error=errno.EINTR)
        opts = DummyOptions()
        poller = self._makeOne(opts)
        poller._poller = fake_poll
        poller.register_readable(9)
        poller.poll(1000)
        first_log_entry = opts.logger.data[0]
        self.assertEqual(first_log_entry, 'EINTR encountered in poll')

    def test_poll_uncaught_exception(self):
        fake_poll = DummySelectPoll(error=errno.EBADF)
        opts = DummyOptions()
        poller = self._makeOne(opts)
        poller._poller = fake_poll
        poller.register_readable(9)
        with self.assertRaises(select.error):
            poller.poll(1000)

    def test_poll_ignores_and_unregisters_closed_fd(self):
        canned_events = [(6, select.POLLNVAL), (7, select.POLLPRI)]
        fake_poll = DummySelectPoll(result=canned_events)
        poller = self._makeOne(DummyOptions())
        poller._poller = fake_poll
        poller.register_readable(6)
        poller.register_readable(7)
        ready_to_read, ready_to_write = poller.poll(1000)
        self.assertEqual(ready_to_read, [7])
        self.assertEqual(fake_poll.unregistered, [6])
class DummySelect(object):
    '''
    Fake implementation of select.select()

    ``result`` is an optional dict with ``'readables'`` and
    ``'writables'`` lists to hand back from select(); missing keys
    default to empty lists.  ``error``, when truthy, is an errno value:
    select() then raises ``select.error`` instead of returning.
    '''
    def __init__(self, result=None, error=None):
        result = result or {}
        self.readables = result.get('readables', [])
        self.writables = result.get('writables', [])
        self.error = error

    def select(self, r, w, x, timeout):
        '''
        Return ``(readables, writables, [])`` or raise the configured
        error.  The arguments are accepted only for signature
        compatibility and are ignored.
        '''
        if self.error:
            # Raise with (errno, strerror) so that both ``args[0]`` and,
            # where select.error is OSError, ``.errno`` carry the errno;
            # a single-argument OSError leaves ``.errno`` unset.
            raise select.error(self.error, os.strerror(self.error))
        return self.readables, self.writables, []
class DummySelectPoll(object):
    '''
    Fake implementation of select.poll()

    ``result`` is the list of ``(fd, eventmask)`` pairs that poll()
    returns.  ``error``, when truthy, is an errno value: poll() then
    raises ``select.error`` instead.  Calls to register() and
    unregister() are recorded in ``registered_as_readable``,
    ``registered_as_writable`` and ``unregistered``.
    '''
    def __init__(self, result=None, error=None):
        self.result = result or []
        self.error = error
        self.registered_as_readable = []
        self.registered_as_writable = []
        self.unregistered = []

    def register(self, fd, eventmask):
        '''
        Record ``fd`` as readable or writable according to ``eventmask``;
        any other mask raises ValueError.
        '''
        if eventmask == select.POLLIN | select.POLLPRI | select.POLLHUP:
            self.registered_as_readable.append(fd)
        elif eventmask == select.POLLOUT:
            self.registered_as_writable.append(fd)
        else:
            raise ValueError("Registered a fd on unknown eventmask: '{0}'".format(eventmask))

    def unregister(self, fd):
        self.unregistered.append(fd)

    def poll(self, timeout):
        '''
        Return the canned result or raise the configured error.
        ``timeout`` is ignored.
        '''
        if self.error:
            # Raise with (errno, strerror) so that both ``args[0]`` and,
            # where select.error is OSError, ``.errno`` carry the errno;
            # a single-argument OSError leaves ``.errno`` unset.
            raise select.error(self.error, os.strerror(self.error))
        return self.result
class DummyKQueue(object):
    '''
    Fake implementation of select.kqueue()

    ``result`` is a list of ``(ident, filter)`` pairs returned (as
    FakeKEvent objects) when control() is called for polling.
    ``raise_errno_poll`` / ``raise_errno_register`` are optional errno
    values; when set, the matching kind of control() call raises an
    OSError carrying that errno.  Kevents passed in for registration are
    collected in ``registered_kevents``.
    '''
    def __init__(self, result=None, raise_errno_poll=None, raise_errno_register=None):
        self.result = result or []
        self.errno_poll = raise_errno_poll
        self.errno_register = raise_errno_register
        self.registered_kevents = []
        self.registered_flags = []

    def control(self, kevents, max_events, timeout=None):
        '''
        Emulate kqueue.control().  ``kevents is None`` is treated as a
        poll call and returns the canned events; otherwise the call is
        treated as a registration and the kevents are recorded.
        '''
        if kevents is None:    # being called on poll()
            self.assert_max_events_on_poll(max_events)
            self.raise_error(self.errno_poll)
            return self.build_result()

        self.assert_max_events_on_register(max_events)
        self.raise_error(self.errno_register)
        self.registered_kevents.extend(kevents)

    def raise_error(self, err):
        '''Raise an OSError whose errno is ``err``; no-op if ``err`` is falsy.'''
        if not err:
            return
        ex = OSError()
        ex.errno = err
        raise ex

    def build_result(self):
        return [FakeKEvent(ident, filter) for ident, filter in self.result]

    def assert_max_events_on_poll(self, max_events):
        assert max_events == KQueuePoller.max_events, (
            "`max_events` parameter of `kqueue.control()` should be %d"
            % KQueuePoller.max_events)

    def assert_max_events_on_register(self, max_events):
        assert max_events == 0, (
            "`max_events` parameter of `kqueue.control()` should be 0 on register")
class FakeKEvent(object):
    """Minimal kevent stand-in exposing only ``ident`` and ``filter``."""

    def __init__(self, ident, filter):
        self.ident, self.filter = ident, filter
def test_suite():
    """Return a suite of every TestCase defined in this module.

    Uses the default loader's ``loadTestsFromModule`` because
    ``unittest.findTestCases`` was deprecated in Python 3.11 and removed
    in Python 3.13.
    """
    this_module = sys.modules[__name__]
    return unittest.defaultTestLoader.loadTestsFromModule(this_module)
# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
|