原文地址:https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html

文章较长,建议收藏后再看。

使用Python编写并发代码时并不流畅,我们需要一些额外的思考,你手中的任务是I/O密集类型还是CPU密集型。此外,由于全局解释锁的存在,进一步增加了编写真正的并发代码的难度。

在Python中编写并发代码常常这样做:

如果任务是I/O密集的,可以使用标准库的threading模块,如果任务是CPU密集的,则multiprocessing更合适。threadingmultiprocessing的API提供了很多功能,但他们都是很低级别的代码,在我们业务核心逻辑上增加了额外的复杂性。

Python标准库还包含一个名为concurrent.futrures的模块。在Python3.2中添加了该模块,为开发人员提供了更高级别的异步任务接口。它是在threadingmultiprocessing模块之上的通用抽象层,提供了使用线程池和进程池运行任务的一些接口。当你只需要并发的运行一段完整的代码,而不想使用threadingmultiprocessing增加额外的逻辑,破坏代码的完整性时,这个模块是一个好的解决方案。

concurrent.futures 剖析

根据官方文档,

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

这意味着你可以使用更高阶别的接口来使用线程和进程。该模块提供了一个名为Executor的抽象类,你不能直接实例化它,需要使用它提供的两个子类之一来跑任务。

1
2
3
4
5
6
7
8
9
Executor (Abstract Base Class)
│
├── ThreadPoolExecutor
│   │A concrete subclass of the Executor class to
│   │manage I/O bound tasks with threading underneath
│
├── ProcessPoolExecutor
│   │A concrete subclass of the Executor class to
│   │manage CPU bound tasks with multiprocessing underneath

在模块内,这两个类与服务池交互并管理工作单元(workers)。Future类用于管理工作单元的结果。使用工作池时,应用程序会创建适当的Executor类(ThreadPoolExecutorProcessPoolExecutor)的实例,然后提交给他们任务运行。提交后,将返回一个Future类的实例。当需要任务结果时,可以使用该Future对象进行阻塞,直到拿到任务结果为止。

Executor Objects

由于ThreadPoolExecutorProcessPoolExecutor有相同的API接口,我们主要来看下它们提供的两个方法。

submit(func, args, *kwargs)

func 可调度对象(执行函数),执行参数args, kwargs。返回一个可调用的 Future 对象。

1
2
3
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

map(func, *iterables, timeout=None, chunksize=1)

除了下边部分其他同submit:

  • 可迭代对象数据是立即生成的,不像其他迭代器是惰性的。
  • func 是异步执行的,并且可以同时进行对func的多次调用。

返回的迭代器调用__next__()方法时,将会引发concurrent.futures.TimeoutError。从开始调用到timeout时间后,结果将不获取。timeout可以是intfloat,如果未指定或者是None,则等待时间没有限制。

如果func调用引发异常,则从迭代器中检索其结果时将抛出该异常。

使用ProcessPoolExecutor时,此方法可将迭代项分为多个块,将其作为单独的任务块提交给池。这些块的大小可以通过chunksize来设置。相比非常长的并发列表,使用chunksize进行分块提交执行可以显著提高性能。

使用ThreadPoolExecutor时,chunksize无效。

运行并发任务的通用方法

我的许多脚本包含类似这样的代码:

1
2
for task in get_tasks():
    perform(task)

get_tasks返回一个可迭代对象,该迭代对象包含有特定功能任务的参数相关信息。perform函数依次运行,每次只能运行一个。由于逻辑是顺序执行的,因此很容易理解。当任务数量少或单个任务的执行时间少、复杂度较低时,没有什么大问题。但是当任务数量巨大或单个任务很耗时时,整端代码性能就变得非常低下。

根据一般经验,ThreadPoolExecutor 在主要受I/O限制的任务时使用,例如发起多个http请求、将大量文件保存到磁盘等。ProcessPoolExecutor 在主要受CPU限制的任务中应用。如大运算量的任务、对大量图片处理应用和一次处理多个文本文件等。

使用 Executor.submit 运行任务

当你有许多任务时,可一次性执行它们,然后等待它们全部完成,再收集结果。

1
2
3
4
5
6
7
8
import concurrent.futures


with concurrent.futures.Executor() as executor:
    futures = {executor.submit(perform, task) for task in get_tasks()}

    for fut in concurrent.futures.as_completed(futures):
        print(f"The outcome is {fut.result()}")

这里你首先创建一个执行器,该执行器在单独的进程或线程中执行所有任务。使用with语句将创建一个上下文管理器,该管理器可确保在完成后通过隐士调用executor.shutdown()函数来清理掉所有线程或进程。

在真实的代码中,你需要将Executor替换为ThreadPoolExecutorProcessPoolExecutor这些可调用的类。然后使用推导式来开始所有的任务,使用executor.submit()来调度每一个任务开始执行。每个任务执行后,会生成一个future对象。一旦所有的任务都完成了,concurrent.futures.as_completed()方法便可获取所有任务的结果。executor.result()返回给你任务执行函数的结果值或抛出任务失败异常。

