前言
有的人说Python中的多线程是假的,是不真实的,把Python的多线程贬的一无是处,但其实对于高IO低计算的场景下,线程、协程的提升是巨大的,例如爬虫项目,可以借助async、concurrent和threading等模块,实现大幅度的性能提升,计算任务可以使用大数据中的mapreduce思想借助multiprocessing模块多进程计算。
介绍 Concurrent 内置库
concurrent 是什么?
Python 中的内置库,他帮助我们快速实现并发编程。
对比其他库:
类型 | 模块 | 介绍 | CPU Cores | 量级 |
---|---|---|---|---|
协程 | async | 协程,一种轻量级应用层的任务调度切换达到多线程效果的模块,由代码和语言的编译器决定何时计算 | 1 | 上万 |
并行 | threading | 多线程,线程由CPU调度何时计算何时处理 | 1 | 数千 |
并发 | multiprocessing | 多进程 | Any | 几十 |
并行、并发 | concurrent | 支持多线程和多进程,很方便的创建线程池,程序员只需要编写函数然后提交,线程池便会自动调度执行 | 1 or Any | 几十 - 数千 |
什么是并发、并行?
打个比方:
并发就是一个人吃三个包子,这个吃一口哪个吃一口直到吃完。
并行就是三个人吃三个包子,一起吃,直到吃完。
包子就是待处理的任务,人就是CPU核心。
了解详细:https://blog.csdn.net/java_zero2one/article/details/51477791
实现一个小案例
import concurrent.futures
import time
# 任务函数
def task(n):
print(f"开始任务 {n}")
time.sleep(n) # 等待 n 秒
print(f"{n} ok")
return n
# 使用 concurrent futures 模块 创建线程池 最大线程3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交10个任务
print("开始")
for i in range(10):
executor.submit(task, i)
print("任务创建完成")
print("完成")
执行效果:
开始
开始任务 1
开始任务 2
开始任务 3
任务创建完成
1 ok
开始任务 4
2 ok
开始任务 5
3 ok
开始任务 6
4 ok
...
这个简单的小案例实现了10个任务提交给线程池,线程池自动依次执行提交的函数。
当任务1
完成后,马上自动执行了任务4
,非常方便,用户只需要提交任务函数就行了。
创建一个线程池
通过 with
关键字加上 concurrent.futures.ThreadPoolExecutor
类来创建线程池
为什么要用 with
:防止进程池自动销毁,with可以阻塞主线程,直到提交的任务都完成,才关闭阻塞,跳出with继续执行主线程。
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
futrue = executor.submit(func, args, kwargs)
ThreadPoolExecutor() 类
字面意思,线程池执行者
可以往里面提交任务
实例化对象时可以填写以下参数:
max_workers 最大线程数
thread_name_prefix 线程前缀(可选)
initializer、initargs 创建线程时 会先调用
initializer(*initargs)
(可选)
ThreadPoolExecutor().submit() 方法
使用 submit()
方法来提交一个任务
submit 方法的参数:
fn 一个函数(高阶函数: 函数作为参数)
*args 任何参数,实际会传给 fn
**kwargs 任何指定参数,实际会传给 fn
submit 方法的返回值
return 一个Future对象
Future() 类
他代表由submit方法提交的任务,可以调用他的方法,实现自己的精细化逻辑
Future().done() 方法
这个任务是否完成,完成返回True,未完成返回False
Future().cancel() 方法
尝试取消执行这个任务,如果可以取消返回True,取消失败则返回False,如果执行开始则不能被取消。
Future().canceled() 方法
被成功取消,则返回 True。
Future().running() 方法
是否在执行,如果是返回True,否则False
Future().result() 方法
获取future返回值,如果还在运行则会阻塞通过设置timeout
参数来规定阻塞多少秒
如果future在设置的时间内完成,则返回返回值,超时则报错 concurrent.futures.TimeoutError
如果函数本身报错,则会索引到函数本身的报错
Future().exception() 方法
获取 future 的报错,并返回,可以设置timeout,和result是有区别的,result是索引到报错,这个方法是返回。
Future().add_done_callback() 方法
参数为一个函数 fn 将这个 fn 添加到回调列表,当线程执行完成则会执行传入的函数,可以多次增加,一次增加一个
调用fn 时 会传入他的Future对象本身
线程池综合案例
import concurrent.futures
import time
def init_thread(*args):
print("正在创建一个线程")
print(*args)
def f1():
print("f1")
time.sleep(1)
raise Exception("f1 error")
return "f1 ok"
def callback(f: concurrent.futures.Future):
print("执行成功" if f.done() else "执行失败")
print("完美运行" if f.exception() == None else "运行出错")
if f.exception() != None:
print(f.exception())
with concurrent.futures.ThreadPoolExecutor(initializer=init_thread, initargs=("hello",)) as executor:
futrue: concurrent.futures.Future = executor.submit(f1)
futrue.add_done_callback(callback)
原创文章 转载请标明出处
https://d5v.cc/archives/1703568445679
评论