
Timeout on function call in Python

Ashutosh Varma
December 16th, 2020 · 3 min read

For the past few days I have been working on dfit, a Python module that finds the best probability distribution for your observations. Luckily, scipy.stats implements most of the distributions, so I just had to fit all of them and compute their errors. Of course, it was not that easy; there were other complexities.

Of all the problems, this one was particularly irritating and time-consuming. To put it in context: there are around 100 distributions in scipy.stats, and I had to fit the data with all of them for every user input. Also, a few distributions in scipy.stats sometimes don't converge, and the function call just hangs for a long time.

Let's assume _fit_distribution(dist) is the function that processes a single distribution. From the above, a few things became pretty clear. I must:

  • Process the distributions in parallel, i.e. run _fit_distribution() calls in parallel.
  • Implement a timeout on _fit_distribution() calls.
  • Do both with the best possible performance.

Note: If you don't know what Cython and the GIL are, you can skip this part.

Cython and OpenMP

Since I had been using Cython from the start of this project, I initially thought of using native parallelism: OpenMP, which Cython supports through prange. The obvious benefit would be execution speed, since the loop runs without the GIL (unlike Python threads, the native threads do not block each other and can all run at the same time). It is fairly simple to write in Cython: the code below uses OpenMP to spread the distributions across native threads.

```cython
from cython.parallel import prange
...
# assume distributions are in list 'dists'
num_of_native_threads = len(dists)
for i in prange(num_of_native_threads, nogil=True):
    _fit_distribution(dists[i])
```
[Figure: Cython GIL error when using Python functions inside a nogil block]

If you try the above code, it fails to compile with errors like "Operation not allowed without gil", along with a bunch of others. This is expected, since Python objects cannot be accessed without acquiring the GIL. And since scipy.stats does not expose any of these functions through Cython .pxd files, there is no workaround for this (short of porting all of scipy.stats to Cython).

(We could get around this with `with gil:` blocks, but then it would be no better than Python threads.)

OpenMP also does not support any sort of timeout. So prange is out on both counts: it cannot use Python objects without the GIL, and it gives us no way to time out a hung call.

If your function can run without the GIL, this might actually be a good solution, but you would still have to figure out timeouts in OpenMP yourself (or simply block on a mutex).

Timeouts in Python 3

There are many approaches to achieve this behavior, each with its own pros and cons.

1. Using Signals

This is the simplest approach, but it works only on UNIX systems, because signal.SIGALRM and signal.alarm() are not available on Windows. Also, signal handlers can only be installed from the main thread, so it doesn't play well with threads.

```python
# test_signals.py
# only available on UNIX systems (relies on SIGALRM)
import functools
import signal
import time


def timeout(max_timeout, default=None):
    """Timeout decorator, parameter in whole seconds (signal.alarm() takes an int)."""
    def timeout_decorator(func):
        """Wrap the original function."""
        @functools.wraps(func)
        def func_wrapper(*args, **kwargs):
            """Timeout using signal."""
            class MyTimeoutError(Exception):
                pass

            def handler(signum, frame):
                raise MyTimeoutError(
                    "{0} - Timeout after {1} seconds".format(func.__name__, max_timeout)
                )

            # set the timeout handler, remembering the previous one
            old_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(max_timeout)
            result = default
            try:
                result = func(*args, **kwargs)
            except MyTimeoutError as exc:
                # handle the timeout
                print(str(exc))
            finally:
                # cancel the timer and restore the previous handler
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)

            return result
        return func_wrapper
    return timeout_decorator


@timeout(2)
def _fit_distribution():
    # some long operation which runs for an undetermined time
    for i in range(50):
        time.sleep(i)


_fit_distribution()
```

OUTPUT

```shell
$ python test_signals.py
_fit_distribution - Timeout after 2 seconds
```

Pros

  • Almost no overhead, as no thread or process is spawned.

Cons

  • Only works on UNIX systems.
  • Will not work if the function catches all exceptions with a broad try/except, because the timeout exception gets swallowed:
    ```python
    @timeout(10)
    def _fit_distribution():
        try:
            time.sleep(1000)
        except Exception:
            pass
    ```
  • Signals are process-global, so they might interfere with third-party modules that also use SIGALRM.
  • Signal handlers can only be set from the main thread, so this approach is not recommended when using threads.

2. Using Python Threads - threading

Using the high-level threading module, we can emulate timeouts for our functions. This recipe is inspired by answers to the Stack Overflow question linked in the docstring below.

How does this work?

  • We spawn a thread for our function and wait up to timeout seconds for it to return.
  • If it returns within the timeout, we forward the value to the caller.
  • If an exception is raised inside the thread, we save it and re-raise it in the caller.
  • If it does not return within the timeout, we raise MyTimeoutError in the caller. Note that this does not actually kill the thread: the threading module has no way to stop a thread from the outside, so the worker keeps running in the background.
```python
# test_threading.py
import threading
import time


class MyTimeoutError(Exception):
    # exception for our timeouts
    pass


def timeout_func(func, args=None, kwargs=None, timeout=30):
    """This function will spawn a thread, run the given function
    using the args, kwargs and raise MyTimeoutError if the
    timeout is exceeded.
    http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call
    """

    class InterruptableThread(threading.Thread):
        def __init__(self):
            # daemon=True so a hung thread does not keep the program alive at exit
            threading.Thread.__init__(self, daemon=True)
            self.result = None
            self.exc = None

        def run(self):
            try:
                self.result = func(*(args or ()), **(kwargs or {}))
            except Exception as err:
                # keep the exception object; it carries its own traceback
                self.exc = err

    it = InterruptableThread()
    it.start()
    it.join(timeout)

    if it.is_alive():
        # the thread cannot be killed; we just stop waiting for it
        raise MyTimeoutError(
            "{0} timeout (taking more than {1} sec)".format(func.__name__, timeout)
        )
    if it.exc is not None:
        raise it.exc  # communicate that to the caller
    return it.result


def _fit_distribution():
    # some long operation which runs for an undetermined time
    for i in range(50):
        time.sleep(i)


try:
    timeout_func(_fit_distribution, timeout=2)
except MyTimeoutError as ex:
    print(ex)
```

OUTPUT

```shell
$ python test_threading.py
_fit_distribution timeout (taking more than 2 sec)
```

Pros

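  • Cross-platform, unlike the signal method.

Cons

  • A little overhead from spawning a thread for each call.
  • The timed-out thread is not actually stopped; it keeps running in the background.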

3. Using Python Threads - low-level _thread

In the previous method, we subclassed threading.Thread to implement a custom thread that propagates errors back to the caller. But threads from the high-level threading module add some overhead.

So instead of joining the thread, we can block on a Queue that a low-level thread fills.

  • We initialize a thread-safe queue, queue.Queue().
  • We spawn a thread using the low-level _thread module; it puts the function's return value or exception into the queue.
  • We block on the queue for timeout seconds.
    • If the queue is still empty (the function neither returned nor raised within the timeout), we try to kill the thread by raising an exception inside it with async_raise(). The exception is only delivered once the thread next executes Python bytecode.
    • If the queue is not empty and holds an exception, we re-raise it in the caller.
    • If the queue is not empty and holds no exception, we simply return the value.

This method has very little overhead, as it uses the low-level _thread module and CPython APIs directly.

```python
# test_low_level_threads.py
import time
from ctypes import c_ulong, py_object, pythonapi
from queue import Empty as Queue_Empty
from queue import Queue
from _thread import start_new_thread


class MyTimeoutError(Exception):
    pass


def async_raise(tid, exctype=Exception):
    """
    Raise an Exception in the Thread with id `tid`. Perform cleanup if
    needed.
    Based on Killable Threads By Tomer Filiba
    from http://tomerfiliba.com/recipes/Thread2/
    license: public domain.
    """
    assert isinstance(tid, int), 'Invalid thread id: must be an integer'

    # the thread id is an unsigned long since Python 3.7
    res = pythonapi.PyThreadState_SetAsyncExc(c_ulong(tid), py_object(exctype))
    if res == 0:
        raise ValueError('Invalid thread id.')
    elif res != 1:
        # if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect
        pythonapi.PyThreadState_SetAsyncExc(c_ulong(tid), None)
        raise SystemError('PyThreadState_SetAsyncExc failed.')


def timeout_func(func, args=None, kwargs=None, timeout=30, q=None):
    """
    Thread-based interruptible runner. Not fully reliable: the async
    exception is only delivered when the thread next runs Python bytecode,
    so a thread blocked inside a C call keeps running until that call returns.
    """
    # We run `func` in a thread and block on a queue until timeout
    if q is None:
        q = Queue()

    def runner():
        try:
            _res = func(*(args or ()), **(kwargs or {}))
            q.put((None, _res))
        except MyTimeoutError:
            # raised by async_raise() to kill the orphan thread
            pass
        except Exception as ex:
            q.put((ex, None))

    tid = start_new_thread(runner, ())

    try:
        err, res = q.get(timeout=timeout)
    except Queue_Empty:
        # ask the orphan thread to stop, then report the timeout
        try:
            async_raise(tid, MyTimeoutError)
        except (SystemError, ValueError):
            pass
        raise MyTimeoutError(
            "{0} timeout (taking more than {1} sec)".format(func.__name__, timeout)
        ) from None
    if err is not None:
        raise err
    return res


def _fit_distribution():
    # some long operation which runs for an undetermined time
    for i in range(50):
        time.sleep(i)


try:
    timeout_func(_fit_distribution, timeout=2)
except MyTimeoutError as ex:
    print(ex)
```

OUTPUT

```shell
$ python test_low_level_threads.py
_fit_distribution timeout (taking more than 2 sec)
```

Pros

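  • Very little overhead.
  • Re-raised exceptions retain their original traceback.

Cons

  • Only works on CPython, since it calls the CPython C API function PyThreadState_SetAsyncExc through ctypes.
  • The injected exception is only delivered when the thread next executes Python bytecode, so a thread stuck inside a long-running C call is not stopped until that call returns.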

Conclusion

Although we could have used processes from the multiprocessing module instead of threads, their overhead is much higher. If execution time is not your priority, the second method (the threading module) is appropriate, as it runs on most Python implementations.

To solve the problem in dfit, I used the last method (the _thread recipe) for performance reasons. With a little modification, the _thread recipe can also spawn multiple raw threads sharing a common Queue, to run _fit_distribution in parallel with a timeout. Something like this:

```python
def timeout_func(func, args_list=None, timeout=30, q=None):
    if not q:
        q = Queue()

    ...

    num_threads = len(args_list)
    tids = []
    for args in args_list:
        # each `args` must be a tuple
        tids.append(start_new_thread(runner, args))

    ...
```

© 2020 Ashutosh Varma