executor.submit()方法是异步来调用执行任务的,并不包含任务的任何上下文信息。所以如果你想同时获取每个任务的信息,你需要自己处理。

1
2
3
4
5
6
7
8
9
import concurrent.futures


with concurrent.futures.Executor() as executor:
    futures = {executor.submit(perform, task): task for task in get_tasks()}

    for fut in concurrent.futures.as_completed(futures):
        original_task = futures[fut]
        print(f"The result of {original_task} is {fut.result()}")

注意,futures是一个任务结果对应任务的一个字典结构。

使用Executor.map来运行任务

按照预定的顺序收集结果的另一种方式是使用executor.map()方法:

1
2
3
4
5
6
import concurrent.futures


with concurrent.futures.Executor() as executor:
    for arg, res in zip(get_tasks(), executor.map(perform, get_tasks())):
        print(f"The result of {arg} is {res}")

注意,map方法会一次性执行所有任务,不会像正常迭代器那样有惰性。如果任务在执行的时候有问题,它会立即抛出,不再继续执行。

在Python3.5+中,executor.map()可以设置一个参数chunksize。当我们使用ProcessPoolExecutor时,会使用该参数设置的值来分批次的执行任务。每次执行chunksize个任务,该参数默认为1。当使用ThreadPoolExecutor时,参数无效。

真实的例子

在开始例子之前,我们先写一个简单的装饰器,来帮助我们更好的查看执行时间。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import time
from functools import wraps


def timeit(method):
    @wraps(method)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = method(*args, **kwargs)
        end_time = time.time()
        print(f"{method.__name__} => {(end_time-start_time)*1000} ms")

        return result
    return wrapper

这个装饰器可以这样用:

1
2
3
@timeit
def func(n):
return list(range(n))

它会打印执行函数的名字和执行时间。

使用多线程下载和保存文件

首先,让我们从这堆URL中下载一些pdf文件,并将它们保存到我们的磁盘。这是一个I/O类型的任务,我们使用ThreadPoolExecutor类来操作。开始之前我们先看下顺序执行的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from pathlib import Path
import urllib.request


def download_one(url):
    """
    Downloads the specified URL and saves it to disk
    """

    req = urllib.request.urlopen(url)
    fullpath = Path(url)
    fname = fullpath.name
    ext = fullpath.suffix

    if not ext:
        raise RuntimeError("URL does not contain an extension")

    with open(fname, "wb") as handle:
        while True:
            chunk = req.read(1024)
            if not chunk:
                break
            handle.write(chunk)

    msg = f"Finished downloading {fname}"
    return msg


@timeit
def download_all(urls):
    return [download_one(url) for url in urls]


if __name__ == "__main__":
    urls = (
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
    )

    results = download_all(urls)
    for result in results:
        print(result)
1
2
3
4
5
6
>>> download_all => 22850.6863117218 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf

在上边的代码块中,我定义了两个方法。download_one方法负责从url下载pdf文件并保存到磁盘。它会检查url中的文件是否有扩展名,如果没有,它会抛出RunTimeError异常。如果有扩展名,它会下载文件,并保存到磁盘。第二个方法download_all仅仅遍历这些URL并依次对URL应用download_one方法。这个顺序执行的代码大概花费了22.8s的时间。现在让我们来看下多线程版本的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from pathlib import Path
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_one(url):
    """
    Downloads the specified URL and saves it to disk
    """

    req = urllib.request.urlopen(url)
    fullpath = Path(url)
    fname = fullpath.name
    ext = fullpath.suffix

    if not ext:
        raise RuntimeError("URL does not contain an extension")

    with open(fname, "wb") as handle:
        while True:
            chunk = req.read(1024)
            if not chunk:
                break
            handle.write(chunk)

    msg = f"Finished downloading {fname}"
    return msg


@timeit
def download_all(urls):
    """
    Create a thread pool and download specified urls
    """

    with ThreadPoolExecutor(max_workers=13) as executor:
        return executor.map(download_one, urls, timeout=60)


if __name__ == "__main__":
    urls = (
        "http://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf",
    )

    results = download_all(urls)
    for result in results:
        print(result)
1
2
3
4
5
6
>>> download_all => 5042.651653289795 ms
... Finished downloading f1040.pdf
... Finished downloading f1040a.pdf
... Finished downloading f1040ez.pdf
... Finished downloading f1040es.pdf
... Finished downloading f1040sb.pdf

这个并发版本只花费了顺序版本大约1/4的时间。timeout参数是说在这个时间之后,就不能再获取任务结果。max_works参数是启动多少个工作线程,一般为2*multiprocessing.cpu_count()+1。我的机器是6和12线程的,所以我选择启动13个线程。

你也可以尝试使用ProcessPoolExecutor类相同的接口方法来跑上边的代码。但是因为任务本身的特性,还是使用线程的性能稍好。

使用多进程运行CPU密集型任务

