123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525 |
- import functools
- import time
- import inspect
- import collections
- import types
- import itertools
- import pkg_resources.extern.more_itertools
- from typing import Callable, TypeVar
- CallableT = TypeVar("CallableT", bound=Callable[..., object])
- def compose(*funcs):
- """
- Compose any number of unary functions into a single unary function.
- >>> import textwrap
- >>> expected = str.strip(textwrap.dedent(compose.__doc__))
- >>> strip_and_dedent = compose(str.strip, textwrap.dedent)
- >>> strip_and_dedent(compose.__doc__) == expected
- True
- Compose also allows the innermost function to take arbitrary arguments.
- >>> round_three = lambda x: round(x, ndigits=3)
- >>> f = compose(round_three, int.__truediv__)
- >>> [f(3*x, x+1) for x in range(1,10)]
- [1.5, 2.0, 2.25, 2.4, 2.5, 2.571, 2.625, 2.667, 2.7]
- """
- def compose_two(f1, f2):
- return lambda *args, **kwargs: f1(f2(*args, **kwargs))
- return functools.reduce(compose_two, funcs)
- def method_caller(method_name, *args, **kwargs):
- """
- Return a function that will call a named method on the
- target object with optional positional and keyword
- arguments.
- >>> lower = method_caller('lower')
- >>> lower('MyString')
- 'mystring'
- """
- def call_method(target):
- func = getattr(target, method_name)
- return func(*args, **kwargs)
- return call_method
- def once(func):
- """
- Decorate func so it's only ever called the first time.
- This decorator can ensure that an expensive or non-idempotent function
- will not be expensive on subsequent calls and is idempotent.
- >>> add_three = once(lambda a: a+3)
- >>> add_three(3)
- 6
- >>> add_three(9)
- 6
- >>> add_three('12')
- 6
- To reset the stored value, simply clear the property ``saved_result``.
- >>> del add_three.saved_result
- >>> add_three(9)
- 12
- >>> add_three(8)
- 12
- Or invoke 'reset()' on it.
- >>> add_three.reset()
- >>> add_three(-3)
- 0
- >>> add_three(0)
- 0
- """
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- if not hasattr(wrapper, 'saved_result'):
- wrapper.saved_result = func(*args, **kwargs)
- return wrapper.saved_result
- wrapper.reset = lambda: vars(wrapper).__delitem__('saved_result')
- return wrapper
- def method_cache(
- method: CallableT,
- cache_wrapper: Callable[
- [CallableT], CallableT
- ] = functools.lru_cache(), # type: ignore[assignment]
- ) -> CallableT:
- """
- Wrap lru_cache to support storing the cache data in the object instances.
- Abstracts the common paradigm where the method explicitly saves an
- underscore-prefixed protected property on first call and returns that
- subsequently.
- >>> class MyClass:
- ... calls = 0
- ...
- ... @method_cache
- ... def method(self, value):
- ... self.calls += 1
- ... return value
- >>> a = MyClass()
- >>> a.method(3)
- 3
- >>> for x in range(75):
- ... res = a.method(x)
- >>> a.calls
- 75
- Note that the apparent behavior will be exactly like that of lru_cache
- except that the cache is stored on each instance, so values in one
- instance will not flush values from another, and when an instance is
- deleted, so are the cached values for that instance.
- >>> b = MyClass()
- >>> for x in range(35):
- ... res = b.method(x)
- >>> b.calls
- 35
- >>> a.method(0)
- 0
- >>> a.calls
- 75
- Note that if method had been decorated with ``functools.lru_cache()``,
- a.calls would have been 76 (due to the cached value of 0 having been
- flushed by the 'b' instance).
- Clear the cache with ``.cache_clear()``
- >>> a.method.cache_clear()
- Same for a method that hasn't yet been called.
- >>> c = MyClass()
- >>> c.method.cache_clear()
- Another cache wrapper may be supplied:
- >>> cache = functools.lru_cache(maxsize=2)
- >>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache)
- >>> a = MyClass()
- >>> a.method2()
- 3
- Caution - do not subsequently wrap the method with another decorator, such
- as ``@property``, which changes the semantics of the function.
- See also
- http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/
- for another implementation and additional justification.
- """
- def wrapper(self: object, *args: object, **kwargs: object) -> object:
- # it's the first call, replace the method with a cached, bound method
- bound_method: CallableT = types.MethodType( # type: ignore[assignment]
- method, self
- )
- cached_method = cache_wrapper(bound_method)
- setattr(self, method.__name__, cached_method)
- return cached_method(*args, **kwargs)
- # Support cache clear even before cache has been created.
- wrapper.cache_clear = lambda: None # type: ignore[attr-defined]
- return ( # type: ignore[return-value]
- _special_method_cache(method, cache_wrapper) or wrapper
- )
- def _special_method_cache(method, cache_wrapper):
- """
- Because Python treats special methods differently, it's not
- possible to use instance attributes to implement the cached
- methods.
- Instead, install the wrapper method under a different name
- and return a simple proxy to that wrapper.
- https://github.com/jaraco/jaraco.functools/issues/5
- """
- name = method.__name__
- special_names = '__getattr__', '__getitem__'
- if name not in special_names:
- return
- wrapper_name = '__cached' + name
- def proxy(self, *args, **kwargs):
- if wrapper_name not in vars(self):
- bound = types.MethodType(method, self)
- cache = cache_wrapper(bound)
- setattr(self, wrapper_name, cache)
- else:
- cache = getattr(self, wrapper_name)
- return cache(*args, **kwargs)
- return proxy
- def apply(transform):
- """
- Decorate a function with a transform function that is
- invoked on results returned from the decorated function.
- >>> @apply(reversed)
- ... def get_numbers(start):
- ... "doc for get_numbers"
- ... return range(start, start+3)
- >>> list(get_numbers(4))
- [6, 5, 4]
- >>> get_numbers.__doc__
- 'doc for get_numbers'
- """
- def wrap(func):
- return functools.wraps(func)(compose(transform, func))
- return wrap
- def result_invoke(action):
- r"""
- Decorate a function with an action function that is
- invoked on the results returned from the decorated
- function (for its side-effect), then return the original
- result.
- >>> @result_invoke(print)
- ... def add_two(a, b):
- ... return a + b
- >>> x = add_two(2, 3)
- 5
- >>> x
- 5
- """
- def wrap(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- result = func(*args, **kwargs)
- action(result)
- return result
- return wrapper
- return wrap
- def call_aside(f, *args, **kwargs):
- """
- Call a function for its side effect after initialization.
- >>> @call_aside
- ... def func(): print("called")
- called
- >>> func()
- called
- Use functools.partial to pass parameters to the initial call
- >>> @functools.partial(call_aside, name='bingo')
- ... def func(name): print("called with", name)
- called with bingo
- """
- f(*args, **kwargs)
- return f
- class Throttler:
- """
- Rate-limit a function (or other callable)
- """
- def __init__(self, func, max_rate=float('Inf')):
- if isinstance(func, Throttler):
- func = func.func
- self.func = func
- self.max_rate = max_rate
- self.reset()
- def reset(self):
- self.last_called = 0
- def __call__(self, *args, **kwargs):
- self._wait()
- return self.func(*args, **kwargs)
- def _wait(self):
- "ensure at least 1/max_rate seconds from last call"
- elapsed = time.time() - self.last_called
- must_wait = 1 / self.max_rate - elapsed
- time.sleep(max(0, must_wait))
- self.last_called = time.time()
- def __get__(self, obj, type=None):
- return first_invoke(self._wait, functools.partial(self.func, obj))
- def first_invoke(func1, func2):
- """
- Return a function that when invoked will invoke func1 without
- any parameters (for its side-effect) and then invoke func2
- with whatever parameters were passed, returning its result.
- """
- def wrapper(*args, **kwargs):
- func1()
- return func2(*args, **kwargs)
- return wrapper
- def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
- """
- Given a callable func, trap the indicated exceptions
- for up to 'retries' times, invoking cleanup on the
- exception. On the final attempt, allow any exceptions
- to propagate.
- """
- attempts = itertools.count() if retries == float('inf') else range(retries)
- for attempt in attempts:
- try:
- return func()
- except trap:
- cleanup()
- return func()
- def retry(*r_args, **r_kwargs):
- """
- Decorator wrapper for retry_call. Accepts arguments to retry_call
- except func and then returns a decorator for the decorated function.
- Ex:
- >>> @retry(retries=3)
- ... def my_func(a, b):
- ... "this is my funk"
- ... print(a, b)
- >>> my_func.__doc__
- 'this is my funk'
- """
- def decorate(func):
- @functools.wraps(func)
- def wrapper(*f_args, **f_kwargs):
- bound = functools.partial(func, *f_args, **f_kwargs)
- return retry_call(bound, *r_args, **r_kwargs)
- return wrapper
- return decorate
- def print_yielded(func):
- """
- Convert a generator into a function that prints all yielded elements
- >>> @print_yielded
- ... def x():
- ... yield 3; yield None
- >>> x()
- 3
- None
- """
- print_all = functools.partial(map, print)
- print_results = compose(more_itertools.consume, print_all, func)
- return functools.wraps(func)(print_results)
- def pass_none(func):
- """
- Wrap func so it's not called if its first param is None
- >>> print_text = pass_none(print)
- >>> print_text('text')
- text
- >>> print_text(None)
- """
- @functools.wraps(func)
- def wrapper(param, *args, **kwargs):
- if param is not None:
- return func(param, *args, **kwargs)
- return wrapper
- def assign_params(func, namespace):
- """
- Assign parameters from namespace where func solicits.
- >>> def func(x, y=3):
- ... print(x, y)
- >>> assigned = assign_params(func, dict(x=2, z=4))
- >>> assigned()
- 2 3
- The usual errors are raised if a function doesn't receive
- its required parameters:
- >>> assigned = assign_params(func, dict(y=3, z=4))
- >>> assigned()
- Traceback (most recent call last):
- TypeError: func() ...argument...
- It even works on methods:
- >>> class Handler:
- ... def meth(self, arg):
- ... print(arg)
- >>> assign_params(Handler().meth, dict(arg='crystal', foo='clear'))()
- crystal
- """
- sig = inspect.signature(func)
- params = sig.parameters.keys()
- call_ns = {k: namespace[k] for k in params if k in namespace}
- return functools.partial(func, **call_ns)
- def save_method_args(method):
- """
- Wrap a method such that when it is called, the args and kwargs are
- saved on the method.
- >>> class MyClass:
- ... @save_method_args
- ... def method(self, a, b):
- ... print(a, b)
- >>> my_ob = MyClass()
- >>> my_ob.method(1, 2)
- 1 2
- >>> my_ob._saved_method.args
- (1, 2)
- >>> my_ob._saved_method.kwargs
- {}
- >>> my_ob.method(a=3, b='foo')
- 3 foo
- >>> my_ob._saved_method.args
- ()
- >>> my_ob._saved_method.kwargs == dict(a=3, b='foo')
- True
- The arguments are stored on the instance, allowing for
- different instance to save different args.
- >>> your_ob = MyClass()
- >>> your_ob.method({str('x'): 3}, b=[4])
- {'x': 3} [4]
- >>> your_ob._saved_method.args
- ({'x': 3},)
- >>> my_ob._saved_method.args
- ()
- """
- args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- attr_name = '_saved_' + method.__name__
- attr = args_and_kwargs(args, kwargs)
- setattr(self, attr_name, attr)
- return method(self, *args, **kwargs)
- return wrapper
- def except_(*exceptions, replace=None, use=None):
- """
- Replace the indicated exceptions, if raised, with the indicated
- literal replacement or evaluated expression (if present).
- >>> safe_int = except_(ValueError)(int)
- >>> safe_int('five')
- >>> safe_int('5')
- 5
- Specify a literal replacement with ``replace``.
- >>> safe_int_r = except_(ValueError, replace=0)(int)
- >>> safe_int_r('five')
- 0
- Provide an expression to ``use`` to pass through particular parameters.
- >>> safe_int_pt = except_(ValueError, use='args[0]')(int)
- >>> safe_int_pt('five')
- 'five'
- """
- def decorate(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except exceptions:
- try:
- return eval(use)
- except TypeError:
- return replace
- return wrapper
- return decorate
|