123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 |
- # -*- coding: utf-8 -*-
- #
- # This file is part of gunicorn released under the MIT license.
- # See the NOTICE for more information.
- # design:
- # A threaded worker accepts connections in the main loop, accepted
- # connections are added to the thread pool as a connection job.
- # Keepalive connections are put back in the loop waiting for an event.
- # If no event happens before the keepalive timeout expires, the connection is
- # closed.
- # pylint: disable=no-else-break
- import concurrent.futures as futures
- import errno
- import os
- import selectors
- import socket
- import ssl
- import sys
- import time
- from collections import deque
- from datetime import datetime
- from functools import partial
- from threading import RLock
- from . import base
- from .. import http
- from .. import util
- from ..http import wsgi
class TConn(object):
    """Per-connection state shared between the event loop and the pool.

    Holds the accepted socket, the peer and listener addresses, the lazily
    created request parser and the keepalive deadline.
    """

    def __init__(self, cfg, sock, client, server):
        """Record the connection details and make the socket non blocking.

        ``cfg`` is the worker configuration, ``sock`` the accepted socket,
        ``client`` the peer address returned by ``accept()`` and ``server``
        the address of the listener that accepted the connection.
        """
        self.cfg = cfg
        self.sock = sock
        self.client = client
        self.server = server

        # absolute deadline (``time.time()`` based) set by set_timeout()
        self.timeout = None
        # created on the first call to init()
        self.parser = None

        # set the socket to non blocking
        self.sock.setblocking(False)

    def init(self):
        """Switch the socket to blocking mode and create the parser once.

        On the first call the socket is wrapped for TLS when ``cfg.is_ssl``
        is set; later calls (a reused keepalive connection) keep the
        existing parser and only restore blocking mode.
        """
        self.sock.setblocking(True)

        if self.parser is None:
            # wrap the socket if needed
            if self.cfg.is_ssl:
                self.sock = self._wrap_ssl()

            # initialize the parser
            self.parser = http.RequestParser(self.cfg, self.sock, self.client)

    def _wrap_ssl(self):
        """Return ``self.sock`` wrapped as a server-side TLS socket.

        ``ssl.wrap_socket`` was removed in Python 3.12.  It is still used
        when the interpreter provides it, so behaviour there is unchanged;
        otherwise the same steps are performed with an ``ssl.SSLContext``.
        ``cfg.ssl_options`` is interpreted as the keyword arguments of the
        legacy ``ssl.wrap_socket`` function.

        Raises ``ValueError`` when no certificate file is configured, as the
        legacy function did for server-side sockets.
        """
        legacy_wrap = getattr(ssl, "wrap_socket", None)
        if legacy_wrap is not None:
            return legacy_wrap(self.sock, server_side=True,
                               **self.cfg.ssl_options)

        # work on a copy so the configuration mapping is never mutated
        opts = dict(self.cfg.ssl_options)
        keyfile = opts.pop("keyfile", None)
        certfile = opts.pop("certfile", None)
        cert_reqs = opts.pop("cert_reqs", ssl.CERT_NONE)
        ssl_version = opts.pop("ssl_version", ssl.PROTOCOL_TLS)
        ca_certs = opts.pop("ca_certs", None)
        ciphers = opts.pop("ciphers", None)

        if not certfile:
            raise ValueError("certfile must be specified for server-side "
                             "operations")

        context = ssl.SSLContext(ssl_version)
        context.verify_mode = cert_reqs
        if ca_certs:
            context.load_verify_locations(ca_certs)
        context.load_cert_chain(certfile, keyfile)
        if ciphers:
            context.set_ciphers(ciphers)

        # whatever is left (e.g. do_handshake_on_connect,
        # suppress_ragged_eofs) is forwarded to the context unchanged
        return context.wrap_socket(self.sock, server_side=True, **opts)

    def set_timeout(self):
        """Set the keepalive deadline to now plus ``cfg.keepalive``."""
        self.timeout = time.time() + self.cfg.keepalive

    def close(self):
        """Close the underlying socket through ``util.close``."""
        util.close(self.sock)
