アプリとサービスのすすめ

アプリやIT系のサービスを中心に書いていきます。たまに副業やビジネス関係の情報なども気ままにつづります

pythonの並行処理・並列処理コード集の備忘録

pythonで並行処理・並列処理系のコードの備忘録


f:id:trafalbad:20211101112613j:plain

・multiprocessing
・concurrent.futures


thread

・古いPython2系のバージョンだとこのモジュールしかなかったりするものの、基本的には使い勝手が悪いので使わない。

・Python3系では間違って使わないように、_threadとアンダースコアがつけられているらしい。

・threading

・thread上位互換。並行処理のベーシックなビルトインモジュール。

インターフェイスが大分親切になった。

・Python3系はもちろんのこと、2.7系とかでももう使えるので、基本的にはthreadを使うくらいならthreadingを使うことになる。


・concurrent.futures

・Python3.2以降に登場。基本的にthreadingよりもさらに優秀。

・なお、futureは並列処理のFutureパターンに由来する。(1960~1970年代などに発展し、提案された結構昔からあるもの)

・スレッド数の上限を指定して、スレッドの使いまわしなどをしたりしてくれるらしい。(最初に同時に動かす最大数 max_workers を決めるとスレッドを使いまわしてくれるので上で紹介した普通のスレッドよりかしこい)

・また、マルチスレッドとマルチプロセスの切り替えも1行変える程度で、このモジュールで扱えるので、途中で変えたくなったり比較してみる際などにも便利
****
したがってCPUバウンドなピュアPythonコードを threading でマルチスレッド化しても速くならない。 subprocess による外部プログラム実行やI/OなどGIL外の処理を待つ場合には有効。
一方 multiprocessing は新しいインタプリタを os.fork() で立ち上げるので、 CPUバウンドなPythonコードもGILに邪魔されず並列処理できる。 ただし通信のため関数や返り値がpicklableでなければならない。
それらの低級ライブラリを使いやすくまとめたのが concurrent.futures (since 3.2) なので、とりあえずこれを使えばよい。

****

・threadingとどちらを使うか、という点に関しては、concurrent.futuresが使える環境(Python2.7.xなど)であればそちらを、使えない古い環境であればthreadingという選択で良さそう。

multiprocessing


1.multiprocessingのPoolで複数のcpuコアで処理


import time, os, sys
from multiprocessing import Pool
import multiprocessing

print("start worker={}", os.getpid())

def nijou(inputs):
    x = inputs
    print('input: %d' % (x))
    time.sleep(2)
    retValue = x * x
    print('double: %d' % (retValue))
    return(retValue)

if __name__ == "__main__":
    num_cpu = multiprocessing.cpu_count()
    case = int(sys.argv[1])
    values = [x for x in range(10)]
    if case==1:
        with Pool(processes=num_cpu) as p:
            print('case1')
            stime = time.time()
            print(values)
            # list is required
            result = p.map(nijou, values)
    else:
        with Pool(processes=num_cpu) as p:
            print('case2')
            stime = time.time()
            print(values)
            # list is required
            result = p.map(nijou, values)
    print(result)
    print('time is ', time.time() -stime)

もう少し複雑な処理。データのdownloadはこちら

import time, sys
import numpy as np
from multiprocessing import Pool

def cos_sim(v1, v2):
    v1_ = np.array(v1)
    v2_ = np.array(v2)
    return np.dot(v1_, v2_) / (np.linalg.norm(v1_) * np.linalg.norm(v2_))

