・スレッド数の上限(max_workers)を指定すると、スレッドを使い回してくれる。(最初に同時に動かす最大数 max_workers を決めておくと、一度作ったスレッドが次のタスクに再利用されるので、上で紹介した素の threading.Thread をタスクごとに作るよりも効率的)
したがってCPUバウンドなピュアPythonコードを threading でマルチスレッド化しても速くならない。 subprocess による外部プログラム実行やI/OなどGIL外の処理を待つ場合には有効。
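一方 multiprocessing は別プロセスとして新しいインタプリタを立ち上げる(開始方法は OS と Python のバージョンによって異なり、Unix系では従来 os.fork()、Windows と macOS (Python 3.8以降) では spawn がデフォルト)ので、CPUバウンドなPythonコードもGILに邪魔されず並列処理できる。ただしプロセス間通信のため、渡す関数・引数・返り値がpicklableでなければならない。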
それら低レベルのライブラリ(threading / multiprocessing)を共通のインタフェースで使いやすくまとめたのが concurrent.futures (Python 3.2 以降) なので、とりあえずこれを使えばよい。
import time, os, sys
from multiprocessing import Pool
import multiprocessing

# Executed on import: once in the parent and, under the "spawn" start method,
# again in every worker that re-imports this module.
print("start worker={}".format(os.getpid()))


def nijou(inputs, delay=2):
    """Return the square of *inputs* after sleeping *delay* seconds.

    The sleep simulates a slow task so the speed-up from a Pool is visible.

    Args:
        inputs: integer to square.
        delay: seconds to sleep before computing (default 2, as before).
    """
    x = inputs
    print('input: %d' % (x))
    time.sleep(delay)
    retValue = x * x
    print('double: %d' % (retValue))
    return retValue


if __name__ == "__main__":
    num_cpu = multiprocessing.cpu_count()
    # Fall back to case 1 instead of raising IndexError when no argument is given.
    case = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    values = list(range(10))
    with Pool(processes=num_cpu) as p:
        if case == 1:
            print('case1')
            stime = time.time()
            print(values)
            # Blocking map: results come back as a list in input order.
            result = p.map(nijou, values)
        else:
            print('case2')
            stime = time.time()
            print(values)
            # Non-blocking map_async; .get() waits for every result.
            result = p.map_async(nijou, values).get()
    print(result)
    print('time is ', time.time() - stime)
import time, sys
import numpy as np
from multiprocessing import Pool


def cos_sim(v1, v2):
    """Return the cosine similarity of two equal-length vectors.

    Returns 0.0 when either vector has zero norm, instead of dividing by zero
    (which would produce nan and a RuntimeWarning).
    """
    v1_ = np.array(v1)
    v2_ = np.array(v2)
    denom = np.linalg.norm(v1_) * np.linalg.norm(v2_)
    if denom == 0:
        return 0.0
    return np.dot(v1_, v2_) / denom


class Sample:
    """Build a user x user cosine-similarity table from a ratings file.

    Each non-blank line of the ratings file must be
    ``user_id<TAB>item_id<TAB>rating<TAB>timestamp`` with 1-based ids
    (the layout of MovieLens ml-100k's ``u.data``).
    """

    def __init__(self, user_size=943, item_size=1682, file_path="ml-100k/u.data", pool=True):
        self.file_path = file_path
        # user_size x item_size rating matrix (0 = not rated).
        self.eval_table = [[0 for _ in range(item_size)] for _ in range(user_size)]
        # user_size x user_size cosine-similarity table.
        self.sim_table = [[0 for _ in range(user_size)] for _ in range(user_size)]
        # Kept for backward compatibility; calc_cossim now always returns its row.
        self.pool = pool

    def distinguish_info(self, line):
        """Parse one ratings line into (user_index, item_index, rating, timestamp)."""
        u_id, i_id, rating, timestamp = line.replace("\n", "").split("\t")
        # Shift the 1-based ids to 0-based table indices.
        return int(u_id) - 1, int(i_id) - 1, float(rating), timestamp

    def _load_eval_table(self):
        """Fill eval_table from file_path, skipping blank lines; closes the file."""
        with open(self.file_path, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                u_id, i_id, rating, _ = self.distinguish_info(line)
                self.eval_table[u_id][i_id] = rating

    def calc_cossim(self, target_u_id, target_user_eval):
        """Fill row *target_u_id* of sim_table and return that row.

        The row is returned because inside a Pool worker the write lands in
        the child's copy of sim_table, so it must travel back to the parent.
        """
        for u_id, user_eval in enumerate(self.eval_table):
            self.sim_table[target_u_id][u_id] = cos_sim(target_user_eval, user_eval)
        return self.sim_table[target_u_id]

    def wrapper(self, args):
        """Unpack a (target_u_id, target_user_eval) tuple for Pool.map."""
        return self.calc_cossim(*args)

    def run(self):
        """Load the ratings and compute sim_table sequentially, printing elapsed time."""
        start = time.time()
        self._load_eval_table()
        for target_u_id, target_user_eval in enumerate(self.eval_table):
            self.calc_cossim(target_u_id, target_user_eval)
        times = time.time() - start
        print("total time is:{}".format(times))

    def run_pool(self, processes=8):
        """Load the ratings and compute sim_table with a process Pool, printing elapsed time."""
        start = time.time()
        self._load_eval_table()
        tmp = list(enumerate(self.eval_table))
        with Pool(processes=processes) as pool:
            # Each task returns one row of the similarity table.
            self.right_sim_table = pool.map(self.wrapper, tmp)
        # Mirror into sim_table so both run() and run_pool() leave it populated.
        self.sim_table = self.right_sim_table
        times = time.time() - start
        print("total time is :{}".format(times))


if __name__ == "__main__":
    mode = sys.argv[1] if len(sys.argv) > 1 else ''
    if mode == 'pool':
        s = Sample(pool=True)
        s.run_pool(processes=8)
    else:
        s = Sample(pool=False)
        s.run()
$ python3 pool >>>> total time is :40.1155219078064 $ python3 none >>>> total time is:194.3105709552765
from multiprocessing import Manager, Process


def f6(d, l):
    """Store three sample entries in mapping *d* and reverse sequence *l* in place.

    Intended to receive Manager proxies so the parent sees the changes.
    """
    # Keys of several hashable types, inserted in this order.
    for key, value in ((1, '1'), ("2", 2), (0.25, None)):
        d[key] = value
    # Reverse the shared list in place.
    l.reverse()


if __name__ == "__main__":
    # The Manager process owns the shared objects; the context manager shuts it down.
    with Manager() as mgr:
        shared_dict = mgr.dict()
        shared_list = mgr.list(range(10))
        # Run f6 in a child process and wait for it to finish.
        worker = Process(target=f6, args=(shared_dict, shared_list))
        worker.start()
        worker.join()
        # Changes made by the child are visible through the proxies.
        print(shared_dict)
        print(shared_list)
import time
from multiprocessing import Queue, Process


def f2(q, delay=3):
    """Sleep *delay* seconds, then put ``[42, None, "Hello"]`` onto queue *q*.

    Args:
        q: any object with a ``put`` method (e.g. multiprocessing.Queue).
        delay: seconds to wait before putting (default 3, as before).
    """
    time.sleep(delay)
    q.put([42, None, "Hello"])


if __name__ == "__main__":
    q = Queue()
    # Create a child process, passing the queue as an argument.
    p = Process(target=f2, args=(q,))
    p.start()
    # get() blocks until the child has put its item. Calling it before join()
    # avoids the child blocking while flushing data nobody reads yet.
    print(q.get())
    p.join()
from multiprocessing import Lock, Process


def f4(lock, i):
    """Print ``Hello <i>`` while holding *lock*.

    While the lock is held, other processes/threads sharing it must wait, so
    their prints do not interleave.
    """
    # `with` acquires the lock and releases it even if print() raises.
    with lock:
        print('Hello', i)


if __name__ == "__main__":
    lock = Lock()
    processes = [Process(target=f4, args=(lock, num)) for num in range(10)]
    for p in processes:
        p.start()
    # Join explicitly instead of relying on the implicit join at interpreter exit.
    for p in processes:
        p.join()
import time, sys, os
from multiprocessing import Pool, Process


def nijou(inputs, delay=2):
    """Return the square of *inputs* after sleeping *delay* seconds (default 2)."""
    x = inputs
    print('input: %d' % x)
    time.sleep(delay)
    retValue = x * x
    print('double: %d' % retValue)
    return retValue


class PoolApply:
    """Compare blocking Pool.apply with non-blocking Pool.apply_async."""

    def __init__(self, processes):
        # Number of worker processes for each Pool created below.
        self.processes = processes

    def pool_apply(self):
        """Run one task with blocking Pool.apply; print the result and elapsed time."""
        # The with-block tears the pool down even if apply() raises.
        with Pool(self.processes) as p:
            stime = time.time()
            values = list(range(10))
            result = p.apply(nijou, args=[values[9]])
            print(result)
            print('time is ', time.time() - stime)

    def pool_apply_async(self):
        """Submit two tasks with Pool.apply_async; print both results and elapsed time."""
        with Pool(self.processes) as p:
            stime = time.time()
            values = list(range(10))
            # Both tasks are submitted before waiting, so they can run in parallel.
            result = p.apply_async(nijou, args=[values[9]])
            result2 = p.apply_async(nijou, args=[values[9]])
            print(result.get())
            print(result2.get())
            print('time is ', time.time() - stime)


if __name__ == "__main__":
    case_no = int(sys.argv[1])
    num_process = int(sys.argv[2])
    pool = PoolApply(num_process)
    if case_no == 1:
        pool.pool_apply()
    elif case_no == 2:
        pool.pool_apply_async()
    else:
        print('case_no must be 1 or 2')
import math, time
import sys
import concurrent.futures

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    """Return True if *n* is prime, else False, using trial division by odd numbers.

    Always returns a bool. Previously a composite odd number returned the
    truthy string '0' and a prime returned str(n).
    """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


class MultiProcess():
    """Check PRIMES sequentially or in a process pool and print the results."""

    def run(self):
        """Check every number in the current process."""
        for number, prime in zip(PRIMES, map(is_prime, PRIMES)):
            print(f'{number} is prime: {prime}')

    def multi_precoss_run(self):
        """Check every number with a ProcessPoolExecutor (results keep input order)."""
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print(f'{number} is prime: {prime}')


if __name__ == '__main__':
    multi = sys.argv[1] if len(sys.argv) > 1 else ''
    runner = MultiProcess()
    startTime = time.time()
    if multi == 'm':
        print('multi process')
        runner.multi_precoss_run()
    else:
        print('No multi process')
        runner.run()
    runTime = time.time() - startTime
    print(f'Time:{runTime}[sec]')
$ python3 m >>>> multi process 112272535095293 is prime: 112272535095293 112582705942171 is prime: 112582705942171 112272535095293 is prime: 112272535095293 115280095190773 is prime: 115280095190773 115797848077099 is prime: 115797848077099 1099726899285419 is prime: 0 Time:0.5466821193695068[sec]
$ python3 none >>>> No multi process 112272535095293 is prime: 112272535095293 112582705942171 is prime: 112582705942171 112272535095293 is prime: 112272535095293 115280095190773 is prime: 115280095190773 115797848077099 is prime: 115797848077099 1099726899285419 is prime: 0 Time:2.1444289684295654[sec]
import concurrent.futures
import urllib.request
import time, sys, os

# Example targets from the Python concurrent.futures documentation; the last
# one is meant to fail so the exception path is exercised.
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://www.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']


def load_url(url, timeout):
    """Retrieve a single page and return its body as bytes."""
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


class ConcurrentFutures():
    """Fetch pages sequentially or with a thread pool and report their sizes."""

    def get_detail(self, urls=None):
        """Fetch each URL (default: URLS) one after another, printing its size or error."""
        for url in (URLS if urls is None else urls):
            try:
                data = load_url(url, 60)
            except Exception as exc:
                print(f'{url} generated an exception: {exc}')
            else:
                print(f'{url} page is {len(data)} bytes')

    def mlti_thread_get_detail(self, urls=None, max_workers=5):
        """Fetch URLs (default: URLS) concurrently, printing each as it completes."""
        if urls is None:
            urls = URLS
        # The with statement ensures the worker threads are cleaned up promptly.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Start the load operations and map each future back to its URL.
            future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print(f'{url} generated an exception: {exc}')
                else:
                    print(f'{url} page is {len(data)} bytes')


def main():
    """Run the threaded fetch for argument 'pool', else the sequential fetch; print elapsed time."""
    pool = sys.argv[1] if len(sys.argv) > 1 else ''
    CFthread = ConcurrentFutures()
    startTime = time.time()
    if pool == 'pool':
        print('multi thread')
        CFthread.mlti_thread_get_detail()
    else:
        print('no thread')
        CFthread.get_detail()
    runTime = time.time() - startTime
    print(f'Time:{runTime}[sec]')


if __name__ == '__main__':
    main()
$ python3 pool >>> multi thread 〜〜〜〜 Time:7.765803098678589[sec] $ python3 none >>>> no thread 〜〜〜 Time:9.271435022354126[sec]