利用Python快速实现一个线程池,非常简单

利用Python快速实现一个线程池,非常简单

雷猴啊,兄弟们!今天来展示一下如何用Python快速实现一个线程池。

; 一、序言

当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程。但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置。

比如我们实现一个有 10 个线程的线程池,这样可以并发地处理 10 个任务,每个线程将任务执行完之后,便去执行下一个任务。通过使用线程池,可以避免因线程创建过多而导致资源耗尽,而且任务在执行时的生命周期也可以很好地把控。

而线程池的实现方式也很简单,但这里我们不打算手动实现,因为 Python 提供了一个标准库 concurrent.futures,已经内置了对线程池的支持。所以本篇文章,我们就来详细介绍一下该模块的用法。

二、正文

1、Future 对象

当我们往线程池里面提交一个函数时,会分配一个线程去执行,同时立即返回一个 Future 对象。通过 Future 对象可以监控函数的执行状态,有没有出现异常,以及有没有执行完毕等等。如果函数执行完毕,内部便会调用 future.set_result 将返回值设置到 future 里面,然后外界便可调用 future.result 拿到返回值。

除此之外 future 还可以绑定回调,一旦函数执行完毕,就会以 future 为参数,自动触发回调。所以 future 被称为未来对象,可以把它理解为函数的一个容器,当我们往线程池提交一个函数时,会立即创建相应的 future 然后返回。函数的执行状态什么的,都通过 future 来查看,当然也可以给它绑定一个回调,在函数执行完毕时自动触发。

那么下面我们就来看一下 future 的用法,文字的话理解起来可能有点枯燥。

当一个函数提交到线程池运行时,立即返回一个对象。

[En]

When a function is submitted to the thread pool to run, an object is immediately returned.

这个对象就叫做 Future 对象,里面包含了函数的执行状态等等
当然我们也可以手动创建一个Future对象。

from concurrent.futures import Future

创建 Future 对象 future
future = Future()

给 future 绑定回调
Python学习交流群: 279199867
def callback(f: Future):
    print("当set_result的时候会执行回调,result:",
          f.result())

future.add_done_callback(callback)
通过 add_done_callback 方法即可给 future 绑定回调
调用的时候会自动将 future 作为参数
如果需要多个参数,请使用PARTIAL函数<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>If you need more than one parameter, use a partial function</font>*</details>

回调函数将在何时执行?<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>When will the callback function be executed?</font>*</details>
显然是当 future 执行 set_result 的时候
如果 future 是向线程池提交函数时返回的
那么当函数执行完毕时会自动执行 future.set_result(xx)
并将自身的返回设置进去
而这里的 future 是我们手动创建的,因此需要手动执行
future.set_result("嘿嘿")

当set_result的时候会执行回调,result: 嘿嘿

需要注意的是:只能执行一次 set_result,但是可以多次调用 result 获取结果。

from concurrent.futures import Future

future = Future()
future.set_result("哼哼")

print(future.result())  # 哼哼
print(future.result())  # 哼哼
print(future.result())  # 哼哼

执行 future.result() 之前一定要先 set_result,否则会一直处于阻塞状态。当然 result 方法还可以接收一个 timeout 参数,表示超时时间,如果在指定时间内没有获取到值就会抛出异常。

2、提交函数自动创建 Future 对象

我们上面是手动创建的 Future 对象,但工作中很少会手动创建。我们将函数提交到线程池里面运行的时候,会自动创建 Future 对象并返回。这个 Future 对象里面就包含了函数的执行状态,比如此时是处于暂停、运行中还是完成等等,并且函数在执行完毕之后,还会调用 future.set_result 将自身的返回值设置进去。

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"{name} 睡了 {n} 秒"

创建一个线程池
里面还可以指定 max_workers 参数,表示最多创建多少个线程
如果未指定,则将为每个提交的函数创建一个线程<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>If not specified, a thread will be created for each function submitted</font>*</details>
executor = ThreadPoolExecutor()

通过 submit 即可将函数提交到线程池,一旦提交,就会立刻运行
因为打开了一个新线程,所以主线程将继续执行<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>Because a new thread is opened, the main thread continues to execute</font>*</details>
至于 submit 的参数,按照函数名,对应参数提交即可
切记不可写成task("古明地觉", 3),这样就变成调用了
future = executor.submit(task, "屏幕前的你", 3)

