base.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. # -*- coding: utf-8 -*-
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. import io
  6. import os
  7. import signal
  8. import sys
  9. import time
  10. import traceback
  11. from datetime import datetime
  12. from random import randint
  13. from ssl import SSLError
  14. from gunicorn import util
  15. from gunicorn.http.errors import (
  16. ForbiddenProxyRequest, InvalidHeader,
  17. InvalidHeaderName, InvalidHTTPVersion,
  18. InvalidProxyLine, InvalidRequestLine,
  19. InvalidRequestMethod, InvalidSchemeHeaders,
  20. LimitRequestHeaders, LimitRequestLine,
  21. )
  22. from gunicorn.http.wsgi import Response, default_environ
  23. from gunicorn.reloader import reloader_engines
  24. from gunicorn.workers.workertmp import WorkerTmp
  25. class Worker(object):
  26. SIGNALS = [getattr(signal, "SIG%s" % x) for x in (
  27. "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()
  28. )]
  29. PIPE = []
  30. def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
  31. """\
  32. This is called pre-fork so it shouldn't do anything to the
  33. current process. If there's a need to make process wide
  34. changes you'll want to do that in ``self.init_process()``.
  35. """
  36. self.age = age
  37. self.pid = "[booting]"
  38. self.ppid = ppid
  39. self.sockets = sockets
  40. self.app = app
  41. self.timeout = timeout
  42. self.cfg = cfg
  43. self.booted = False
  44. self.aborted = False
  45. self.reloader = None
  46. self.nr = 0
  47. if cfg.max_requests > 0:
  48. jitter = randint(0, cfg.max_requests_jitter)
  49. self.max_requests = cfg.max_requests + jitter
  50. else:
  51. self.max_requests = sys.maxsize
  52. self.alive = True
  53. self.log = log
  54. self.tmp = WorkerTmp(cfg)
  55. def __str__(self):
  56. return "<Worker %s>" % self.pid
  57. def notify(self):
  58. """\
  59. Your worker subclass must arrange to have this method called
  60. once every ``self.timeout`` seconds. If you fail in accomplishing
  61. this task, the master process will murder your workers.
  62. """
  63. self.tmp.notify()
  64. def run(self):
  65. """\
  66. This is the mainloop of a worker process. You should override
  67. this method in a subclass to provide the intended behaviour
  68. for your particular evil schemes.
  69. """
  70. raise NotImplementedError()
  71. def init_process(self):
  72. """\
  73. If you override this method in a subclass, the last statement
  74. in the function should be to call this method with
  75. super().init_process() so that the ``run()`` loop is initiated.
  76. """
  77. # set environment' variables
  78. if self.cfg.env:
  79. for k, v in self.cfg.env.items():
  80. os.environ[k] = v
  81. util.set_owner_process(self.cfg.uid, self.cfg.gid,
  82. initgroups=self.cfg.initgroups)
  83. # Reseed the random number generator
  84. util.seed()
  85. # For waking ourselves up
  86. self.PIPE = os.pipe()
  87. for p in self.PIPE:
  88. util.set_non_blocking(p)
  89. util.close_on_exec(p)
  90. # Prevent fd inheritance
  91. for s in self.sockets:
  92. util.close_on_exec(s)
  93. util.close_on_exec(self.tmp.fileno())
  94. self.wait_fds = self.sockets + [self.PIPE[0]]
  95. self.log.close_on_exec()
  96. self.init_signals()
  97. # start the reloader
  98. if self.cfg.reload:
  99. def changed(fname):
  100. self.log.info("Worker reloading: %s modified", fname)
  101. self.alive = False
  102. os.write(self.PIPE[1], b"1")
  103. self.cfg.worker_int(self)
  104. time.sleep(0.1)
  105. sys.exit(0)
  106. reloader_cls = reloader_engines[self.cfg.reload_engine]
  107. self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
  108. callback=changed)
  109. self.load_wsgi()
  110. if self.reloader:
  111. self.reloader.start()
  112. self.cfg.post_worker_init(self)
  113. # Enter main run loop
  114. self.booted = True
  115. self.run()
  116. def load_wsgi(self):
  117. try:
  118. self.wsgi = self.app.wsgi()
  119. except SyntaxError as e:
  120. if not self.cfg.reload:
  121. raise
  122. self.log.exception(e)
  123. # fix from PR #1228
  124. # storing the traceback into exc_tb will create a circular reference.
  125. # per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
  126. # delete the traceback after use.
  127. try:
  128. _, exc_val, exc_tb = sys.exc_info()
  129. self.reloader.add_extra_file(exc_val.filename)
  130. tb_string = io.StringIO()
  131. traceback.print_tb(exc_tb, file=tb_string)
  132. self.wsgi = util.make_fail_app(tb_string.getvalue())
  133. finally:
  134. del exc_tb
  135. def init_signals(self):
  136. # reset signaling
  137. for s in self.SIGNALS:
  138. signal.signal(s, signal.SIG_DFL)
  139. # init new signaling
  140. signal.signal(signal.SIGQUIT, self.handle_quit)
  141. signal.signal(signal.SIGTERM, self.handle_exit)
  142. signal.signal(signal.SIGINT, self.handle_quit)
  143. signal.signal(signal.SIGWINCH, self.handle_winch)
  144. signal.signal(signal.SIGUSR1, self.handle_usr1)
  145. signal.signal(signal.SIGABRT, self.handle_abort)
  146. # Don't let SIGTERM and SIGUSR1 disturb active requests
  147. # by interrupting system calls
  148. signal.siginterrupt(signal.SIGTERM, False)
  149. signal.siginterrupt(signal.SIGUSR1, False)
  150. if hasattr(signal, 'set_wakeup_fd'):
  151. signal.set_wakeup_fd(self.PIPE[1])
  152. def handle_usr1(self, sig, frame):
  153. self.log.reopen_files()
  154. def handle_exit(self, sig, frame):
  155. self.alive = False
  156. def handle_quit(self, sig, frame):
  157. self.alive = False
  158. # worker_int callback
  159. self.cfg.worker_int(self)
  160. time.sleep(0.1)
  161. sys.exit(0)
  162. def handle_abort(self, sig, frame):
  163. self.alive = False
  164. self.cfg.worker_abort(self)
  165. sys.exit(1)
  166. def handle_error(self, req, client, addr, exc):
  167. request_start = datetime.now()
  168. addr = addr or ('', -1) # unix socket case
  169. if isinstance(exc, (
  170. InvalidRequestLine, InvalidRequestMethod,
  171. InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
  172. LimitRequestLine, LimitRequestHeaders,
  173. InvalidProxyLine, ForbiddenProxyRequest,
  174. InvalidSchemeHeaders,
  175. SSLError,
  176. )):
  177. status_int = 400
  178. reason = "Bad Request"
  179. if isinstance(exc, InvalidRequestLine):
  180. mesg = "Invalid Request Line '%s'" % str(exc)
  181. elif isinstance(exc, InvalidRequestMethod):
  182. mesg = "Invalid Method '%s'" % str(exc)
  183. elif isinstance(exc, InvalidHTTPVersion):
  184. mesg = "Invalid HTTP Version '%s'" % str(exc)
  185. elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
  186. mesg = "%s" % str(exc)
  187. if not req and hasattr(exc, "req"):
  188. req = exc.req # for access log
  189. elif isinstance(exc, LimitRequestLine):
  190. mesg = "%s" % str(exc)
  191. elif isinstance(exc, LimitRequestHeaders):
  192. mesg = "Error parsing headers: '%s'" % str(exc)
  193. elif isinstance(exc, InvalidProxyLine):
  194. mesg = "'%s'" % str(exc)
  195. elif isinstance(exc, ForbiddenProxyRequest):
  196. reason = "Forbidden"
  197. mesg = "Request forbidden"
  198. status_int = 403
  199. elif isinstance(exc, InvalidSchemeHeaders):
  200. mesg = "%s" % str(exc)
  201. elif isinstance(exc, SSLError):
  202. reason = "Forbidden"
  203. mesg = "'%s'" % str(exc)
  204. status_int = 403
  205. msg = "Invalid request from ip={ip}: {error}"
  206. self.log.debug(msg.format(ip=addr[0], error=str(exc)))
  207. else:
  208. if hasattr(req, "uri"):
  209. self.log.exception("Error handling request %s", req.uri)
  210. status_int = 500
  211. reason = "Internal Server Error"
  212. mesg = ""
  213. if req is not None:
  214. request_time = datetime.now() - request_start
  215. environ = default_environ(req, client, self.cfg)
  216. environ['REMOTE_ADDR'] = addr[0]
  217. environ['REMOTE_PORT'] = str(addr[1])
  218. resp = Response(req, client, self.cfg)
  219. resp.status = "%s %s" % (status_int, reason)
  220. resp.response_length = len(mesg)
  221. self.log.access(resp, req, environ, request_time)
  222. try:
  223. util.write_error(client, status_int, reason, mesg)
  224. except Exception:
  225. self.log.debug("Failed to send error message.")
  226. def handle_winch(self, sig, fname):
  227. # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
  228. self.log.debug("worker: SIGWINCH ignored.")