reactor: Unify handling of fd events

The SelectReactor has a different event dispatch system from the
PollReactor and EPollReactor.  However, in practice the PollReactor
code is always used, so there is no reason to maintain a different
implementation for SelectReactor.  Rework the code so that
SelectReactor file dispatch handling is done the same way as
PollReactor (and EPollReactor).  This simplfiies the code.

Introduce a new _check_fds() method that is shared between Reactor
implementations.

Also, fix some cut-and-paste bugs in SelectReactor and EPollReactor.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-09-05 13:34:48 -04:00
parent a89694ac68
commit bb88985b8d

View File

@@ -55,8 +55,6 @@ class ReactorFileHandler:
self.fd = fd
self.read_callback = read_callback
self.write_callback = write_callback
def fileno(self):
return self.fd
class ReactorGreenlet(greenlet.greenlet):
def __init__(self, run):
@@ -109,8 +107,13 @@ class SelectReactor:
self._pipe_fds = None
self._async_queue = queue.Queue()
# File descriptors
self._dummy_fd_hdl = ReactorFileHandler(-1, (lambda e: None),
(lambda e: None))
self._fds = {}
self._read_fds = []
self._write_fds = []
self._READ = 1
self._WRITE = 2
# Greenlets
self._g_dispatch = None
self._greenlets = []
@@ -245,48 +248,54 @@ class SelectReactor:
# File descriptors
def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, read_callback, write_callback)
self._fds[fd] = file_handler
self.set_fd_wake(file_handler, True, False)
return file_handler
def unregister_fd(self, file_handler):
if file_handler in self._read_fds:
self._read_fds.pop(self._read_fds.index(file_handler))
if file_handler in self._write_fds:
self._write_fds.pop(self._write_fds.index(file_handler))
self.set_fd_wake(file_handler, False, False)
del self._fds[file_handler.fd]
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
if file_handler in self._read_fds:
fd = file_handler.fd
if fd in self._read_fds:
if not is_readable:
self._read_fds.pop(self._read_fds.index(file_handler))
self._read_fds.remove(fd)
elif is_readable:
self._read_fds.append(file_handler)
if file_handler in self._write_fds:
self._read_fds.append(fd)
if fd in self._write_fds:
if not is_writeable:
self._write_fds.pop(self._write_fds.index(file_handler))
self._write_fds.remove(fd)
elif is_writeable:
self._write_fds.append(file_handler)
self._write_fds.append(fd)
def _check_fds(self, eventtime, hdls):
g_dispatch = self._g_dispatch
for fd, event in hdls:
hdl = self._fds.get(fd, self._dummy_fd_hdl)
if event & self._READ:
hdl.read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
return self.monotonic()
if event & self._WRITE:
hdl.write_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
return self.monotonic()
return eventtime
# Main loop
def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent()
self._g_dispatch = greenlet.getcurrent()
busy = True
eventtime = self.monotonic()
while self._process:
timeout = self._check_timers(eventtime, busy)
busy = False
res = select.select(self._read_fds, self.write_fds, [], timeout)
res = select.select(self._read_fds, self._write_fds, [], timeout)
eventtime = self.monotonic()
for fd in res[0]:
if res[0] or res[1]:
busy = True
fd.read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
for fd in res[1]:
busy = True
fd.write_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
hdls = ([(fd, self._READ) for fd in res[0]]
+ [(fd, self._WRITE) for fd in res[1]])
eventtime = self._check_fds(eventtime, hdls)
self._g_dispatch = None
def run(self):
if self._pipe_fds is None:
@@ -315,30 +324,27 @@ class PollReactor(SelectReactor):
def __init__(self, gc_checking=False):
SelectReactor.__init__(self, gc_checking)
self._poll = select.poll()
self._fds = {}
self._READ = select.POLLIN | select.POLLHUP
self._WRITE = select.POLLOUT
# File descriptors
def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, read_callback, write_callback)
fds = self._fds.copy()
fds[fd] = file_handler
self._fds = fds
self._poll.register(file_handler, select.POLLIN | select.POLLHUP)
self._fds[fd] = file_handler
self._poll.register(file_handler.fd, select.POLLIN | select.POLLHUP)
return file_handler
def unregister_fd(self, file_handler):
self._poll.unregister(file_handler)
fds = self._fds.copy()
del fds[file_handler.fd]
self._fds = fds
self._poll.unregister(file_handler.fd)
del self._fds[file_handler.fd]
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
flags = select.POLLHUP
if is_readable:
flags |= select.POLLIN
if is_writeable:
flags |= select.POLLOUT
self._poll.modify(file_handler, flags)
self._poll.modify(file_handler.fd, flags)
# Main loop
def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent()
self._g_dispatch = greenlet.getcurrent()
busy = True
eventtime = self.monotonic()
while self._process:
@@ -346,50 +352,36 @@ class PollReactor(SelectReactor):
busy = False
res = self._poll.poll(int(math.ceil(timeout * 1000.)))
eventtime = self.monotonic()
for fd, event in res:
if res:
busy = True
if event & (select.POLLIN | select.POLLHUP):
self._fds[fd].read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
if event & select.POLLOUT:
self._fds[fd].write_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
eventtime = self._check_fds(eventtime, res)
self._g_dispatch = None
class EPollReactor(SelectReactor):
def __init__(self, gc_checking=False):
SelectReactor.__init__(self, gc_checking)
self._epoll = select.epoll()
self._fds = {}
self._READ = select.EPOLLIN | select.EPOLLHUP
self._WRITE = select.EPOLLOUT
# File descriptors
def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, read_callback, write_callback)
fds = self._fds.copy()
fds[fd] = read_callback
self._fds = fds
self._fds[fd] = file_handler
self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP)
return file_handler
def unregister_fd(self, file_handler):
self._epoll.unregister(file_handler.fd)
fds = self._fds.copy()
del fds[file_handler.fd]
self._fds = fds
del self._fds[file_handler.fd]
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
flags = select.POLLHUP
flags = select.EPOLLHUP
if is_readable:
flags |= select.EPOLLIN
if is_writeable:
flags |= select.EPOLLOUT
self._epoll.modify(file_handler, flags)
self._epoll.modify(file_handler.fd, flags)
# Main loop
def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent()
self._g_dispatch = greenlet.getcurrent()
busy = True
eventtime = self.monotonic()
while self._process:
@@ -397,20 +389,9 @@ class EPollReactor(SelectReactor):
busy = False
res = self._epoll.poll(timeout)
eventtime = self.monotonic()
for fd, event in res:
if res:
busy = True
if event & (select.EPOLLIN | select.EPOLLHUP):
self._fds[fd].read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
if event & select.EPOLLOUT:
self._fds[fd].write_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
eventtime = self._check_fds(eventtime, res)
self._g_dispatch = None
# Use the poll based reactor if it is available