由于函数里面出现了 time.sleep,并且指定的 n 是 3
所以函数内部会休眠 3 秒,显然此时处于运行状态
print(future)
""""""

我们说 future 相当于一个容器,包含了内部函数的执行状态
函数是否正在运行中
print(future.running())
"""
True
"""
函数是否执行完毕
print(future.done())
"""
False
"""

主程序也 sleep 3 秒
time.sleep(3)

显然,此时已经执行了该函数。<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>Obviously, the function has been executed at this time.</font>*</details>
并且打印结果还告诉我们返回值类型是 str
print(future)
""""""

print(future.running())
"""
False
"""
print(future.done())
"""
True
"""

函数执行完毕时,会将返回值设置在 future 里
也就是说一旦执行了 future.set_result
那么就表示函数执行完毕了,然后外界可以调用 result 拿到返回值
print(future.result())
"""
屏幕前的你 睡了 3 秒
"""

这里再强调一下 future.result(),这一步是会阻塞的,举个例子:

提交函数
future = executor.submit(task, "屏幕前的你", 3)
start = time.perf_counter()
future.result()
end = time.perf_counter()
print(end - start)  # 3.00331525

可以看到,future.result() 这一步花了将近 3s。其实也不难理解,future.result() 是干嘛的?就是为了获取函数的返回值,可函数都还没有执行完毕,它又从哪里获取呢?所以只能先等待函数执行完毕,将返回值通过 set_result 设置到 future 里面之后,外界才能调用 future.result() 获取到值。

如果不想一直等待,可以在获取值时传入超时。

[En]

If you don’t want to wait all the time, you can pass in a timeout when getting the value.

from concurrent.futures import (
    ThreadPoolExecutor,
    TimeoutError
)
import time

def task(name, n):
    time.sleep(n)
    return f"{name} 睡了 {n} 秒"

executor = ThreadPoolExecutor()
future = executor.submit(task, "屏幕前的你", 3)
try:
    # 1 秒之内获取不到值,抛出 TimeoutError
    res = future.result(1)
except TimeoutError:
    pass

再 sleep 2 秒,显然函数执行完毕了
time.sleep(2)
获取返回值
print(future.result())
"""
屏幕前的你 睡了 3 秒
"""

当然,这还不够智能,因为我们不知道函数何时结束执行。因此,最好的方法是绑定回调,并在函数完成时自动触发回调。

[En]

Of course, this is not smart enough, because we don’t know when the function will finish executing. So the best way is to bind a callback and trigger the callback automatically when the function is finished.

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"{name} 睡了 {n} 秒"

def callback(f):
    print(f.result())

executor = ThreadPoolExecutor()
future = executor.submit(task, "屏幕前的你", 3)
绑定回调,3 秒之后自动调用
future.add_done_callback(callback)
"""
屏幕前的你 睡了 3 秒
"""

需要注意的是,在调用 submit 方法之后,提交到线程池的函数就已经开始执行了。而不管函数有没有执行完毕,我们都可以给对应的 future 绑定回调。

如果函数完成之前添加回调,那么会在函数完成后触发回调。如果函数完成之后添加回调,由于函数已经完成,代表此时的 future 已经有值了,或者说已经 set_result 了,那么会立即触发回调。

3、future.set_result 到底干了什么事情

当函数执行完毕之后,会执行 set_result,那么这个方法到底干了什么事情呢?

我们看到 future 有两个被保护的属性,分别是 _result 和 _state。显然 _result 用于保存函数的返回值,而 future.result() 本质上也是返回 _result 属性的值。而 _state 属性则用于表示函数的执行状态,初始为 PENDING,执行中为 RUNING,执行完毕时被设置为 FINISHED。

调用 future.result() 的时候,会判断 _state 的属性,如果还在执行中就一直等待。当 _state 为 FINISHED 的时候,就返回 _result 属性的值。

; 4、提交多个函数

我们一次只提交一个函数,但实际上我们可以提交任意多个函数。让我们来看看:

