For the past few days I have been working on dfit,
a Python module which finds the best probability distribution for your observations.
Luckily scipy.stats
has implemented most of the distributions, so I just had to fit all
the distributions and compute their errors. Obviously it was not that easy; there were
other complexities.
Of all the problems, this one was particularly irritating and time-consuming. To put
my problem in context: there are around 100 distributions in scipy,
and I had to process the data with all of them for every user input. Also, a few distributions
in scipy.stats
sometimes don’t converge, and the function call just hangs for a long time.
Let's assume _fit_distribution(dist)
is the function that processes a single distribution.
A few things became pretty clear from the above. I must:
- Process distributions in parallel, i.e. run _fit_distribution() in parallel.
- Implement a timeout on _fit_distribution() function calls.
- Implement the above with the best possible performance.
Note:
If you don’t know what Cython and the GIL are, you can skip this part.
Cython and OpenMP
Since I had been using Cython from the start of this project,
I initially thought of using native parallelism via OpenMP, which Cython supports (prange).
The obvious benefit is execution speed, as the code runs without the GIL (unlike Python
threads, it does not block other threads, so all threads can run at the same time).
It is fairly simple to write in Cython; the code below runs the loop over the
distributions in parallel using OpenMP threads.
    from cython.parallel import prange
    ...
    # assume distributions are in list 'dists'
    num_of_native_threads = len(dists)
    for i in prange(num_of_native_threads, nogil=True):
        _fit_distribution(dists[i])
If you try the above code, it will fail to compile with errors like “Operation not allowed
without gil” and a bunch of others. This is expected, as we cannot access Python objects
without acquiring the GIL. Since scipy.stats
does not expose any of these functions through
Cython pxd files, there is no way around this (unless you port all of scipy.stats
to Cython).
(We could fix this using with gil:
blocks, but then it would behave the same as Python threads.)
Also, OpenMP does not support any sort of timeouts.
Since Cython’s prange cannot access Python objects without the GIL and OpenMP does not support timeouts, we cannot use it here.
If your function can run without the GIL, then it might actually be a good solution, but you would still have to figure out timeouts in OpenMP (or simply block on a mutex).
Timeouts in Python 3
There are many approaches to achieve this behavior, each with its pros and cons.
1. Using Signals
This is the simplest approach, but it works only on UNIX systems, where signal.alarm()
from the signal module is available.
Also, you can set signal handlers from the main thread only, so it doesn’t play well with threads.
    # test_signals.py
    # only available on Unix systems
    import functools
    import signal
    import time


    def timeout(max_timeout, default=None):
        """Timeout decorator, parameter in seconds."""
        def timeout_decorator(func):
            """Wrap the original function."""
            @functools.wraps(func)
            def func_wrapper(*args, **kwargs):
                """Timeout using signal."""
                class MyTimeoutError(Exception):
                    pass

                def handler(signum, frame):
                    raise MyTimeoutError(
                        "{0} - Timeout after {1} seconds".format(func.__name__, max_timeout)
                    )

                # set the timeout handler
                signal.signal(signal.SIGALRM, handler)
                signal.alarm(max_timeout)
                result = default
                try:
                    result = func(*args, **kwargs)
                except MyTimeoutError as exc:
                    # handle the timeout
                    print(str(exc))
                finally:
                    # Cancel the timer
                    signal.alarm(0)

                return result
            return func_wrapper
        return timeout_decorator


    @timeout(2)
    def _fit_distribution():
        # some long operation which runs for an undetermined time
        for i in range(50):
            time.sleep(i)

    _fit_distribution()
OUTPUT
    $ python test_signals.py
    _fit_distribution - Timeout after 2 seconds
Pros
- Almost no overhead, as no thread or process is spawned.
Cons
- Only works on UNIX systems.
- Will not work if the function catches all exceptions using try/except:

    @timeout(10)
    def _fit_distribution():
        try:
            time.sleep(1000)
        except Exception:
            pass

- Signals are global, so they might interfere with third-party modules.
- signal works only in the main thread, so it is not recommended when using threads.
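Two of the cons above can be softened a little. The following is only a sketch, not part of the recipe above: it assumes a hypothetical exception type `HardTimeout` that derives from `BaseException`, so that a blanket `except Exception` inside the function cannot swallow it, and a hypothetical helper `run_with_alarm` that restores whatever SIGALRM handler was installed before. It is still Unix-only and still has to run in the main thread.

```python
import signal
import time


class HardTimeout(BaseException):
    """Hypothetical timeout type; deliberately not an Exception subclass."""


def run_with_alarm(func, seconds, default=None):
    """Run func() and return `default` if it takes longer than `seconds`."""
    def handler(signum, frame):
        raise HardTimeout()

    # Remember the handler that was installed before, so we can put it back.
    previous = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # accepts fractions of a second
    try:
        return func()
    except HardTimeout:
        return default
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the timer
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)  # restore the old handler


def greedy():
    # Swallows every Exception, like the problematic example in the cons list.
    try:
        time.sleep(5)
    except Exception:
        pass
    return "finished"


if __name__ == "__main__":
    print(run_with_alarm(greedy, 0.2, default="timed out"))
```

A function that catches `BaseException` or uses a bare `except:` would still defeat this, so it narrows the problem rather than removing it.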
2. Using Python Threads - threading
Using the high-level threading module,
we can emulate timeouts for our
functions.
This recipe is inspired by the answers to
this
stackoverflow question.
How does this work?
- We spawn a thread for our function and wait timeout seconds for it to return.
- If it returns within the timeout, we forward the value to the caller.
- If an exception is raised inside the thread, we save the exception info and re-raise it in the caller.
- If it does not return within the timeout and no exception was raised, we give up waiting and raise MyTimeoutError in the caller. Note that this does not stop the worker thread itself; it keeps running in the background until the function returns.
    # test_threading.py
    import threading
    import sys
    import time


    class MyTimeoutError(Exception):
        # exception for our timeouts
        pass


    def timeout_func(func, args=None, kwargs=None, timeout=30, default=None):
        """This function will spawn a thread and run the given function
        using the args, kwargs and raise MyTimeoutError if the
        timeout is exceeded.
        http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call
        """

        class InterruptableThread(threading.Thread):
            def __init__(self):
                # daemon=True so a stuck thread does not keep the interpreter alive
                threading.Thread.__init__(self, daemon=True)
                self.result = default
                self.exc_info = (None, None, None)

            def run(self):
                try:
                    self.result = func(*(args or ()), **(kwargs or {}))
                except Exception:
                    self.exc_info = sys.exc_info()

            def suicide(self):
                # called from the waiting thread, so this raises in the caller
                raise MyTimeoutError(
                    "{0} timeout (taking more than {1} sec)".format(func.__name__, timeout)
                )

        it = InterruptableThread()
        it.start()
        it.join(timeout)

        if it.exc_info[0] is not None:
            _, exc, tb = it.exc_info
            raise exc.with_traceback(tb)  # communicate that to caller

        if it.is_alive():  # isAlive() was removed in Python 3.9
            it.suicide()
            raise RuntimeError  # not reached, suicide() always raises
        else:
            return it.result


    def _fit_distribution():
        # some long operation which runs for an undetermined time
        for i in range(50):
            time.sleep(i)

    try:
        timeout_func(_fit_distribution, timeout=2)
    except MyTimeoutError as ex:
        print(ex)
OUTPUT
    $ python test_threading.py
    _fit_distribution timeout (taking more than 2 sec)
Pros
- Cross-platform, unlike the signal method.
Cons
- A little overhead from using threads.
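For comparison, the same wait-with-a-timeout pattern can be written with the standard library's `concurrent.futures` instead of a hand-rolled `Thread` subclass. This is a swapped-in technique, not the recipe above: the helper name `call_with_timeout` and the function `slow` are made up for illustration. As with the recipe, a timed-out worker is not stopped; it keeps running, and the interpreter still waits for unfinished workers at exit.

```python
import concurrent.futures
import time


def call_with_timeout(func, args=(), kwargs=None, timeout=30):
    """Run func in a worker thread.

    Raises concurrent.futures.TimeoutError if func is too slow.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **(kwargs or {}))
    try:
        # result() re-raises any exception that func raised in the worker.
        return future.result(timeout=timeout)
    finally:
        # Do not block here on a worker that may still be running.
        executor.shutdown(wait=False)


def slow():
    # Stand-in for a long-running fit.
    time.sleep(1)
    return "done"


if __name__ == "__main__":
    print(call_with_timeout(pow, args=(2, 10), timeout=1))
    try:
        call_with_timeout(slow, timeout=0.1)
    except concurrent.futures.TimeoutError:
        print("slow() timed out")
```

This trades a few lines of code for the executor's bookkeeping, so it is unlikely to be faster than the recipe above.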
3. Using Python Threads - low-level _thread
In the previous method we were sub-classing threading.Thread
to implement
custom threads that time out and propagate errors back to the caller.
But the high-level, managed threads of the threading
module add some overhead.
So instead of waiting for threads to return, we can block on a Queue
which low-level threads will fill.
- We initialize a thread-safe queue, queue.Queue().
- We spawn a thread using the low-level _thread module, which will fill the queue with the function's return value or exception information.
- We block on the Queue for timeout seconds.
- If the Queue is still empty (the function did not return or raise an exception within the timeout), we kill the thread by raising an exception inside it using async_raise().
- If the Queue is not empty and we find an exception, we re-raise the exception back to the caller.
- If the Queue is not empty and no exception is found, we simply return the value.
The overhead of this method is very small, as it uses low-level CPython APIs.
    # test_low_level_threads.py
    import sys
    import time
    import traceback
    from multiprocessing import TimeoutError as MpTimeoutError
    from queue import Empty as Queue_Empty
    from queue import Queue
    from _thread import start_new_thread
    from ctypes import c_ulong
    from ctypes import py_object
    from ctypes import pythonapi


    class MyTimeoutError(Exception):
        pass


    def async_raise(tid, exctype=Exception):
        """
        Raise an Exception in the Thread with id `tid`. Perform cleanup if
        needed.
        Based on Killable Threads By Tomer Filiba
        from http://tomerfiliba.com/recipes/Thread2/
        license: public domain.
        """
        assert isinstance(tid, int), 'Invalid thread id: must be an integer'

        tid = c_ulong(tid)  # the id parameter is an unsigned long since Python 3.7
        exception = py_object(exctype)  # raise the requested type, not a bare Exception
        res = pythonapi.PyThreadState_SetAsyncExc(tid, exception)
        if res == 0:
            raise ValueError('Invalid thread id.')
        elif res != 1:
            # if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect
            pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError('PyThreadState_SetAsyncExc failed.')


    def timeout_func(func, args=None, kwargs=None, timeout=30, q=None):
        """
        Threads-based interruptible runner. Not fully reliable: the
        exception is delivered only when the thread next runs Python bytecode.
        """
        # We run `func` in a thread and block on a queue until timeout
        if not q:
            q = Queue()

        def runner():
            try:
                _res = func(*(args or ()), **(kwargs or {}))
                q.put((None, _res))
            except MyTimeoutError:
                # raised by async_raise to kill the orphan threads
                pass
            except Exception as ex:
                q.put((ex, None))

        tid = start_new_thread(runner, ())

        try:
            err, res = q.get(timeout=timeout)
            if err:
                raise err
            return res
        except (Queue_Empty, MpTimeoutError):
            raise MyTimeoutError(
                "{0} timeout (taking more than {1} sec)".format(func.__name__, timeout)
            )
        finally:
            try:
                async_raise(tid, MyTimeoutError)
            except (SystemExit, ValueError):
                pass


    def _fit_distribution():
        # some long operation which runs for an undetermined time
        for i in range(50):
            time.sleep(i)

    try:
        timeout_func(_fit_distribution, timeout=2)
    except MyTimeoutError as ex:
        print(ex)
OUTPUT
    $ python test_low_level_threads.py
    _fit_distribution timeout (taking more than 2 sec)
Pros
- Very little overhead.
- Re-raised exceptions retain their traceback.
Cons
- Only compatible with CPython implementations, as we are using low-level CPython APIs.
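One CPython-specific detail is worth seeing in isolation: an exception injected with `PyThreadState_SetAsyncExc` is only noticed when the target thread gets back to executing Python bytecode, so a thread blocked inside a long C call (such as a long `time.sleep`) reacts late. The sketch below is an illustration under assumptions, not the recipe itself: `StopWorker` and `demo` are hypothetical names, and it uses `threading.Thread` rather than `_thread` purely to get at `thread.ident` conveniently.

```python
import ctypes
import threading
import time


class StopWorker(Exception):
    """Hypothetical exception type used only for this demo."""


def async_raise(thread_id, exc_type):
    """Ask CPython to raise exc_type in the thread; returns threads affected."""
    return ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )


def demo(nap):
    """Interrupt a worker that sleeps in `nap`-second chunks; time the stop."""
    stopped = threading.Event()

    def worker():
        try:
            while True:
                time.sleep(nap)
        except StopWorker:
            stopped.set()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    time.sleep(0.05)  # let the worker enter its loop
    started = time.monotonic()
    affected = async_raise(thread.ident, StopWorker)
    stopped.wait(timeout=5)
    return affected, stopped.is_set(), time.monotonic() - started


if __name__ == "__main__":
    print(demo(0.01))  # short naps: the worker notices the exception quickly
    print(demo(0.5))   # long naps: it reacts only after the current nap ends
```

For dfit-style workloads this means a fit that hangs inside compiled code may not be interrupted until that call returns, which is one reason the recipe is described as not fully reliable.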
Conclusion
We could have used processes from the multiprocessing
module instead of threads,
but their overhead is very high. If execution time is not your priority,
then the second method (threading
module) would be appropriate, as it will run across most
Python implementations.
To solve the problem in dfit
I used the last method (_thread
recipe) for
performance reasons. Also, with a little modification to the _thread
recipe, I can spawn
multiple raw threads sharing a common Queue
to run _fit_distribution
in parallel
with a timeout. Something like this:
    def timeout_func(func, args_list=None, timeout=30, q=None):
        if not q:
            q = Queue()

        ...

        num_threads = len(args_list)
        tids = []
        for args in args_list:
            tids.append(start_new_thread(runner, args))

        ...
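The elided parts of that outline are specific to dfit, but the shared-queue idea can be sketched on its own. The following is a minimal, assumption-laden illustration rather than the dfit implementation: `run_all` and `fake_fit` are hypothetical names, every thread tags its result with its index, and all threads share one overall deadline. Killing the stragglers with `async_raise` is left out to keep the sketch short, so timed-out threads simply keep running.

```python
import time
from _thread import start_new_thread
from queue import Empty, Queue


def run_all(func, args_list, timeout=30):
    """Run func(*args) for every args tuple in parallel raw threads.

    Returns {index: ("ok", value) | ("error", exc) | ("timeout", None)}.
    """
    q = Queue()

    def runner(index, args):
        try:
            q.put((index, "ok", func(*args)))
        except Exception as ex:
            q.put((index, "error", ex))

    for index, args in enumerate(args_list):
        start_new_thread(runner, (index, args))

    # Everything not reported before the shared deadline counts as timed out.
    results = {index: ("timeout", None) for index in range(len(args_list))}
    deadline = time.monotonic() + timeout
    for _ in args_list:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            index, status, payload = q.get(timeout=remaining)
        except Empty:
            break
        results[index] = (status, payload)
    return results


def fake_fit(name, seconds):
    # Stand-in for _fit_distribution: sleeps, then returns a made-up label.
    if seconds < 0:
        raise ValueError("bad input for " + name)
    time.sleep(seconds)
    return name + " fitted"


if __name__ == "__main__":
    jobs = [("norm", 0.0), ("gamma", 1.0), ("beta", -1.0)]
    for index, outcome in run_all(fake_fit, jobs, timeout=0.3).items():
        print(jobs[index][0], outcome)
```

Using a single deadline for the whole batch, instead of a fresh timeout per `q.get()`, keeps the total wait bounded by `timeout` no matter how many distributions are queued.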