class ThreadWorker(base.Worker):
    """Worker that accepts on its main loop and serves from a thread pool.

    Accepted connections are submitted to the pool as jobs.  A connection
    that should be kept alive is registered with the selector again and
    parked in ``_keep`` until it becomes readable or its deadline passes.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections
        self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
        # the pool, selector and lock are created in init_process()
        self.tpool = None
        self.poller = None
        self._lock = None
        # futures of submitted connection jobs
        self.futures = deque()
        # idle keepalive connections, oldest first
        self._keep = deque()
        # number of connections currently owned by this worker
        self.nr_conns = 0

    @classmethod
    def check_config(cls, cfg, log):
        """Warn when keepalive is enabled but no slot is left for it."""
        max_keepalived = cfg.worker_connections - cfg.threads
        if max_keepalived <= 0 and cfg.keepalive:
            log.warning("No keepalived connections can be handled. "
                        "Check the number of worker connections and threads.")

    def init_process(self):
        """Create the pool, selector and lock, then defer to the base class."""
        self.tpool = self.get_thread_pool()
        self.poller = selectors.DefaultSelector()
        self._lock = RLock()
        super().init_process()

    def get_thread_pool(self):
        """Override this method to customize how the thread pool is created"""
        return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)

    def handle_quit(self, sig, frame):
        """Stop the worker: run the ``worker_int`` hook and exit."""
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)
        # do not wait for running jobs
        self.tpool.shutdown(False)
        time.sleep(0.1)
        sys.exit(0)

    def _wrap_future(self, fs, conn):
        """Attach ``conn`` to the future and track it until it completes."""
        fs.conn = conn
        self.futures.append(fs)
        fs.add_done_callback(self.finish_request)

    def enqueue_req(self, conn):
        """Prepare ``conn`` for blocking I/O and submit it to the pool."""
        conn.init()
        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)

    def accept(self, server, listener):
        """Accept one connection from ``listener`` and enqueue it.

        ``server`` is the listener address captured in run().  EAGAIN,
        EWOULDBLOCK and ECONNABORTED are ignored; any other
        ``EnvironmentError`` is re-raised.
        """
        try:
            sock, client = listener.accept()
            # initialize the connection object
            conn = TConn(self.cfg, sock, client, server)
            self.nr_conns += 1
            # enqueue the job
            self.enqueue_req(conn)
        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

    def reuse_connection(self, conn, client):
        """Take a readable keepalive connection out of the idle set and
        hand it back to the pool.

        ``client`` is the socket object that was registered with the
        selector for this connection.
        """
        with self._lock:
            # unregister the client from the poller
            self.poller.unregister(client)
            # remove the connection from keepalive
            try:
                self._keep.remove(conn)
            except ValueError:
                # race condition
                return

        # submit the connection to a worker
        self.enqueue_req(conn)

    def murder_keepalived(self):
        """Close idle keepalive connections whose deadline has passed.

        ``_keep`` is scanned from the oldest entry; the scan stops at the
        first connection that has not expired yet.
        """
        now = time.time()
        while True:
            with self._lock:
                try:
                    # remove the connection from the queue
                    conn = self._keep.popleft()
                except IndexError:
                    break

            if conn.timeout - now > 0:
                # not expired: add the connection back to the queue
                with self._lock:
                    self._keep.appendleft(conn)
                break

            self.nr_conns -= 1
            # remove the socket from the poller
            with self._lock:
                try:
                    self.poller.unregister(conn.sock)
                except EnvironmentError as e:
                    if e.errno != errno.EBADF:
                        raise
                except (KeyError, ValueError):
                    # KeyError: the socket is not registered any more.
                    # ValueError: the socket is already closed, so the
                    # selector cannot resolve its file descriptor.  Either
                    # way there is nothing left to unregister.
                    pass

            # close the socket
            conn.close()

    def is_parent_alive(self):
        """Return False (and log) when the parent process id has changed."""
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run(self):
        """Main loop: accept, dispatch, reap finished jobs, expire idle
        keepalive connections; then shut everything down.
        """
        # init listeners, add them to the event loop
        for sock in self.sockets:
            sock.setblocking(False)
            # a race condition during graceful shutdown may make the listener
            # name unavailable in the request handler so capture it once here
            server = sock.getsockname()
            acceptor = partial(self.accept, server)
            self.poller.register(sock, selectors.EVENT_READ, acceptor)

        while self.alive:
            # notify the arbiter we are alive
            self.notify()

            # can we accept more connections?
            if self.nr_conns < self.worker_connections:
                # wait for an event
                events = self.poller.select(1.0)
                for key, _ in events:
                    # key.data is either an acceptor or a reuse_connection
                    # partial, both taking the registered file object
                    callback = key.data
                    callback(key.fileobj)

                # check (but do not wait) for finished requests
                result = futures.wait(self.futures, timeout=0,
                                      return_when=futures.FIRST_COMPLETED)
            else:
                # wait for a request to finish
                result = futures.wait(self.futures, timeout=1.0,
                                      return_when=futures.FIRST_COMPLETED)

            # clean up finished requests
            for fut in result.done:
                self.futures.remove(fut)

            if not self.is_parent_alive():
                break

            # handle keepalive timeouts
            self.murder_keepalived()

        self.tpool.shutdown(False)
        self.poller.close()

        for s in self.sockets:
            s.close()

        # give in-flight requests up to graceful_timeout to complete
        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

    def finish_request(self, fs):
        """Done-callback of a connection job.

        Parks the connection for keepalive when the job asked for it and
        the worker is still alive; otherwise releases the connection.
        """
        if fs.cancelled():
            self.nr_conns -= 1
            fs.conn.close()
            return

        try:
            (keepalive, conn) = fs.result()
            # if the connection should be kept alived add it
            # to the eventloop and record it
            if keepalive and self.alive:
                # flag the socket as non blocked
                conn.sock.setblocking(False)

                # register the connection
                conn.set_timeout()
                with self._lock:
                    self._keep.append(conn)

                    # add the socket to the event loop
                    self.poller.register(conn.sock, selectors.EVENT_READ,
                                         partial(self.reuse_connection, conn))
            else:
                self.nr_conns -= 1
                conn.close()
        except Exception:
            # an exception happened, make sure to close the
            # socket.
            self.nr_conns -= 1
            fs.conn.close()

    def handle(self, conn):
        """Parse and serve one request from ``conn`` (runs as a pool job).

        Returns ``(keepalive, conn)``; ``keepalive`` is truthy only when
        handle_request() reported that the connection may be reused.
        Errors are logged or passed to ``handle_error`` and result in
        ``(False, conn)``.
        """
        keepalive = False
        req = None
        try:
            req = next(conn.parser)
            if not req:
                return (False, conn)

            # handle the request
            keepalive = self.handle_request(req, conn)
            if keepalive:
                return (keepalive, conn)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            # also raised by handle_request() after it closed the socket
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                conn.sock.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, conn.sock, conn.client, e)
        except EnvironmentError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring connection epipe")
        except Exception as e:
            self.handle_error(req, conn.sock, conn.client, e)

        return (False, conn)

    def handle_request(self, req, conn):
        """Run the WSGI application for ``req`` and write the response.

        Returns True when the connection may be kept alive and False when
        the response requires it to be closed.  ``EnvironmentError`` is
        re-raised for the caller.  Any other error raised after the headers
        were sent closes the socket and surfaces as ``StopIteration``;
        before that point the error propagates unchanged.  The
        ``post_request`` hook always runs.
        """
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, conn.sock, conn.client,
                                        conn.server, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.nr >= self.max_requests:
                if self.alive:
                    self.log.info("Autorestarting worker after current request.")
                    self.alive = False
                resp.force_close()

            if not self.alive or not self.cfg.keepalive:
                resp.force_close()
            elif len(self._keep) >= self.max_keepalived:
                # no free keepalive slot left
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)

                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()

            if resp.should_close():
                self.log.debug("Closing connection.")
                return False
        except EnvironmentError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    conn.sock.shutdown(socket.SHUT_RDWR)
                    conn.sock.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

        return True
|