geventlet.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # -*- coding: utf-8 -
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. from functools import partial
  6. import sys
  7. try:
  8. import eventlet
  9. except ImportError:
  10. raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher")
  11. else:
  12. from pkg_resources import parse_version
  13. if parse_version(eventlet.__version__) < parse_version('0.24.1'):
  14. raise RuntimeError("eventlet worker requires eventlet 0.24.1 or higher")
  15. from eventlet import hubs, greenthread
  16. from eventlet.greenio import GreenSocket
  17. from eventlet.wsgi import ALREADY_HANDLED as EVENTLET_ALREADY_HANDLED
  18. import greenlet
  19. from gunicorn.workers.base_async import AsyncWorker
  20. def _eventlet_socket_sendfile(self, file, offset=0, count=None):
  21. # Based on the implementation in gevent which in turn is slightly
  22. # modified from the standard library implementation.
  23. if self.gettimeout() == 0:
  24. raise ValueError("non-blocking sockets are not supported")
  25. if offset:
  26. file.seek(offset)
  27. blocksize = min(count, 8192) if count else 8192
  28. total_sent = 0
  29. # localize variable access to minimize overhead
  30. file_read = file.read
  31. sock_send = self.send
  32. try:
  33. while True:
  34. if count:
  35. blocksize = min(count - total_sent, blocksize)
  36. if blocksize <= 0:
  37. break
  38. data = memoryview(file_read(blocksize))
  39. if not data:
  40. break # EOF
  41. while True:
  42. try:
  43. sent = sock_send(data)
  44. except BlockingIOError:
  45. continue
  46. else:
  47. total_sent += sent
  48. if sent < len(data):
  49. data = data[sent:]
  50. else:
  51. break
  52. return total_sent
  53. finally:
  54. if total_sent > 0 and hasattr(file, 'seek'):
  55. file.seek(offset + total_sent)
  56. def _eventlet_serve(sock, handle, concurrency):
  57. """
  58. Serve requests forever.
  59. This code is nearly identical to ``eventlet.convenience.serve`` except
  60. that it attempts to join the pool at the end, which allows for gunicorn
  61. graceful shutdowns.
  62. """
  63. pool = eventlet.greenpool.GreenPool(concurrency)
  64. server_gt = eventlet.greenthread.getcurrent()
  65. while True:
  66. try:
  67. conn, addr = sock.accept()
  68. gt = pool.spawn(handle, conn, addr)
  69. gt.link(_eventlet_stop, server_gt, conn)
  70. conn, addr, gt = None, None, None
  71. except eventlet.StopServe:
  72. sock.close()
  73. pool.waitall()
  74. return
  75. def _eventlet_stop(client, server, conn):
  76. """
  77. Stop a greenlet handling a request and close its connection.
  78. This code is lifted from eventlet so as not to depend on undocumented
  79. functions in the library.
  80. """
  81. try:
  82. try:
  83. client.wait()
  84. finally:
  85. conn.close()
  86. except greenlet.GreenletExit:
  87. pass
  88. except Exception:
  89. greenthread.kill(server, *sys.exc_info())
  90. def patch_sendfile():
  91. # As of eventlet 0.25.1, GreenSocket.sendfile doesn't exist,
  92. # meaning the native implementations of socket.sendfile will be used.
  93. # If os.sendfile exists, it will attempt to use that, failing explicitly
  94. # if the socket is in non-blocking mode, which the underlying
  95. # socket object /is/. Even the regular _sendfile_use_send will
  96. # fail in that way; plus, it would use the underlying socket.send which isn't
  97. # properly cooperative. So we have to monkey-patch a working socket.sendfile()
  98. # into GreenSocket; in this method, `self.send` will be the GreenSocket's
  99. # send method which is properly cooperative.
  100. if not hasattr(GreenSocket, 'sendfile'):
  101. GreenSocket.sendfile = _eventlet_socket_sendfile
  102. class EventletWorker(AsyncWorker):
  103. def patch(self):
  104. hubs.use_hub()
  105. eventlet.monkey_patch()
  106. patch_sendfile()
  107. def is_already_handled(self, respiter):
  108. if respiter == EVENTLET_ALREADY_HANDLED:
  109. raise StopIteration()
  110. return super().is_already_handled(respiter)
  111. def init_process(self):
  112. self.patch()
  113. super().init_process()
  114. def handle_quit(self, sig, frame):
  115. eventlet.spawn(super().handle_quit, sig, frame)
  116. def handle_usr1(self, sig, frame):
  117. eventlet.spawn(super().handle_usr1, sig, frame)
  118. def timeout_ctx(self):
  119. return eventlet.Timeout(self.cfg.keepalive or None, False)
  120. def handle(self, listener, client, addr):
  121. if self.cfg.is_ssl:
  122. client = eventlet.wrap_ssl(client, server_side=True,
  123. **self.cfg.ssl_options)
  124. super().handle(listener, client, addr)
  125. def run(self):
  126. acceptors = []
  127. for sock in self.sockets:
  128. gsock = GreenSocket(sock)
  129. gsock.setblocking(1)
  130. hfun = partial(self.handle, gsock)
  131. acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun,
  132. self.worker_connections)
  133. acceptors.append(acceptor)
  134. eventlet.sleep(0.0)
  135. while self.alive:
  136. self.notify()
  137. eventlet.sleep(1.0)
  138. self.notify()
  139. try:
  140. with eventlet.Timeout(self.cfg.graceful_timeout) as t:
  141. for a in acceptors:
  142. a.kill(eventlet.StopServe())
  143. for a in acceptors:
  144. a.wait()
  145. except eventlet.Timeout as te:
  146. if te != t:
  147. raise
  148. for a in acceptors:
  149. a.kill()