[En]

We only submitted one function at a time, but we can actually submit as many as we want. Let’s take a look at:

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, n):
    time.sleep(n)
    return f"{name} 睡了 {n} 秒"

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "屏幕前的你", 3),
           executor.submit(task, "屏幕前的你", 4),
           executor.submit(task, "屏幕前的你", 1)]
此时都处于running
print(futures)
"""
[,
 ,
 ]
"""

time.sleep(3)
主程序 sleep 3s 后
futures[0]和futures[2]处于 finished
futures[1]仍处于 running
print(futures)
"""
[,
 ,
 ]
"""

如果是多个函数,要如何拿到返回值呢?很简单,遍历 futures 即可。

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "屏幕前的你", 5),
           executor.submit(task, "屏幕前的你", 2),
           executor.submit(task, "屏幕前的你", 4),
           executor.submit(task, "屏幕前的你", 3),
           executor.submit(task, "屏幕前的你", 6)]

for future in futures:
    print(future.result())
"""
屏幕前的你 睡了 5 秒
屏幕前的你 睡了 2 秒
屏幕前的你 睡了 4 秒
屏幕前的你 睡了 3 秒
屏幕前的你 睡了 6 秒
"""

这里面有一些值得说一说的地方,首先 futures 里面有 5 个 future,记做 future1, future2, future3, future4, future5。

当使用 for 循环遍历的时候,实际上会依次遍历这 5 个 future,所以返回值的顺序就是我们添加的函数的顺序。由于 future1 对应的函数休眠了 5s,那么必须等到 5s 后,future1 里面才会有值。

但这五个函数是并发执行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定会先执行完毕,然后执行 set_result,将返回值设置到对应的 future 里。

但 Python 的 for 循环不可能在第一次迭代还没有结束,就去执行第二次迭代。因为 futures 里面的几个 future 的顺序已经一开始就被定好了,只有当第一个 future.result() 执行完成之后,才会执行第二个 future.result(),以及第三个、第四个。

因此即便后面的函数已经执行完毕,但由于 for 循环的顺序,也只能等着,直到前面的 future.result() 执行完毕。所以当第一个 future.result() 结束时,后面三个 future.result() 会立刻输出,因为它们内部的函数已经执行结束了。

而最后一个 future,由于内部函数 sleep 了 6 秒,因此要再等待 1 秒,才会打印 future.result()。

5、使用 map 来提交多个函数

使用 submit 提交函数会返回一个 future,并且还可以给 future 绑定一个回调。但如果不关心回调的话,那么还可以使用 map 进行提交。

executor = ThreadPoolExecutor()
map 内部也是使用了 submit
results = executor.map(task,
                       ["屏幕前的你"] * 3,
                       [3, 1, 2])
并且返回的是迭代器
print(results)
""""""

此时遍历得到的是不再是 future
而是 future.result()
for result in results:
    print(result)
"""
屏幕前的你 睡了 3 秒
屏幕前的你 睡了 1 秒
屏幕前的你 睡了 2 秒
"""

可以看到,当使用for循环的时候,map 执行的逻辑和 submit 是一样的。唯一的区别是,此时不需要再调用 result 了,因为返回的就是函数的返回值。

或者我们直接调用 list 也行。

executor = ThreadPoolExecutor()
results = executor.map(task,
                       ["屏幕前的你"] * 3,
                       [3, 1, 2])
print(list(results))
"""
['屏幕前的你 睡了 3 秒',
 '屏幕前的你 睡了 1 秒',
 '屏幕前的你 睡了 2 秒']
"""

results 是一个生成器,调用 list 的时候会将里面的值全部产出。由于 map 内部还是使用的 submit,然后通过 future.result() 拿到返回值,而耗时最长的函数需要 3 秒,因此这一步会阻塞 3 秒。3 秒过后,会打印所有函数的返回值。

6、按照顺序等待执行

当获得上述返回值时,将按照提交函数的顺序获得返回值。如果我想让函数先完成执行,并首先获得该函数的返回值,我应该怎么做?

[En]

When the return value is obtained above, it is obtained in the order in which the function is submitted. What should I do if I want the function to finish executing first and get the return value of that function first?