class Sample:
    def __init__(self, user_size=943, item_size=1682, file_path="ml-100k/u.data", pool=True):
        self.file_path = file_path
        # user数×アイテム数のリスト
        self.eval_table = [[0 for _ in range(item_size)] for _ in range(user_size)]
        # user数×user数のcos類似度テーブル
        self.sim_table = [[0 for _ in range(user_size)] for _ in range(user_size)]
        
        self.pool = pool
    def distinguish_info(self, line):
        u_id, i_id, rating, timestamp = line.replace("\n", "").split("\t")
        # u_idとi_idはitemのindexを一つずらす
        return int(u_id)-1, int(i_id)-1, float(rating), timestamp


    def calc_cossim(self, target_u_id, target_user_eval):
        for u_id ,user_eval in enumerate(self.eval_table):
            self.sim_table[target_u_id][u_id] = cos_sim(target_user_eval, user_eval)
        if self.pool:
            return self.sim_table[target_u_id]
    def wrapper(self, args):
        return self.calc_cossim(*args)
        
    def run(self):
        f = open(self.file_path , 'r')
        # userとitemのテーブル作成
        start = time.time()
        for line in f:
            u_id, i_id, rating, _ = self.distinguish_info(line)
            self.eval_table[u_id][i_id] = rating

        # テーブルに基づいてcos類似度作成
        for target_u_id, target_user_eval in enumerate(self.eval_table):
            self.calc_cossim(target_u_id, target_user_eval)
        times = time.time()-start
        print("total time is:{}".format(times))
        
    def run_pool(self, processes=8):
        f = open(self.file_path , 'r')
        # userとitemのテーブル作成
        start = time.time()
        for line in f:
            u_id, i_id, rating, _ = self.distinguish_info(line)
            self.eval_table[u_id][i_id] = rating

        # テーブルに基づいてcos類似度作成
        tmp = [(target_u_id, target_user_eval) for target_u_id, target_user_eval in enumerate(self.eval_table)]
        with Pool(processes=processes) as pool:
            # 変更
            self.right_sim_table = pool.map(self.wrapper, tmp)
        times = time.time()-start
        print("total time is :{}".format(times))


if __name__ == "__main__":
    pool = str(sys.argv[1])
    if pool=='pool':
        s = Sample(pool=True)
        s.run_pool(processes=8)
    else:
        s = Sample(pool=None)
        s.run()
$ python3 complicated_pool.py pool
>>>>
total time is :40.1155219078064

$ python3 complicated_pool.py none
>>>>
total time is:194.3105709552765

2.ManagerでProcess間を順番通りに行う

from multiprocessing import Manager, Process

def f6(d, l):
    # 辞書型に値を詰め込みます.
    d[1] = '1'
    d["2"] = 2
    d[0.25] = None
    # 配列を操作します(ここでは逆順に).
    l.reverse()

if __name__ == "__main__":
    # マネージャーを生成します.
    with Manager() as manager:
        # マネージャーから辞書型を生成します.
        d = manager.dict()
        # マネージャーから配列を生成します.
        l = manager.list(range(10))
        # サブプロセスを作り実行します.
        p = Process(target=f6, args=(d,l))
        p.start()
        p.join()
        # 辞書からデータを取り出します.
        print(d)
        # 配列からデータを取り出します.
        print(l)


3.QueueでProcess間で値の受け渡し

import time
from multiprocessing import Queue, Process

def f2(q):
    time.sleep(3)
    q.put([42, None, "Hello"])

if __name__ == "__main__":
    q = Queue()
    # キューを引数に渡して、サブプロセスを作成
    p = Process(target=f2, args=(q,))
    p.start()
    # wqait for queue get()
    print(q.get())
    p.join()


4.Lockで制御

from multiprocessing import Lock, Process

def f4(lock, i):
    # ロックを取得します.
    lock.acquire()
    # ロック中は、他のプロセスやスレッドがロックを取得できません(ロックが解放されるまで待つ)
    try:
        print('Hello', i)
    finally:
        # ロックを解放します.
        lock.release()

if __name__ == "__main__":
    # ロックを作成します.
    lock = Lock()
    for num in range(10):
        Process(target=f4, args=(lock, num)).start()

5.apply_async(Poolと特に代わりない)