下边的示例,一个CPU密集型哈希函数。核心方法是顺序多次执行CPU密集型的哈希算法,另一个方法是多次执行这个核心方法。让我们来看下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import hashlib


def hash_one(n):
    """A somewhat CPU-intensive task."""

    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

    return "done"


@timeit
def hash_all(n):
    """Function that does hashing in serial."""

    for i in range(n):
        hsh = hash_one(n)

    return "done"


if __name__ == "__main__":
    hash_all(20)
1
>>> hash_all => 18317.330598831177 ms

分析hash_onehash_all方法,会发现一个共同点,他们都有一个CPU密集的for循环代码。它大概执行花费了18s,让我们来看下使用ProcessPoolExecutor版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import hashlib
from concurrent.futures import ProcessPoolExecutor


def hash_one(n):
    """A somewhat CPU-intensive task."""

    for i in range(1, n):
        hashlib.pbkdf2_hmac("sha256", b"password", b"salt", i * 10000)

    return "done"


@timeit
def hash_all(n):
    """Function that does hashing in serial."""

    with ProcessPoolExecutor(max_workers=10) as executor:
        for arg, res in zip(range(n), executor.map(hash_one, range(n), chunksize=2)):
            pass

    return "done"


if __name__ == "__main__":
    hash_all(20)
1
>>> hash_all => 1673.842430114746 ms

仔细查看代码,你会发现在hash_one中for循环的代码,仍然是顺序执行的。但是,在hash_all中的for代码块是多进程执行的。这里我用10个工作进程按每个批次2个任务来并发执行。你可以尝试调整工作进程和并发任务数,已到达最佳的性能。这个并发版本比顺序版本快了近11倍。

并发中常遇到的坑

ThreadPoolExecutorProcessPoolExecutor它适用于简单的并发任务,有循环迭代的情况或仅允许运行子程序的情况。如果你的任务需要排队,或者需要用到多进程和多线程,你任然需要使用threadingmultiprocessing模块。

在使用ThreadPoolExecutor时,可能会出现死锁问题。当与Future相关联的可调用对象等待另一个Future的结果时,它们可能永远不会释放对线程的控制从而导致死锁。让我们来看一个示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from concurrent.futures import ThreadPoolExecutor


def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5


def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


with ThreadPoolExecutor(max_workers=2) as executor:
    # here, the future from a depends on the future from b
    # and vice versa
    # so this is never going to be completed
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)

    print("Result from wait_on_b", a.result())
    print("Result from wait_on_a", b.result())

在这个例子中,wait_on_b函数依赖wait_on_a函数的结果。同时wait_on_a函数又依赖wait_on_b函数的结果,代码由于死锁永远不会结束。让我们看一下另一种死锁情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from concurrent.futures import ThreadPoolExecutor


def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(wait_on_future)
    print(future.result())

上边的代码,在一个线程中,提交了一个子执行任务。在子任务结束之前,不会释放线程,但是线程又被主任务使用,便造成死锁。使用多个线程可解决这个问题,但是这种写法本身就非常糟糕的,不提倡的。

有时候,并发代码的性能可能比顺序执行代码效率低下。发生这种情况的原因有很多:

  • 1、线程用于执行CPU密集的任务。
  • 2、使用多进程处理I/O密集的任务。
  • 3、任务非常繁碎,无法使用多线程或多进程来处理。

生成和释放线程或进程是需要额外开销的。通常线程的操作比进程快的多,但是错误的使用反而会拖慢你的代码性能。下边是一个繁碎的示例,ThreadPoolExecutorProcessPoolExecutor性能均比顺序的性能差。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import math

PRIMES = [num for num in range(19000, 20000)]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@timeit
def main():
    for number in PRIMES:
        print(f"{number} is prime: {is_prime(number)}")


if __name__ == "__main__":
    main()
1
2
3
4
5
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 67.65174865722656 ms

上边的示例验证列表中的数是否为质数。顺序版本大约花费了67ms完成。但是并发版本却化了 140ms 才完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from concurrent.futures import ThreadPoolExecutor
import math

num_list = [num for num in range(19000, 20000)]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@timeit
def main():
    with ThreadPoolExecutor(max_workers=13) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
            print(f"{number} is prime: {prime}")


if __name__ == "__main__":
    main()
1
2
3
4
5
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 140.17250061035156 ms

相同的代码,多进程版本:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from concurrent.futures import ProcessPoolExecutor
import math

num_list = [num for num in range(19000, 20000)]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@timeit
def main():
    with ProcessPoolExecutor(max_workers=13) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, num_list)):
            print(f"{number} is prime: {prime}")


if __name__ == "__main__":
    main()
1
2
3
4
5
>>> 19088 is prime: False
... 19089 is prime: False
... 19090 is prime: False
... ...
... main => 311.3126754760742 ms

从直观上看,检测质数应该是CPU密集型的操作,最终结果却是顺序执行最快。所以,合理的评估任务计算是否值得使用多线程或多进程也是非常重要的。否则,我们写出的代码性能不一定更高效。

参考