濮阳杆衣贸易有限公司

主頁(yè) > 知識(shí)庫(kù) > Python線程池的正確使用方法

Python線程池的正確使用方法

熱門(mén)標(biāo)簽:企業(yè)微信地圖標(biāo)注 銀川電話機(jī)器人電話 高德地圖標(biāo)注收入咋樣 B52系統(tǒng)電梯外呼顯示E7 鶴壁手機(jī)自動(dòng)外呼系統(tǒng)違法嗎 萊蕪電信外呼系統(tǒng) 沈陽(yáng)防封電銷電話卡 怎么辦理400客服電話 地圖標(biāo)注多個(gè)

Python線程池的正確使用

1、為什么要使用線程池呢?

因?yàn)榫€程執(zhí)行完任務(wù)之后就會(huì)被系統(tǒng)銷毀,下次再執(zhí)行任務(wù)的時(shí)候再進(jìn)行創(chuàng)建。這種方式在邏輯上沒(méi)有啥問(wèn)題。但是系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高,因?yàn)槠渲猩婕芭c操作系統(tǒng)的交互,操作系統(tǒng)需要給新線程分配資源。打個(gè)比方吧!就像軟件公司招聘員工干活一樣。當(dāng)有活干時(shí),就招聘一個(gè)外包人員干活。當(dāng)活干完之后就把這個(gè)人員辭退掉。你說(shuō)在這過(guò)程中所耗費(fèi)的時(shí)間成本和溝通成本是不是很大。那么公司一般的做法是:當(dāng)項(xiàng)目立項(xiàng)時(shí)就確定需要幾名開(kāi)發(fā)人員,然后將這些人員配齊。然后這些人員就常駐在項(xiàng)目組,有活就干,沒(méi)活就摸魚(yú)。線程池也是同樣的道理。線程池可以定義最大線程數(shù),這些線程有任務(wù)就執(zhí)行任務(wù),沒(méi)任務(wù)就進(jìn)入線程池中歇著。

2、線程池怎么用呢?

線程池的基類是concurrent.futures模塊中的Executor類,而Executor類提供了兩個(gè)子類,即ThreadPoolExecutor類和ProcessPoolExecutor類。其中ThreadPoolExecutor用于創(chuàng)建線程池,而ProcessPoolExecutor用于創(chuàng)建進(jìn)程池。本文將重點(diǎn)介紹ThreadPoolExecutor類的使用。首先,讓我們來(lái)看看ThreadPoolExecutor類的構(gòu)造函數(shù)。這里使用的Python版本是:3.6.7。

      def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers = 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

他的構(gòu)造函數(shù)只有兩個(gè)參數(shù):一個(gè)是max_workers參數(shù),用于指定線程池的最大線程數(shù),如果不指定的話則默認(rèn)是CPU核數(shù)的5倍。另一個(gè)參數(shù)是thread_name_prefix,它用來(lái)指定線程池中線程的名稱前綴。其他參數(shù):

  • _shutdown初始值值為False,默認(rèn)情況下線程池不銷毀,即線程池的生命周期跟項(xiàng)目的生命周期一致。
  • self._work_queue = queue.Queue()生成緩沖隊(duì)列。
  • _threads沒(méi)有任務(wù)被提交時(shí),線程的數(shù)量設(shè)置為0。
  • _shutdown_lock 指定線程池的鎖是Lock鎖。
  • 說(shuō)完了線程池的創(chuàng)建之后,接著來(lái)看看線程池中比較常用的幾個(gè)方法吧。
  • submit(self, fn, *args, **kwargs):
  • 該方法用提交任務(wù),即將fn函數(shù)提交給線程池,*args代表傳給fn函數(shù)的參數(shù),**kwargs代表以關(guān)鍵字參數(shù)的形式為fn函數(shù)傳入?yún)?shù)。
  • shutdown(self, wait=True):
  • 關(guān)閉線程池
  • map(func, *iterables, timeout=None, chunksize=1):
  • 該函數(shù)類似于全局函數(shù)map(func,*iterables),只是該函數(shù)將會(huì)啟動(dòng)多個(gè)線程,以異步方式立即對(duì)iterables執(zhí)行map處理。

程序?qū)ask函數(shù)通過(guò)submit方法提交給線程池之后,線程池會(huì)返回一個(gè)Future對(duì)象,該對(duì)象的作用主要是用于獲取線程任務(wù)函數(shù)的返回值。Future提供了如下幾個(gè)方法。

  • cancel():取消該Future代表的線程任務(wù)。如果該任務(wù)正在執(zhí)行,不可取消,則該方法返回False;否則,程序會(huì)取消該任務(wù),并返回True。
  • result(timeout=None):獲取該 Future 代表的線程任務(wù)最后返回的結(jié)果。如果 Future 代表的線程任務(wù)還未完成,該方法將會(huì)阻塞當(dāng)前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。
  • add_done_callback(fn):為該 Future 代表的線程任務(wù)注冊(cè)一個(gè)“回調(diào)函數(shù)”,當(dāng)該任務(wù)成功完成時(shí),程序會(huì)自動(dòng)觸發(fā)該 fn 函數(shù)。
  • done():如果該Future代表的線程任務(wù)被成功取消或執(zhí)行完成,則該方法返回True。

