123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- # -*- coding: utf-8 -
- #
- # This file is part of gunicorn released under the MIT license.
- # See the NOTICE for more information.
- #
- from datetime import datetime
- import errno
- import os
- import select
- import socket
- import ssl
- import sys
- import gunicorn.http as http
- import gunicorn.http.wsgi as wsgi
- import gunicorn.util as util
- import gunicorn.workers.base as base
- class StopWaiting(Exception):
- """ exception raised to stop waiting for a connection """
- class SyncWorker(base.Worker):
- def accept(self, listener):
- client, addr = listener.accept()
- client.setblocking(1)
- util.close_on_exec(client)
- self.handle(listener, client, addr)
- def wait(self, timeout):
- try:
- self.notify()
- ret = select.select(self.wait_fds, [], [], timeout)
- if ret[0]:
- if self.PIPE[0] in ret[0]:
- os.read(self.PIPE[0], 1)
- return ret[0]
- except select.error as e:
- if e.args[0] == errno.EINTR:
- return self.sockets
- if e.args[0] == errno.EBADF:
- if self.nr < 0:
- return self.sockets
- else:
- raise StopWaiting
- raise
- def is_parent_alive(self):
- # If our parent changed then we shut down.
- if self.ppid != os.getppid():
- self.log.info("Parent changed, shutting down: %s", self)
- return False
- return True
- def run_for_one(self, timeout):
- listener = self.sockets[0]
- while self.alive:
- self.notify()
- # Accept a connection. If we get an error telling us
- # that no connection is waiting we fall down to the
- # select which is where we'll wait for a bit for new
- # workers to come give us some love.
- try:
- self.accept(listener)
- # Keep processing clients until no one is waiting. This
- # prevents the need to select() for every client that we
- # process.
- continue
- except EnvironmentError as e:
- if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
- errno.EWOULDBLOCK):
- raise
- if not self.is_parent_alive():
- return
- try:
- self.wait(timeout)
- except StopWaiting:
- return
- def run_for_multiple(self, timeout):
- while self.alive:
- self.notify()
- try:
- ready = self.wait(timeout)
- except StopWaiting:
- return
- if ready is not None:
- for listener in ready:
- if listener == self.PIPE[0]:
- continue
- try:
- self.accept(listener)
- except EnvironmentError as e:
- if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
- errno.EWOULDBLOCK):
- raise
- if not self.is_parent_alive():
- return
- def run(self):
- # if no timeout is given the worker will never wait and will
- # use the CPU for nothing. This minimal timeout prevent it.
- timeout = self.timeout or 0.5
- # self.socket appears to lose its blocking status after
- # we fork in the arbiter. Reset it here.
- for s in self.sockets:
- s.setblocking(0)
- if len(self.sockets) > 1:
- self.run_for_multiple(timeout)
- else:
- self.run_for_one(timeout)
- def handle(self, listener, client, addr):
- req = None
- try:
- if self.cfg.is_ssl:
- client = ssl.wrap_socket(client, server_side=True,
- **self.cfg.ssl_options)
- parser = http.RequestParser(self.cfg, client, addr)
- req = next(parser)
- self.handle_request(listener, req, client, addr)
- except http.errors.NoMoreData as e:
- self.log.debug("Ignored premature client disconnection. %s", e)
- except StopIteration as e:
- self.log.debug("Closing connection. %s", e)
- except ssl.SSLError as e:
- if e.args[0] == ssl.SSL_ERROR_EOF:
- self.log.debug("ssl connection closed")
- client.close()
- else:
- self.log.debug("Error processing SSL request.")
- self.handle_error(req, client, addr, e)
- except EnvironmentError as e:
- if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
- self.log.exception("Socket error processing request.")
- else:
- if e.errno == errno.ECONNRESET:
- self.log.debug("Ignoring connection reset")
- elif e.errno == errno.ENOTCONN:
- self.log.debug("Ignoring socket not connected")
- else:
- self.log.debug("Ignoring EPIPE")
- except Exception as e:
- self.handle_error(req, client, addr, e)
- finally:
- util.close(client)
- def handle_request(self, listener, req, client, addr):
- environ = {}
- resp = None
- try:
- self.cfg.pre_request(self, req)
- request_start = datetime.now()
- resp, environ = wsgi.create(req, client, addr,
- listener.getsockname(), self.cfg)
- # Force the connection closed until someone shows
- # a buffering proxy that supports Keep-Alive to
- # the backend.
- resp.force_close()
- self.nr += 1
- if self.nr >= self.max_requests:
- self.log.info("Autorestarting worker after current request.")
- self.alive = False
- respiter = self.wsgi(environ, resp.start_response)
- try:
- if isinstance(respiter, environ['wsgi.file_wrapper']):
- resp.write_file(respiter)
- else:
- for item in respiter:
- resp.write(item)
- resp.close()
- request_time = datetime.now() - request_start
- self.log.access(resp, req, environ, request_time)
- finally:
- if hasattr(respiter, "close"):
- respiter.close()
- except EnvironmentError:
- # pass to next try-except level
- util.reraise(*sys.exc_info())
- except Exception:
- if resp and resp.headers_sent:
- # If the requests have already been sent, we should close the
- # connection to indicate the error.
- self.log.exception("Error handling request")
- try:
- client.shutdown(socket.SHUT_RDWR)
- client.close()
- except EnvironmentError:
- pass
- raise StopIteration()
- raise
- finally:
- try:
- self.cfg.post_request(self, req, environ, resp)
- except Exception:
- self.log.exception("Exception in post_request hook")
|