Python Concurrent 详解

前言

有的人说Python中的多线程是假的,是不真实的,把Python的多线程贬的一无是处,但其实对于高IO低计算的场景下,线程、协程的提升是巨大的,例如爬虫项目,可以借助async、concurrent和threading等模块,实现大幅度的性能提升,计算任务可以使用大数据中的mapreduce思想借助multiprocessing模块多进程计算。

介绍 Concurrent 内置库

concurrent 是什么?

Python 中的内置库,他帮助我们快速实现并发编程。

对比其他库:

类型

模块

介绍

CPU Cores

量级

协程

async

协程,一种轻量级应用层的任务调度切换达到多线程效果的模块,由代码和语言的编译器决定何时计算

1

上万

并行

threading

多线程,线程由CPU调度何时计算何时处理

1

数千

并发

multiprocessing

多进程

Any

几十

并行、并发

concurrent

支持多线程和多进程,很方便的创建线程池,程序员只需要编写函数然后提交,线程池便会自动调度执行

1 or Any

几十 - 数千

什么是并发、并行?

打个比方:

并发就是一个人吃三个包子,这个吃一口哪个吃一口直到吃完。

并行就是三个人吃三个包子,一起吃,直到吃完。

包子就是待处理的任务,人就是CPU核心。

了解详细:https://blog.csdn.net/java_zero2one/article/details/51477791

实现一个小案例

import concurrent.futures
import time

# 任务函数
def task(n):
    print(f"开始任务 {n}")
    time.sleep(n)  # 等待 n 秒
    print(f"{n} ok")
    return n

# 使用 concurrent futures 模块 创建线程池       最大线程3
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # 提交10个任务
	print("开始")
    for i in range(10):
        executor.submit(task, i)
	print("任务创建完成")

print("完成")

执行效果:

开始
开始任务 1
开始任务 2
开始任务 3
任务创建完成
1 ok
开始任务 4
2 ok
开始任务 5
3 ok
开始任务 6
4 ok
...

这个简单的小案例实现了10个任务提交给线程池,线程池自动依次执行提交的函数。

任务1完成后,马上自动执行了任务4,非常方便,用户只需要提交任务函数就行了。

创建一个线程池

通过 with 关键字加上 concurrent.futures.ThreadPoolExecutor 类来创建线程池

为什么要用 with :防止进程池自动销毁,with可以阻塞主线程,直到提交的任务都完成,才关闭阻塞,跳出with继续执行主线程。

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    futrue = executor.submit(func, args, kwargs)

ThreadPoolExecutor() 类

字面意思,线程池执行者

可以往里面提交任务

实例化对象时可以填写以下参数:

  • max_workers 最大线程数

  • thread_name_prefix 线程前缀(可选)

  • initializer、initargs 创建线程时 会先调用 initializer(*initargs)(可选)

ThreadPoolExecutor().submit() 方法

使用 submit() 方法来提交一个任务

submit 方法的参数:

fn 一个函数(高阶函数: 函数作为参数)

*args 任何参数,实际会传给 fn

**kwargs 任何指定参数,实际会传给 fn

submit 方法的返回值

return 一个Future对象

Future() 类

他代表由submit方法提交的任务,可以调用他的方法,实现自己的精细化逻辑

Future().done() 方法

这个任务是否完成,完成返回True,未完成返回False

Future().cancel() 方法

尝试取消执行这个任务,如果可以取消返回True,取消失败则返回False,如果执行开始则不能被取消。

Future().canceled() 方法

被成功取消,则返回 True。

Future().running() 方法

是否在执行,如果是返回True,否则False

Future().result() 方法

获取future返回值,如果还在运行则会阻塞通过设置timeout 参数来规定阻塞多少秒

如果future在设置的时间内完成,则返回返回值,超时则报错 concurrent.futures.TimeoutError 如果函数本身报错,则会索引到函数本身的报错

Future().exception() 方法

获取 future 的报错,并返回,可以设置timeout,和result是有区别的,result是索引到报错,这个方法是返回。

Future().add_done_callback() 方法

参数为一个函数 fn 将这个 fn 添加到回调列表,当线程执行完成则会执行传入的函数,可以多次增加,一次增加一个

调用fn 时 会传入他的Future对象本身

线程池综合案例

import concurrent.futures
import time

def init_thread(*args):
    print("正在创建一个线程")
    print(*args)


def f1():
    print("f1")
    time.sleep(1)
    raise Exception("f1 error")
    return "f1 ok"

def callback(f: concurrent.futures.Future):
    print("执行成功" if f.done() else "执行失败")
    print("完美运行" if f.exception() == None else "运行出错")
    if f.exception() != None:
        print(f.exception())



with concurrent.futures.ThreadPoolExecutor(initializer=init_thread, initargs=("hello",)) as executor:
    futrue: concurrent.futures.Future = executor.submit(f1)
    futrue.add_done_callback(callback)

原创文章 转载请标明出处
https://d5v.cc/archives/1703568445679

LICENSED UNDER CC BY-NC-SA 4.0
Comment