from concurrent.futures import (
    ThreadPoolExecutor,
    as_completed
)
import time

def task(name, n):
    time.sleep(n)
    return f"{name} 睡了 {n} 秒"

executor = ThreadPoolExecutor()
futures = [executor.submit(task, "屏幕前的你", 5),
           executor.submit(task, "屏幕前的你", 2),
           executor.submit(task, "屏幕前的你", 1),
           executor.submit(task, "屏幕前的你", 3),
           executor.submit(task, "屏幕前的你", 4)]
for future in as_completed(futures):
    print(future.result())
"""
屏幕前的你 睡了 1 秒
屏幕前的你 睡了 2 秒
屏幕前的你 睡了 3 秒
屏幕前的你 睡了 4 秒
屏幕前的你 睡了 5 秒
"""

此时,谁先完成,谁最先返回。

[En]

At this time, who finishes first, who returns first.

7、取消一个函数的执行

我们通过 submit 可以将函数提交到线程池中执行,但如果我们想取消该怎么办呢?

executor = ThreadPoolExecutor()
future1 = executor.submit(task, "屏幕前的你", 1)
future2 = executor.submit(task, "屏幕前的你", 2)
future3 = executor.submit(task, "屏幕前的你", 3)
取消函数的执行
会将 future 的 _state 属性设置为 CANCELLED
future3.cancel()
查看是否被取消
print(future3.cancelled())  # False

问题来了,调用 cancelled 方法的时候,返回的是False,这是为什么?很简单,因为函数已经被提交到线程池里面了,函数已经运行了。而只有在还没有运行时,取消才会成功。

可这不矛盾了吗?函数一旦提交就会运行,只有不运行才会取消成功,这怎么办?还记得线程池的一个叫做 max_workers 的参数吗?用来控制线程池内的线程数量,我们可以将最大的线程数设置为2,那么当第三个函数进去的时候,就不会执行了,而是处于暂停状态。

executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task, "屏幕前的你", 1)
future2 = executor.submit(task, "屏幕前的你", 2)
future3 = executor.submit(task, "屏幕前的你", 3)
如果可以在池中创建空闲线程<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>If idle threads can be created in the pool</font>*</details>
那么函数一旦提交就会运行,状态为 RUNNING
print(future1._state)  # RUNNING
print(future2._state)  # RUNNING
但 future3 内部的函数还没有运行
因为池子里无法创建新的空闲线程了,所以状态为 PENDING
print(future3._state)  # PENDING
如果该函数不运行,则取消该函数的执行<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>Cancel the execution of the function, provided that the function does not run</font>*</details>
会将 future 的 _state 属性设置为 CANCELLED
future3.cancel()
查看是否被取消
print(future3.cancelled())  # True
print(future3._state)  # CANCELLED

在启动线程池的时候,肯定是需要设置容量的,不然处理几千个函数要开启几千个线程吗。另外当函数被取消了,就不可以再调用 future.result() 了,否则的话会抛出 CancelledError。

8、函数执行时出现异常

摆在我们面前的逻辑是在函数正常执行的前提下,但发生了意想不到的事情。如果在函数执行过程中出现异常怎么办?

[En]

The logic in front of us is under the premise that the function is executed normally, but something unexpected happens. What if there is an exception during the execution of the function?

from concurrent.futures import ThreadPoolExecutor

def task1():
    1 / 0

def task2():
    pass

executor = ThreadPoolExecutor(max_workers=2)
future1 = executor.submit(task1)
future2 = executor.submit(task2)
print(future1)
print(future2)
""""""

结果显示 task1 函数执行出现异常了
那么,如何获得这个例外呢?<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>So how do you get this exception?</font>*</details>
print(future1.exception())
print(future1.exception().__class__)
"""
division by zero

"""

如果执行没有出现异常,那么 exception 方法返回 None
print(future2.exception())  # None

注意:如果函数在执行过程中发生异常<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>Note: if an exception occurs in the execution of the function</font>*</details>
那么调用 result 方法会将异常抛出来
future1.result()
"""
Traceback (most recent call last):
  File "...", line 4, in task1
    1 / 0
ZeroDivisionError: division by zero
"""

