pythonで並行処理・並列処理系のコードの備忘録
・multiprocessing
・concurrent.futures
・thread
・古いPython2系のバージョンだとこのモジュールしかなかったりするものの、基本的には使い勝手が悪いので使わない。
・Python3系では間違って使わないように、_threadとアンダースコアがつけられているらしい。
・threading
・thread上位互換。並行処理のベーシックなビルトインモジュール。
・インターフェイスが大分親切になった。
・Python3系はもちろんのこと、2.7系とかでももう使えるので、基本的にはthreadを使うくらいならthreadingを使うことになる。
・concurrent.futures
・Python3.2以降に登場。基本的にthreadingよりもさらに優秀。
・なお、futureは並列処理のFutureパターンに由来する。(1960~1970年代などに発展し、提案された結構昔からあるもの)
・スレッド数の上限を指定して、スレッドの使いまわしなどをしたりしてくれるらしい。(最初に同時に動かす最大数 max_workers を決めるとスレッドを使いまわしてくれるので上で紹介した普通のスレッドよりかしこい)
・また、マルチスレッドとマルチプロセスの切り替えも1行変える程度で、このモジュールで扱えるので、途中で変えたくなったり比較してみる際などにも便利
****
したがってCPUバウンドなピュアPythonコードを threading でマルチスレッド化しても速くならない。 subprocess による外部プログラム実行やI/OなどGIL外の処理を待つ場合には有効。
一方 multiprocessing は新しいインタプリタを os.fork() で立ち上げるので、 CPUバウンドなPythonコードもGILに邪魔されず並列処理できる。 ただし通信のため関数や返り値がpicklableでなければならない。
それらの低級ライブラリを使いやすくまとめたのが concurrent.futures (since 3.2) なので、とりあえずこれを使えばよい。
****・threadingとどちらを使うか、という点に関しては、concurrent.futuresが使える環境(Python2.7.xなど)であればそちらを、使えない古い環境であればthreadingという選択で良さそう。
multiprocessing
1.multiprocessingのPoolで複数のcpuコアで処理
import time, os, sys from multiprocessing import Pool import multiprocessing print("start worker={}", os.getpid()) def nijou(inputs): x = inputs print('input: %d' % (x)) time.sleep(2) retValue = x * x print('double: %d' % (retValue)) return(retValue) if __name__ == "__main__": num_cpu = multiprocessing.cpu_count() case = int(sys.argv[1]) values = [x for x in range(10)] if case==1: with Pool(processes=num_cpu) as p: print('case1') stime = time.time() print(values) # list is required result = p.map(nijou, values) else: with Pool(processes=num_cpu) as p: print('case2') stime = time.time() print(values) # list is required result = p.map(nijou, values) print(result) print('time is ', time.time() -stime)
もう少し複雑な処理。データのdownloadはこちら
import time, sys import numpy as np from multiprocessing import Pool def cos_sim(v1, v2): v1_ = np.array(v1) v2_ = np.array(v2) return np.dot(v1_, v2_) / (np.linalg.norm(v1_) * np.linalg.norm(v2_)) class Sample: def __init__(self, user_size=943, item_size=1682, file_path="ml-100k/u.data", pool=True): self.file_path = file_path # user数×アイテム数のリスト self.eval_table = [[0 for _ in range(item_size)] for _ in range(user_size)] # user数×user数のcos類似度テーブル self.sim_table = [[0 for _ in range(user_size)] for _ in range(user_size)] self.pool = pool def distinguish_info(self, line): u_id, i_id, rating, timestamp = line.replace("\n", "").split("\t") # u_idとi_idはitemのindexを一つずらす return int(u_id)-1, int(i_id)-1, float(rating), timestamp def calc_cossim(self, target_u_id, target_user_eval): for u_id ,user_eval in enumerate(self.eval_table): self.sim_table[target_u_id][u_id] = cos_sim(target_user_eval, user_eval) if self.pool: return self.sim_table[target_u_id] def wrapper(self, args): return self.calc_cossim(*args) def run(self): f = open(self.file_path , 'r') # userとitemのテーブル作成 start = time.time() for line in f: u_id, i_id, rating, _ = self.distinguish_info(line) self.eval_table[u_id][i_id] = rating # テーブルに基づいてcos類似度作成 for target_u_id, target_user_eval in enumerate(self.eval_table): self.calc_cossim(target_u_id, target_user_eval) times = time.time()-start print("total time is:{}".format(times)) def run_pool(self, processes=8): f = open(self.file_path , 'r') # userとitemのテーブル作成 start = time.time() for line in f: u_id, i_id, rating, _ = self.distinguish_info(line) self.eval_table[u_id][i_id] = rating # テーブルに基づいてcos類似度作成 tmp = [(target_u_id, target_user_eval) for target_u_id, target_user_eval in enumerate(self.eval_table)] with Pool(processes=processes) as pool: # 変更 self.right_sim_table = pool.map(self.wrapper, tmp) times = time.time()-start print("total time is :{}".format(times)) if __name__ == "__main__": pool = str(sys.argv[1]) if pool=='pool': s = Sample(pool=True) s.run_pool(processes=8) else: s = Sample(pool=None) s.run()
$ python3 complicated_pool.py pool >>>> total time is :40.1155219078064 $ python3 complicated_pool.py none >>>> total time is:194.3105709552765
2.ManagerでProcess間を順番通りに行う
from multiprocessing import Manager, Process def f6(d, l): # 辞書型に値を詰め込みます. d[1] = '1' d["2"] = 2 d[0.25] = None # 配列を操作します(ここでは逆順に). l.reverse() if __name__ == "__main__": # マネージャーを生成します. with Manager() as manager: # マネージャーから辞書型を生成します. d = manager.dict() # マネージャーから配列を生成します. l = manager.list(range(10)) # サブプロセスを作り実行します. p = Process(target=f6, args=(d,l)) p.start() p.join() # 辞書からデータを取り出します. print(d) # 配列からデータを取り出します. print(l)
3.QueueでProcess間で値の受け渡し
import time from multiprocessing import Queue, Process def f2(q): time.sleep(3) q.put([42, None, "Hello"]) if __name__ == "__main__": q = Queue() # キューを引数に渡して、サブプロセスを作成 p = Process(target=f2, args=(q,)) p.start() # wqait for queue get() print(q.get()) p.join()
4.Lockで制御
from multiprocessing import Lock, Process def f4(lock, i): # ロックを取得します. lock.acquire() # ロック中は、他のプロセスやスレッドがロックを取得できません(ロックが解放されるまで待つ) try: print('Hello', i) finally: # ロックを解放します. lock.release() if __name__ == "__main__": # ロックを作成します. lock = Lock() for num in range(10): Process(target=f4, args=(lock, num)).start()
5.apply_async(Poolと特に代わりない)
import time, sys, os from multiprocessing import Pool, Process def nijou(inputs): x = inputs print('input: %d' % x) time.sleep(2) retValue = x * x print('double: %d' % retValue) return(retValue) class PoolApply: def __init__(self, processes): self.processes = processes def pool_apply(self): p = Pool(self.processes) stime = time.time() values = [x for x in range(10)] #print(values) # not list #for x in range(10): result = p.apply(nijou, args=[values[9]]) print(result) print('time is ', time.time() -stime) p.close() def pool_apply_async(self): p = Pool(self.processes) stime = time.time() # プロセスを2つ非同期で実行 values = [x for x in range(10)] result = p.apply_async(nijou, args=[values[9]]) result2 = p.apply_async(nijou, args=[values[9]]) print(result.get()) print(result2.get()) print('time is ', time.time() -stime) p.close() if __name__ == "__main__": case_no = int(sys.argv[1]) num_process = int(sys.argv[2]) pool = PoolApply(num_process) if case_no==1: pool.pool_apply() elif case_no==2: pool.pool_apply_async()
concurrent.futures
1.MultiProcess
import math, time import sys import concurrent.futures PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return '0' return '{}'.format(n) class MultiProcess(): def run(self): for number, prime in zip(PRIMES, map(is_prime, PRIMES)): print(f'{number} is prime: {prime}') def multi_precoss_run(self): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print(f'{number} is prime: {prime}') if __name__ == '__main__': multi = str(sys.argv[1]) MULTI = MultiProcess() startTime = time.time() if multi=='m': print('multi process') MULTI.multi_precoss_run() else: print('No multi process') MULTI.run() runTime = time.time() - startTime print(f'Time:{runTime}[sec]')
$ python3 multi_process.py m >>>> multi process 112272535095293 is prime: 112272535095293 112582705942171 is prime: 112582705942171 112272535095293 is prime: 112272535095293 115280095190773 is prime: 115280095190773 115797848077099 is prime: 115797848077099 1099726899285419 is prime: 0 Time:0.5466821193695068[sec]
$ python3 multi_process.py. none >>>> No multi process 112272535095293 is prime: 112272535095293 112582705942171 is prime: 112582705942171 112272535095293 is prime: 112272535095293 115280095190773 is prime: 115280095190773 115797848077099 is prime: 115797848077099 1099726899285419 is prime: 0 Time:2.1444289684295654[sec]
2.MultiThread(Poolと同じ)
import concurrent.futures import urllib.request import time, sys, os URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] # Retrieve a single page and report the URL and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() class ConcurrentFutures(): def get_detail(self): # Start the load operations and mark each future with its URL for url in URLS: try: data = load_url(url,60) except Exception as exc: print(f'{url} generated an exception: {exc}') else: print(f'{url} page is len(data) bytes') def mlti_thread_get_detail(self): # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print(f'{url} generated an exception: {exc}') else: print(f'{url} page is len(data) bytes') def main(): pool = str(sys.argv[1]) CFthread = ConcurrentFutures() startTime = time.time() if pool=='pool': print('multi thread') CFthread.mlti_thread_get_detail() else: print('no thread') CFthread.get_detail() runTime = time.time() - startTime print (f'Time:{runTime}[sec]') if __name__ == '__main__': main()
$ python3 mlti_thread.py pool >>> multi thread 〜〜〜〜 Time:7.765803098678589[sec] $ python3 mlti_thread.py none >>>> no thread 〜〜〜 Time:9.271435022354126[sec]