# gthread.py
# -*- coding: utf-8 -*-
#
# This file is part of gunicorn released under the MIT license.
# See the NOTICE for more information.

# design:
# A threaded worker accepts connections in the main loop, accepted
# connections are added to the thread pool as a connection job.
# Keepalive connections are put back in the loop waiting for an event.
# If no event happen after the keep alive timeout, the connection is
# closed.
# pylint: disable=no-else-break
import concurrent.futures as futures
import errno
import os
import selectors
import socket
import ssl
import sys
import time
from collections import deque
from datetime import datetime
from functools import partial
from threading import RLock

from . import base
from .. import http
from .. import util
from ..http import wsgi
  28. class TConn(object):
  29. def __init__(self, cfg, sock, client, server):
  30. self.cfg = cfg
  31. self.sock = sock
  32. self.client = client
  33. self.server = server
  34. self.timeout = None
  35. self.parser = None
  36. # set the socket to non blocking
  37. self.sock.setblocking(False)
  38. def init(self):
  39. self.sock.setblocking(True)
  40. if self.parser is None:
  41. # wrap the socket if needed
  42. if self.cfg.is_ssl:
  43. self.sock = ssl.wrap_socket(self.sock, server_side=True,
  44. **self.cfg.ssl_options)
  45. # initialize the parser
  46. self.parser = http.RequestParser(self.cfg, self.sock, self.client)
  47. def set_timeout(self):
  48. # set the timeout
  49. self.timeout = time.time() + self.cfg.keepalive
  50. def close(self):
  51. util.close(self.sock)