出现异常时,调用 future.set_exception 将异常设置到 future 里面,而 future 有一个 _exception 属性,专门保存设置的异常。当调用 future.exception() 时,也会直接返回 _exception 属性的值。

9、等待所有函数执行完毕

假设我们向线程池提交了许多函数,如果我们希望在主程序继续执行之前执行提交的函数,该怎么办?事实上,有很多计划:

[En]

Suppose we submit a lot of functions to the thread pool, and what if we want the committed functions to be executed before the main program can proceed? In fact, there are many plans:

第一种:

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"sleep {n}"

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)

这里是不会阻塞的
print("start")
遍历所有的 future,并调用其 result 方法
这样,在所有功能都执行完之前,我们不会倒下。<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>In this way, we will not go down until all the functions have been executed.</font>*</details>
for future in [future1, future2, future3]:
    print(future.result())
print("end")
"""
start
sleep 5
sleep 2
sleep 4
end
"""

第二种:

from concurrent.futures import (
    ThreadPoolExecutor,
    wait
)
import time

def task(n):
    time.sleep(n)
    return f"sleep {n}"

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)

return_when 有三个可选参数
FIRST_COMPLETED:当任意一个任务完成或者取消
FIRST_EXCEPTION:当任意一个任务出现异常
                 如果都没出现异常等同于ALL_COMPLETED
ALL_COMPLETED:所有任务都完成,默认是这个值
fs = wait([future1, future2, future3],
          return_when="ALL_COMPLETED")
此时返回的fs是DoneAndNotDoneFutures类型的namedtuple
里面有两个值,一个是done,一个是not_done
print(fs.done)
"""
{,
 ,
 }
"""

print(fs.not_done)
"""
set()
"""
for f in fs.done:
    print(f.result())
"""
start
sleep 5
sleep 2
sleep 4
end
"""

第三种:

使用上下文管理
with ThreadPoolExecutor() as executor:
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)

所有函数执行完毕(with语句结束)后才会往下执行

第四种:

executor = ThreadPoolExecutor()

future1 = executor.submit(task, 5)
future2 = executor.submit(task, 2)
future3 = executor.submit(task, 4)
所有函数在完成之前都不会执行。<details><summary>*<font color='gray'>[En]</font>*</summary>*<font color='gray'>All functions will not be executed until they are finished.</font>*</details>
executor.shutdown()

三、小结

如果我们需要启动多线程来执行函数,我们不妨使用线程池。每次调用函数时,都会从池中取出一个线程,并在执行该函数后将该线程放回池中,以供其他函数执行。如果池为空,或者如果无法创建新的空闲线程,则下一个函数将不得不等待。

[En]

If we need to start multithreading to execute the function, we might as well use a thread pool. Every time a function is called, a thread is taken from the pool, and after the function is executed, the thread is put back into the pool for other functions to execute. If the pool is empty, or if a new idle thread cannot be created, the next function will have to wait.

最后,concurrent.futures 不仅可以用于实现线程池,还可以用于实现进程池。两者的 API 是一样的:

from concurrent.futures import ProcessPoolExecutor
import time

def task(n):
    time.sleep(n)
    return f"sleep {n}"

executor = ProcessPoolExecutor()
Windows 上需要加上这一行
if __name__ == '__main__':
    future1 = executor.submit(task, 5)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 4)
    executor.shutdown()
    print(future1.result())
    print(future2.result())
    print(future3.result())
"""
sleep 5
sleep 2
sleep 4
"""

线程池和进程池的 API 是一致的,但工作中很少会创建进程池。

今天的分享到此为止,兄弟们。债台高筑见!

[En]

So much for today’s sharing, brothers. See you in debt!

文章不过瘾?试试看视频吧!
Python爬虫入门到实战全集100集教程:代码总是学完就忘记?100个爬虫实战项目!让你沉迷学习丨学以致用丨下一个Python大神就是你!Python tkinter 合集:全网最全python tkinter教程!包含所有知识点!轻松做出好看的tk程序!

Original: https://www.cnblogs.com/hahaa/p/16547372.html
Author: 轻松学Python
Title: 利用Python快速实现一个线程池,非常简单

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/498903/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球