ggevent.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # -*- coding: utf-8 -*-
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. import os
  6. import sys
  7. from datetime import datetime
  8. from functools import partial
  9. import time
  10. try:
  11. import gevent
  12. except ImportError:
  13. raise RuntimeError("gevent worker requires gevent 1.4 or higher")
  14. else:
  15. from pkg_resources import parse_version
  16. if parse_version(gevent.__version__) < parse_version('1.4'):
  17. raise RuntimeError("gevent worker requires gevent 1.4 or higher")
  18. from gevent.pool import Pool
  19. from gevent.server import StreamServer
  20. from gevent import hub, monkey, socket, pywsgi
  21. import gunicorn
  22. from gunicorn.http.wsgi import base_environ
  23. from gunicorn.workers.base_async import AsyncWorker
  24. VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)
  25. class GeventWorker(AsyncWorker):
  26. server_class = None
  27. wsgi_handler = None
  28. def patch(self):
  29. monkey.patch_all()
  30. # patch sockets
  31. sockets = []
  32. for s in self.sockets:
  33. sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM,
  34. fileno=s.sock.fileno()))
  35. self.sockets = sockets
  36. def notify(self):
  37. super().notify()
  38. if self.ppid != os.getppid():
  39. self.log.info("Parent changed, shutting down: %s", self)
  40. sys.exit(0)
  41. def timeout_ctx(self):
  42. return gevent.Timeout(self.cfg.keepalive, False)
  43. def run(self):
  44. servers = []
  45. ssl_args = {}
  46. if self.cfg.is_ssl:
  47. ssl_args = dict(server_side=True, **self.cfg.ssl_options)
  48. for s in self.sockets:
  49. s.setblocking(1)
  50. pool = Pool(self.worker_connections)
  51. if self.server_class is not None:
  52. environ = base_environ(self.cfg)
  53. environ.update({
  54. "wsgi.multithread": True,
  55. "SERVER_SOFTWARE": VERSION,
  56. })
  57. server = self.server_class(
  58. s, application=self.wsgi, spawn=pool, log=self.log,
  59. handler_class=self.wsgi_handler, environ=environ,
  60. **ssl_args)
  61. else:
  62. hfun = partial(self.handle, s)
  63. server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
  64. if self.cfg.workers > 1:
  65. server.max_accept = 1
  66. server.start()
  67. servers.append(server)
  68. while self.alive:
  69. self.notify()
  70. gevent.sleep(1.0)
  71. try:
  72. # Stop accepting requests
  73. for server in servers:
  74. if hasattr(server, 'close'): # gevent 1.0
  75. server.close()
  76. if hasattr(server, 'kill'): # gevent < 1.0
  77. server.kill()
  78. # Handle current requests until graceful_timeout
  79. ts = time.time()
  80. while time.time() - ts <= self.cfg.graceful_timeout:
  81. accepting = 0
  82. for server in servers:
  83. if server.pool.free_count() != server.pool.size:
  84. accepting += 1
  85. # if no server is accepting a connection, we can exit
  86. if not accepting:
  87. return
  88. self.notify()
  89. gevent.sleep(1.0)
  90. # Force kill all active the handlers
  91. self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
  92. for server in servers:
  93. server.stop(timeout=1)
  94. except Exception:
  95. pass
  96. def handle(self, listener, client, addr):
  97. # Connected socket timeout defaults to socket.getdefaulttimeout().
  98. # This forces to blocking mode.
  99. client.setblocking(1)
  100. super().handle(listener, client, addr)
  101. def handle_request(self, listener_name, req, sock, addr):
  102. try:
  103. super().handle_request(listener_name, req, sock, addr)
  104. except gevent.GreenletExit:
  105. pass
  106. except SystemExit:
  107. pass
  108. def handle_quit(self, sig, frame):
  109. # Move this out of the signal handler so we can use
  110. # blocking calls. See #1126
  111. gevent.spawn(super().handle_quit, sig, frame)
  112. def handle_usr1(self, sig, frame):
  113. # Make the gevent workers handle the usr1 signal
  114. # by deferring to a new greenlet. See #1645
  115. gevent.spawn(super().handle_usr1, sig, frame)
  116. def init_process(self):
  117. self.patch()
  118. hub.reinit()
  119. super().init_process()
  120. class GeventResponse(object):
  121. status = None
  122. headers = None
  123. sent = None
  124. def __init__(self, status, headers, clength):
  125. self.status = status
  126. self.headers = headers
  127. self.sent = clength
  128. class PyWSGIHandler(pywsgi.WSGIHandler):
  129. def log_request(self):
  130. start = datetime.fromtimestamp(self.time_start)
  131. finish = datetime.fromtimestamp(self.time_finish)
  132. response_time = finish - start
  133. resp_headers = getattr(self, 'response_headers', {})
  134. resp = GeventResponse(self.status, resp_headers, self.response_length)
  135. if hasattr(self, 'headers'):
  136. req_headers = self.headers.items()
  137. else:
  138. req_headers = []
  139. self.server.log.access(resp, req_headers, self.environ, response_time)
  140. def get_environ(self):
  141. env = super().get_environ()
  142. env['gunicorn.sock'] = self.socket
  143. env['RAW_URI'] = self.path
  144. return env
  145. class PyWSGIServer(pywsgi.WSGIServer):
  146. pass
  147. class GeventPyWSGIWorker(GeventWorker):
  148. "The Gevent StreamServer based workers."
  149. server_class = PyWSGIServer
  150. wsgi_handler = PyWSGIHandler