簡単なスレッド処理の実装2
ネットから拝借してきていた、並列処理用のクラスを改良してみました。
オリジナルで処理がブロッキングされていた部分を修正して、
効率がチョット上がっています。
テストコードでしか、使用していないのでバグがありそうですが、
とりあえず書いてみました。
#! /usr/bin/python # -*- encoding: utf-8 -*- from __future__ import unicode_literals from multiprocessing import Process, Pipe, BoundedSemaphore class MyPool: """マルチスレッドで関数を実行するためのクラスです。""" def __init__(self, proc_num): self.pool_sema = BoundedSemaphore(proc_num) def map(self, func, args): """指定した関数funcにargsの引数をわたし並列処理します。 """ def pipefunc(conn,arg): """子プロセスに引数をセットします。 子プロセスが終了すると、conn.send(func(arg))の次の行に処理が進みます。 """ conn.send(func(arg)) conn.close() self.pool_sema.release() k = 0 exec_list = [] #子プロセスを配列に格納 return_conn = [] #戻り値を取り出すためのパイプを配列に入れて管理 return_list = [] #子プロセスから取り出した、戻り値配列 while(k < len(args)): self.pool_sema.acquire() parent_conn, child_conn = Pipe() #Pipeは、Connectionオブジェクトの親用と子用を作成。send()とrecv()を使って通信 child_exe = Process(target = pipefunc, args=(child_conn,args[k],))#args=(Connectionオブジェクト,キューオブジェクト) exec_list.append(child_exe) return_conn.append(parent_conn) k = k + 1 child_exe.start() for conn,child_exe in zip(return_conn,exec_list): """子プロセスで戻り値があれば、この場所で取得 conn.recv()で処理がブロッキングされるので、joinは不要なのですが なんとなくjoinを実行しています。 """ return_list.append(conn.recv()) child_exe.join() return return_list import random,time def make_mother(max_child): class Test(): """並列処理したい処理のクラス""" def __init__(self, type_name = "hoge"): self.type_name = type_name def run(self): for i in range(1,random.randint(3, 12)): time.sleep(1) print "[name=%s]:%s %s" %(self.type_name," "*i,i) print " end [name=%s]" % self.type_name return "%s end" % self.type_name def make_child(name = "hoge"): """子プロセスの生成関数""" t = Test(name) return t.run() p = MyPool(max_child) # プロセスのプールを作成 list = p.map(make_child, range(15)) # 戻り値のリスト = p.map(子プロセスの生成関数, 子プロセスへの引数のリスト) return list if __name__ == '__main__': print make_mother(3)