sync.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
# -*- coding: utf-8 -*-
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. #
  6. from datetime import datetime
  7. import errno
  8. import os
  9. import select
  10. import socket
  11. import ssl
  12. import sys
  13. import gunicorn.http as http
  14. import gunicorn.http.wsgi as wsgi
  15. import gunicorn.util as util
  16. import gunicorn.workers.base as base
  17. class StopWaiting(Exception):
  18. """ exception raised to stop waiting for a connection """
  19. class SyncWorker(base.Worker):
  20. def accept(self, listener):
  21. client, addr = listener.accept()
  22. client.setblocking(1)
  23. util.close_on_exec(client)
  24. self.handle(listener, client, addr)
  25. def wait(self, timeout):
  26. try:
  27. self.notify()
  28. ret = select.select(self.wait_fds, [], [], timeout)
  29. if ret[0]:
  30. if self.PIPE[0] in ret[0]:
  31. os.read(self.PIPE[0], 1)
  32. return ret[0]
  33. except select.error as e:
  34. if e.args[0] == errno.EINTR:
  35. return self.sockets
  36. if e.args[0] == errno.EBADF:
  37. if self.nr < 0:
  38. return self.sockets
  39. else:
  40. raise StopWaiting
  41. raise
  42. def is_parent_alive(self):
  43. # If our parent changed then we shut down.
  44. if self.ppid != os.getppid():
  45. self.log.info("Parent changed, shutting down: %s", self)
  46. return False
  47. return True
  48. def run_for_one(self, timeout):
  49. listener = self.sockets[0]
  50. while self.alive:
  51. self.notify()
  52. # Accept a connection. If we get an error telling us
  53. # that no connection is waiting we fall down to the
  54. # select which is where we'll wait for a bit for new
  55. # workers to come give us some love.
  56. try:
  57. self.accept(listener)
  58. # Keep processing clients until no one is waiting. This
  59. # prevents the need to select() for every client that we
  60. # process.
  61. continue
  62. except EnvironmentError as e:
  63. if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
  64. errno.EWOULDBLOCK):
  65. raise
  66. if not self.is_parent_alive():
  67. return
  68. try:
  69. self.wait(timeout)
  70. except StopWaiting:
  71. return
  72. def run_for_multiple(self, timeout):
  73. while self.alive:
  74. self.notify()
  75. try:
  76. ready = self.wait(timeout)
  77. except StopWaiting:
  78. return
  79. if ready is not None:
  80. for listener in ready:
  81. if listener == self.PIPE[0]:
  82. continue
  83. try:
  84. self.accept(listener)
  85. except EnvironmentError as e:
  86. if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
  87. errno.EWOULDBLOCK):
  88. raise
  89. if not self.is_parent_alive():
  90. return
  91. def run(self):
  92. # if no timeout is given the worker will never wait and will
  93. # use the CPU for nothing. This minimal timeout prevent it.
  94. timeout = self.timeout or 0.5
  95. # self.socket appears to lose its blocking status after
  96. # we fork in the arbiter. Reset it here.
  97. for s in self.sockets:
  98. s.setblocking(0)
  99. if len(self.sockets) > 1:
  100. self.run_for_multiple(timeout)
  101. else:
  102. self.run_for_one(timeout)
  103. def handle(self, listener, client, addr):
  104. req = None
  105. try:
  106. if self.cfg.is_ssl:
  107. client = ssl.wrap_socket(client, server_side=True,
  108. **self.cfg.ssl_options)
  109. parser = http.RequestParser(self.cfg, client, addr)
  110. req = next(parser)
  111. self.handle_request(listener, req, client, addr)
  112. except http.errors.NoMoreData as e:
  113. self.log.debug("Ignored premature client disconnection. %s", e)
  114. except StopIteration as e:
  115. self.log.debug("Closing connection. %s", e)
  116. except ssl.SSLError as e:
  117. if e.args[0] == ssl.SSL_ERROR_EOF:
  118. self.log.debug("ssl connection closed")
  119. client.close()
  120. else:
  121. self.log.debug("Error processing SSL request.")
  122. self.handle_error(req, client, addr, e)
  123. except EnvironmentError as e:
  124. if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
  125. self.log.exception("Socket error processing request.")
  126. else:
  127. if e.errno == errno.ECONNRESET:
  128. self.log.debug("Ignoring connection reset")
  129. elif e.errno == errno.ENOTCONN:
  130. self.log.debug("Ignoring socket not connected")
  131. else:
  132. self.log.debug("Ignoring EPIPE")
  133. except Exception as e:
  134. self.handle_error(req, client, addr, e)
  135. finally:
  136. util.close(client)
  137. def handle_request(self, listener, req, client, addr):
  138. environ = {}
  139. resp = None
  140. try:
  141. self.cfg.pre_request(self, req)
  142. request_start = datetime.now()
  143. resp, environ = wsgi.create(req, client, addr,
  144. listener.getsockname(), self.cfg)
  145. # Force the connection closed until someone shows
  146. # a buffering proxy that supports Keep-Alive to
  147. # the backend.
  148. resp.force_close()
  149. self.nr += 1
  150. if self.nr >= self.max_requests:
  151. self.log.info("Autorestarting worker after current request.")
  152. self.alive = False
  153. respiter = self.wsgi(environ, resp.start_response)
  154. try:
  155. if isinstance(respiter, environ['wsgi.file_wrapper']):
  156. resp.write_file(respiter)
  157. else:
  158. for item in respiter:
  159. resp.write(item)
  160. resp.close()
  161. request_time = datetime.now() - request_start
  162. self.log.access(resp, req, environ, request_time)
  163. finally:
  164. if hasattr(respiter, "close"):
  165. respiter.close()
  166. except EnvironmentError:
  167. # pass to next try-except level
  168. util.reraise(*sys.exc_info())
  169. except Exception:
  170. if resp and resp.headers_sent:
  171. # If the requests have already been sent, we should close the
  172. # connection to indicate the error.
  173. self.log.exception("Error handling request")
  174. try:
  175. client.shutdown(socket.SHUT_RDWR)
  176. client.close()
  177. except EnvironmentError:
  178. pass
  179. raise StopIteration()
  180. raise
  181. finally:
  182. try:
  183. self.cfg.post_request(self, req, environ, resp)
  184. except Exception:
  185. self.log.exception("Exception in post_request hook")