來(lái)個(gè)簡(jiǎn)單的例子:

該例中創(chuàng)建了一個(gè)最大線程數(shù)是2的線程池來(lái)執(zhí)行async_add函數(shù)。

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum))
    return sum

# 創(chuàng)建兩個(gè)線程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='測(cè)試線程')
# 向線程池提交一個(gè)task,20作為async_add()函數(shù)的參數(shù)
future1 = pool.submit(async_add, 20)
# 向線程池再提交一個(gè)task
future2 = pool.submit(async_add, 50)
# 判斷future1代表的任務(wù)是否執(zhí)行完
time.sleep(2)
print(future1.done())
print(future2.done())
# 查看future1代表的任務(wù)返回的結(jié)果
print('線程一的執(zhí)行結(jié)果是=' + str(future1.result()))
# 查看future2代表的任務(wù)的返回結(jié)果
print('線程二的執(zhí)行結(jié)果是=' + str(future2.result()))
print("----" + threading.current_thread().name + "----主線程執(zhí)行結(jié)束-----")

運(yùn)行結(jié)果是:

測(cè)試線程_0執(zhí)行求和操作求得的和是=190
測(cè)試線程_1執(zhí)行求和操作求得的和是=1225
True
True
線程一的執(zhí)行結(jié)果是=190
線程二的執(zhí)行結(jié)果是=1225
----MainThread----主線程執(zhí)行結(jié)束-----

本例中定義了一個(gè)最大線程數(shù)是2的線程池,并向線程池中提交了兩個(gè)任務(wù),其中async_add函數(shù)就是要執(zhí)行的任務(wù)。在async_add函數(shù)中添加 time.sleep(1) 休眠一秒是為了驗(yàn)證done()方法返回的結(jié)果。最后才打印主線程執(zhí)行結(jié)束表明result()方法是阻塞的。如果將result()屏蔽掉。
改成如下形式:

# 創(chuàng)建兩個(gè)線程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='測(cè)試線程')
# 向線程池提交一個(gè)task,20作為async_add()函數(shù)的參數(shù)
future1 = pool.submit(async_add, 20)
# 向線程池再提交一個(gè)task
future2 = pool.submit(async_add, 50)
# 判斷future1代表的任務(wù)是否執(zhí)行完
print(future1.done())
print(future2.done())
print("----" + threading.current_thread().name + "----主線程執(zhí)行結(jié)束-----")

則運(yùn)行結(jié)果是:

False
False
----MainThread----主線程執(zhí)行結(jié)束-----
測(cè)試線程_0執(zhí)行求和操作求得的和是=190
測(cè)試線程_1執(zhí)行求和操作求得的和是=1225

3、如何非阻塞的獲取線程執(zhí)行的結(jié)果

前面介紹的result()方法是通過(guò)阻塞的方式來(lái)獲取線程的運(yùn)行結(jié)果的。那么如果通過(guò)非阻塞的方法來(lái)獲取線程任務(wù)最后的返回結(jié)果呢?這里就需要使用線程的回調(diào)函數(shù)來(lái)獲取線程的返回結(jié)果。

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum))
    return sum


with ThreadPoolExecutor(max_workers=2) as pool:
    # 向線程池提交一個(gè)task
    future1 = pool.submit(async_add, 20)
    future2 = pool.submit(async_add, 50)


    # 定義獲取結(jié)果的函數(shù)
    def get_result(future):
        print(threading.current_thread().name + '運(yùn)行結(jié)果:' + str(future.result()))


    # 查看future1代表的任務(wù)返回的結(jié)果
    future1.add_done_callback(get_result)
    # 查看future2代表的任務(wù)的返回結(jié)果
    future2.add_done_callback(get_result)
    print('------------主線程執(zhí)行結(jié)束----')

運(yùn)行結(jié)果是:

------------主線程執(zhí)行結(jié)束----
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=1225
ThreadPoolExecutor-0_1運(yùn)行結(jié)果:1225
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=190
ThreadPoolExecutor-0_0運(yùn)行結(jié)果:190

從結(jié)果可以看出獲取線程執(zhí)行結(jié)果的方法完全沒(méi)有阻塞到主線程的運(yùn)行。這里通過(guò)add_done_callback函數(shù)向線程池中注冊(cè)了一個(gè)獲取線程執(zhí)行結(jié)果的函數(shù)get_result。
由于線程池實(shí)現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此程序可以使用with語(yǔ)句來(lái)管理線程池,這樣即可避免手動(dòng)關(guān)閉線程池。

