Python中使用多进程主要方式有两种:一种方法是使用os模块中的fork方法,另外一种是multiprocessing模块。区别在于前者仅仅适用于Unix/Linux操作系统,后者跨平台方式。
- 使用os模块中的fork方式 普通的方法是调用一次,返回一次,而fork方法是调用一次,返回两次,原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这个两个进程几乎是相同的,于是fork分别从父进程和子进程中返回。子进程中永远返回0,fujinchn父进程返回的是子进程的ID。
import os
if __name__=='__main__':
print('Current Process (%s) start ...' % os.getpid())
pid = os.fork()
if pid< 0:
print('error in fork')
elif pid == 0:
print('I am child process (%s) and my parent process is (%s)', os.getpid(), os.getppid())
else:
print('I(%s) created a child process (%s)', os.getpid(),pid)
###windows下无法运行该代码。
- 使用multiprocessing模块 使用Process类描述一个进程对象。创建子进程的时候传一个执行函数和相应的参数,用start()启动一个进程,用join()方法实现进程间的同步。
import os
from multiprocessing import Process
def run_proc(name):
print('Child process %s (%s) Running ' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
for i in range(5):
p = Process(target=run_proc, args=(str(i),))
print('Process wills start')
p.start()
p.join()
print('Process end')
multiprocessing模块提供了一个Pool类来代表进程池对象
from multiprocessing import Pool
import os, time, random
def run_task(name):
print ('Task %s (pid = %s) is running...' % (name, os.getpid()))
time.sleep(random.random() * 3)
print ('Task %s end.' % name)
if __name__ == '__main__':
print ('Current process %s.' % os.getpid())
p = Pool(processes=3)
for i in range(5):
p.apply_async(run_task, args=(i,))
print ('Waiting for all subprocess done...')
p.close()
p.join()
print ('All subprocess done')
D:\SoftWare\Python3.6\python.exeE:/Python/test20.py
Current process 6608.
Waiting for all subprocess done...
Task 0 (pid = 6868) is running...
Task 1 (pid = 8216) is running...
Task 2 (pid = 1496) is running...
Task 0 end.
Task 3 (pid = 6868) is running...
Task 2 end.
Task 4 (pid = 1496) is running...
Task 3 end.
Task 1 end.
Task 4 end.
All subprocess done
创建了容器为3的进程池,添加了5个任务,但是一开始只运行了三个,而且每次只能运行三个。新的任务依次添加进来,使用的进程还是原来的进程,通过pid看出来。
###Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process。
##多进程之间的通信 这里用到的是Queue和Pipe两种方式进行通信。两者的区别就是Pipe经常用来两个进程间的通信,Queue用来在多个进程间实现通信。 1.Queue通信:用get和put进行操作Queue. Put方法用插入数据到队列中,有两个可选参数:blocked和timeout,如果blocked为True(默认值),并且timeout为正值,该方法为阻塞timeout指定的时间,直到队列有剩余时间。如果超时,会抛出Queue.Full的异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 Get方法可以从队列读取并且删除一个元素。同样,Get方法有两个可选参数:blocked和timeout。如果blocked为True,并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,分两种情况:如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。
Queue:
from multiprocessing import Process, Queue
import os, time, random
#写数据进程执行的代码:
def proc_write(q, urls):
print('Process(%s) is writing...' % os.getpid())
for url in urls:
q.put(url)
print('Put %s to queue...' % url)
time.sleep(random.random())
#读数据进程执行的代码:
def proc_read(q):
print('Process(%s) is reading...' % os.getpid())
while True:
url = q.get(True)
print('Get %s from queue' % url)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
proc_writer1 = Process(target=proc_write, args=(q, ['url_1', 'url_2', 'url_3']))
proc_writer2 = Process(target=proc_write, args=(q, ['url_4', 'url_5', 'url_6']))
proc_reader = Process(target=proc_read, args=(q,))
#启动子进程proc_writer,写入
proc_writer1.start()
proc_writer2.start()
#启动子进程proc_reader,读取
proc_reader.start()
#等待proc_writer结束
proc_writer1.join()
proc_writer2.join()
#proc_reader是死循环,只能强行停止程序运行
proc_reader.terminate()
Pipe: Pipe有一个参数,为True(默认值),这个管道就是全双工,都能收和发,若为False,一头发,一头收。
import multiprocessing
import os, time, random
def proc_send(pipe, urls):
for url in urls:
print('Process(%s) send: %s' % (os.getpid(), url))
pipe.send(url)
time.sleep(random.random())
def proc_recv(pipe):
while True:
print('Process(%s) rev:%s' % (os.getpid(), pipe.recv()))
time.sleep(random.random())
if __name__=='__main__':
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ['url_'+str(i) for i in range(10)]))
p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
本文由 Ryan 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2017/12/20 16:45