class ThreadWorker(base.Worker):
    """Worker that accepts connections in the main loop and handles each
    request on a thread pool, keeping keepalive sockets parked in a selector.

    Shared state (``futures``, ``_keep``, ``nr_conns``, the poller) is touched
    both by the main loop and by future callbacks running on pool threads;
    ``_lock`` guards ``_keep`` and the poller registrations.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.worker_connections = self.cfg.worker_connections
        # connections above this many cannot be kept alive, since each pool
        # thread may be occupying one connection
        self.max_keepalived = self.cfg.worker_connections - self.cfg.threads
        # initialise the pool
        # tpool/poller/_lock are created in init_process(), post-fork
        self.tpool = None
        self.poller = None
        self._lock = None
        # in-flight request futures (appended here, removed in run())
        self.futures = deque()
        # keepalive connections waiting for a new request
        self._keep = deque()
        # NOTE(review): nr_conns is mutated from pool-thread callbacks
        # without holding _lock -- relies on GIL atomicity of +=/-=; verify.
        self.nr_conns = 0

    @classmethod
    def check_config(cls, cfg, log):
        """Warn at startup when the keepalive budget is non-positive."""
        max_keepalived = cfg.worker_connections - cfg.threads

        if max_keepalived <= 0 and cfg.keepalive:
            log.warning("No keepalived connections can be handled. " +
                        "Check the number of worker connections and threads.")

    def init_process(self):
        """Create per-process resources before entering the worker loop."""
        self.tpool = self.get_thread_pool()
        self.poller = selectors.DefaultSelector()
        self._lock = RLock()
        super().init_process()

    def get_thread_pool(self):
        """Override this method to customize how the thread pool is created"""
        return futures.ThreadPoolExecutor(max_workers=self.cfg.threads)

    def handle_quit(self, sig, frame):
        """Signal handler: stop accepting, run the hook, and exit.

        NOTE(review): runs in signal-handler context; tpool.shutdown(False)
        does not wait for in-flight requests.
        """
        self.alive = False
        # worker_int callback
        self.cfg.worker_int(self)
        self.tpool.shutdown(False)
        time.sleep(0.1)
        sys.exit(0)

    def _wrap_future(self, fs, conn):
        """Attach the connection to its future and track/observe the future."""
        # the callback needs the connection to close it on error/cancel
        fs.conn = conn
        self.futures.append(fs)
        fs.add_done_callback(self.finish_request)

    def enqueue_req(self, conn):
        """Make the connection blocking again and hand it to the pool."""
        conn.init()

        # submit the connection to a worker
        fs = self.tpool.submit(self.handle, conn)
        self._wrap_future(fs, conn)

    def accept(self, server, listener):
        """Accept one connection from *listener* and enqueue it.

        Non-fatal accept errors (EAGAIN/ECONNABORTED/EWOULDBLOCK) are ignored;
        anything else propagates.
        """
        try:
            sock, client = listener.accept()
            # initialize the connection object
            conn = TConn(self.cfg, sock, client, server)

            self.nr_conns += 1
            # enqueue the job
            self.enqueue_req(conn)
        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

    def reuse_connection(self, conn, client):
        """Selector callback: a parked keepalive socket became readable."""
        with self._lock:
            # unregister the client from the poller
            self.poller.unregister(client)
            # remove the connection from keepalive
            try:
                self._keep.remove(conn)
            except ValueError:
                # race condition
                # murder_keepalived() already claimed (and closed) it
                return

        # submit the connection to a worker
        self.enqueue_req(conn)

    def murder_keepalived(self):
        """Close parked keepalive connections whose deadline has passed.

        _keep is ordered by deadline (appended on expiry order), so the scan
        stops at the first connection that is still within its timeout.
        """
        now = time.time()
        while True:
            with self._lock:
                try:
                    # remove the connection from the queue
                    conn = self._keep.popleft()
                except IndexError:
                    break

            delta = conn.timeout - now
            if delta > 0:
                # add the connection back to the queue
                # NOTE(review): the lock is dropped between popleft and
                # appendleft, so a concurrent reuse_connection() may miss this
                # conn for one poll cycle -- confirm this window is acceptable.
                with self._lock:
                    self._keep.appendleft(conn)
                break
            else:
                self.nr_conns -= 1
                # remove the socket from the poller
                with self._lock:
                    try:
                        self.poller.unregister(conn.sock)
                    except EnvironmentError as e:
                        if e.errno != errno.EBADF:
                            raise
                    except KeyError:
                        # already removed by the system, continue
                        pass

                # close the socket
                conn.close()

    def is_parent_alive(self):
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run(self):
        """Main worker loop: accept, dispatch, reap futures and keepalives."""
        # init listeners, add them to the event loop
        for sock in self.sockets:
            sock.setblocking(False)
            # a race condition during graceful shutdown may make the listener
            # name unavailable in the request handler so capture it once here
            server = sock.getsockname()
            acceptor = partial(self.accept, server)
            self.poller.register(sock, selectors.EVENT_READ, acceptor)

        while self.alive:
            # notify the arbiter we are alive
            self.notify()

            # can we accept more connections?
            if self.nr_conns < self.worker_connections:
                # wait for an event (at most 1 second, to keep notifying)
                events = self.poller.select(1.0)
                for key, _ in events:
                    callback = key.data
                    callback(key.fileobj)

                # check (but do not wait) for finished requests
                result = futures.wait(self.futures, timeout=0,
                                      return_when=futures.FIRST_COMPLETED)
            else:
                # at capacity: wait for a request to finish
                result = futures.wait(self.futures, timeout=1.0,
                                      return_when=futures.FIRST_COMPLETED)

            # clean up finished requests
            for fut in result.done:
                self.futures.remove(fut)

            if not self.is_parent_alive():
                break

            # handle keepalive timeouts
            self.murder_keepalived()

        # shutdown: stop the pool (non-blocking), release the selector and
        # listeners, then give in-flight requests graceful_timeout to finish
        self.tpool.shutdown(False)
        self.poller.close()

        for s in self.sockets:
            s.close()

        futures.wait(self.futures, timeout=self.cfg.graceful_timeout)

    def finish_request(self, fs):
        """Done-callback for a request future (runs on a pool thread).

        Either parks the connection for keepalive or closes it, updating
        nr_conns accordingly.
        """
        if fs.cancelled():
            self.nr_conns -= 1
            fs.conn.close()
            return

        try:
            (keepalive, conn) = fs.result()
            # if the connection should be kept alived add it
            # to the eventloop and record it
            if keepalive and self.alive:
                # flag the socket as non blocked
                conn.sock.setblocking(False)

                # register the connection
                conn.set_timeout()
                with self._lock:
                    self._keep.append(conn)

                    # add the socket to the event loop
                    self.poller.register(conn.sock, selectors.EVENT_READ,
                                         partial(self.reuse_connection, conn))
            else:
                self.nr_conns -= 1
                conn.close()
        except Exception:
            # an exception happened, make sure to close the
            # socket.
            self.nr_conns -= 1
            fs.conn.close()

    def handle(self, conn):
        """Pool-thread entry: parse and serve one request on *conn*.

        Returns (keepalive, conn); any error path falls through to
        (False, conn) so finish_request() closes the socket.
        """
        keepalive = False
        req = None
        try:
            req = next(conn.parser)
            if not req:
                return (False, conn)

            # handle the request
            keepalive = self.handle_request(req, conn)
            if keepalive:
                return (keepalive, conn)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                conn.sock.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, conn.sock, conn.client, e)
        except EnvironmentError as e:
            # broken-pipe style errors are expected client behaviour
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring connection epipe")
        except Exception as e:
            self.handle_error(req, conn.sock, conn.client, e)

        return (False, conn)

    def handle_request(self, req, conn):
        """Run the WSGI app for *req* and write the response.

        Returns True when the connection may be kept alive, False otherwise.
        Raises StopIteration when headers were already sent and the connection
        had to be torn down mid-response.
        """
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, conn.sock, conn.client,
                                        conn.server, self.cfg)
            environ["wsgi.multithread"] = True
            self.nr += 1
            if self.nr >= self.max_requests:
                if self.alive:
                    self.log.info("Autorestarting worker after current request.")
                    self.alive = False
                resp.force_close()

            # keepalive is disabled when shutting down, when disabled in
            # config, or when the keepalive budget is exhausted
            if not self.alive or not self.cfg.keepalive:
                resp.force_close()
            elif len(self._keep) >= self.max_keepalived:
                resp.force_close()

            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)

                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                # per WSGI, always close the app iterable if it supports it
                if hasattr(respiter, "close"):
                    respiter.close()

            if resp.should_close():
                self.log.debug("Closing connection.")
                return False
        except EnvironmentError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    conn.sock.shutdown(socket.SHUT_RDWR)
                    conn.sock.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

        return True