4、線程池的運(yùn)行策略

這里有必要介紹一下線程池的執(zhí)行策略,也就是說(shuō)當(dāng)線程池中的任務(wù)數(shù)大于線程池的最大線程數(shù)時(shí),線程池該如何處理這些任務(wù)呢?處理不了的任務(wù)是直接丟棄還是慢慢處理呢?再回答這個(gè)問(wèn)題之前,讓我們來(lái)看下下面這個(gè)例子:這里定義了一個(gè)最大線程數(shù)是4個(gè)線程池,然后向線程池中提交了100個(gè)task任務(wù)。

def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "執(zhí)行求和操作求得的和是=" + str(sum))
    return sum


with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        pool.submit(async_add, i)
    print('------------主線程執(zhí)行結(jié)束----')

運(yùn)行結(jié)果是:

------------主線程執(zhí)行結(jié)束----
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=0
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=0
ThreadPoolExecutor-0_3執(zhí)行求和操作求得的和是=3
ThreadPoolExecutor-0_2執(zhí)行求和操作求得的和是=1
...省略部分結(jié)果.....
ThreadPoolExecutor-0_1執(zhí)行求和操作求得的和是=4656
ThreadPoolExecutor-0_2執(zhí)行求和操作求得的和是=4753
ThreadPoolExecutor-0_0執(zhí)行求和操作求得的和是=4560
ThreadPoolExecutor-0_3執(zhí)行求和操作求得的和是=4851

從運(yùn)行結(jié)果可以看出:一直都是相同的線程來(lái)執(zhí)行這些任務(wù),并且所有的任務(wù)都沒(méi)有被丟棄。并且任務(wù)按照先來(lái)后到的順序來(lái)執(zhí)行。這里就需要說(shuō)到線程池默認(rèn)的緩沖隊(duì)列了。self._work_queue = queue.Queue() 該語(yǔ)句會(huì)創(chuàng)建一個(gè)大小無(wú)限制的緩沖隊(duì)列。該隊(duì)列是一個(gè) FIFO(先進(jìn)先出)的常規(guī)隊(duì)列。所以當(dāng)任務(wù)數(shù)超過(guò)最大線程數(shù)時(shí),任務(wù)會(huì)暫時(shí)放在緩沖隊(duì)列queue中。當(dāng)線程空閑之后會(huì)從緩沖隊(duì)列中取出任務(wù)來(lái)執(zhí)行。
該隊(duì)列有個(gè)參數(shù)maxsize可以限制隊(duì)列的大小。如果隊(duì)列的大小達(dá)到隊(duì)列的上限,就會(huì)加鎖,再次加入元素時(shí),就會(huì)被阻塞,直到隊(duì)列中的元素被消費(fèi)。如果將maxsize的設(shè)置為0或者負(fù)數(shù)時(shí),則該隊(duì)列的大小就是無(wú)限制的。

到此這篇關(guān)于Python線程池的正確使用方法的文章就介紹到這了,更多相關(guān)Python線程池的正確使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • Python 線程池模塊之多線程操作代碼
  • Python爬蟲(chóng)之線程池的使用
  • python線程池的四種好處總結(jié)
  • python爬蟲(chóng)線程池案例詳解(梨視頻短視頻爬取)
  • python線程池 ThreadPoolExecutor 的用法示例
  • 實(shí)例代碼講解Python 線程池
  • Python 如何創(chuàng)建一個(gè)線程池
  • python線程池如何使用
  • 解決python ThreadPoolExecutor 線程池中的異常捕獲問(wèn)題
  • Python定時(shí)器線程池原理詳解
  • Python 使用threading+Queue實(shí)現(xiàn)線程池示例

標(biāo)簽:安慶 湘西 銀川 葫蘆島 三亞 呼倫貝爾 烏魯木齊 呼倫貝爾

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《Python線程池的正確使用方法》,本文關(guān)鍵詞  Python,線程,池,的,正確,使用方法,;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《Python線程池的正確使用方法》相關(guān)的同類信息!
  • 本頁(yè)收集關(guān)于Python線程池的正確使用方法的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    信丰县| 土默特左旗| 敦化市| 沁源县| 文成县| 舒城县| 沾益县| 临城县| 塘沽区| 漳州市| 白山市| 榆林市| 云林县| 天祝| 栾城县| 家居| 江北区| 克山县| 淮安市| 乌鲁木齐县| 剑川县| 丰原市| 黄山市| 清新县| 屯门区| 河北区| 文成县| 龙岩市| 从化市| 嘉禾县| 张家界市| 福贡县| 库车县| 兴义市| 志丹县| 治县。| 开原市| 称多县| 绍兴市| 家居| 郸城县|