import time, sys, os
from multiprocessing import Pool, Process

def nijou(inputs):
    x = inputs
    print('input: %d' % x)
    time.sleep(2)
    retValue = x * x
    print('double: %d' % retValue)
    return(retValue)

class PoolApply:
    def __init__(self, processes):
        self.processes = processes
    def pool_apply(self):
        p = Pool(self.processes)
        stime = time.time()
        values = [x for x in range(10)]
        #print(values)
        # not list
        #for x in range(10):
        result = p.apply(nijou, args=[values[9]])
        print(result)
        print('time is ', time.time() -stime)
        p.close()
        
    def pool_apply_async(self):
        p = Pool(self.processes)
        stime = time.time()
        # プロセスを2つ非同期で実行
        values = [x for x in range(10)]
        result = p.apply_async(nijou, args=[values[9]])
        result2 = p.apply_async(nijou, args=[values[9]])
        print(result.get())
        print(result2.get())
        print('time is ', time.time() -stime)
        p.close()
        
if __name__ == "__main__":
    case_no = int(sys.argv[1])
    num_process = int(sys.argv[2])
    pool = PoolApply(num_process)
    if case_no==1:
        pool.pool_apply()
    elif case_no==2:
        pool.pool_apply_async()


concurrent.futures

1.MultiProcess

import math, time
import sys
import concurrent.futures

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return '0'
    return '{}'.format(n)

class MultiProcess():
    def run(self):
        for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')
    
    def multi_precoss_run(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print(f'{number} is prime: {prime}')
    

if __name__ == '__main__':
    multi = str(sys.argv[1])
    MULTI = MultiProcess()
    startTime = time.time()
    if multi=='m':
        print('multi process')
        MULTI.multi_precoss_run()
    else:
        print('No multi process')
        MULTI.run()
    runTime = time.time() - startTime
    print(f'Time:{runTime}[sec]')
$ python3  multi_process.py m
>>>>
multi process
112272535095293 is prime: 112272535095293
112582705942171 is prime: 112582705942171
112272535095293 is prime: 112272535095293
115280095190773 is prime: 115280095190773
115797848077099 is prime: 115797848077099
1099726899285419 is prime: 0
Time:0.5466821193695068[sec]
$ python3  multi_process.py. none
>>>>
No multi process
112272535095293 is prime: 112272535095293
112582705942171 is prime: 112582705942171
112272535095293 is prime: 112272535095293
115280095190773 is prime: 115280095190773
115797848077099 is prime: 115797848077099
1099726899285419 is prime: 0
Time:2.1444289684295654[sec]

2.MultiThread(Poolと同じ)

import concurrent.futures
import urllib.request
import time, sys, os

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

class ConcurrentFutures():
    def get_detail(self):
        # Start the load operations and mark each future with its URL
        for url in URLS:
            try:
                data = load_url(url,60)
            except Exception as exc:
                print(f'{url} generated an exception: {exc}')
            else:
                print(f'{url} page is len(data) bytes')
    
    def mlti_thread_get_detail(self):
        # We can use a with statement to ensure threads are cleaned up promptly
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            # Start the load operations and mark each future with its URL
            future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print(f'{url} generated an exception: {exc}')
                else:
                    print(f'{url} page is len(data) bytes')

def main():
    pool = str(sys.argv[1])
    CFthread = ConcurrentFutures()
    startTime = time.time()
    if pool=='pool':
        print('multi thread')
        CFthread.mlti_thread_get_detail()
    else:
        print('no thread')
        CFthread.get_detail()
    runTime = time.time() - startTime
    print (f'Time:{runTime}[sec]')

if __name__ == '__main__':
    main()
$ python3 mlti_thread.py pool
>>>
multi thread
〜〜〜〜
Time:7.765803098678589[sec]
$ python3 mlti_thread.py none
>>>>
no thread
〜〜〜
Time:9.271435022354126[sec]