Python 进程间通信 Queue / Pipe

ChatGPT 3.5 国内中文镜像站免费使用啦

零基础 Python 学习路线推荐 : Python 学习目录 >> Python 基础入门


一.前言

      1.在前一篇文章 Python 进程 Process 与线程 threading 区别 中讲到线程 threading 共享内存地址,进程与进程 Peocess 之间相互独立,互不影响(相当于深拷贝);

      2.在线程间通信的时候可以使用 Queue 模块完成,进程间通信也可以通过 Queue完成,但是此 Queue 并非线程的 Queue ,进程间通信 Queue 是将数据 pickle 后传给另一个进程的 Queue,用于父进程与子进程之间的通信或同一父进程的子进程之间通信


1.使用 Queue 线程间通信

# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""

#导入线程相关模块
import threading
import queue  

q = queue.Queue()

2.使用 Queue 进程间通信,适用于多个进程之间通信

# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""

# 导入进程相关模块
from multiprocessing import Process
from multiprocessing import Queue 

q = Queue()

3.使用Pipe进程间通信,适用于两个进程之间通信(一对一)

# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""


# 导入进程相关模块 
from multiprocessing import Process 
from multiprocessing import Pipe 

pipe = Pipe()

二.python进程间通信Queue/Pipe使用

      Python 提供了多种进程通信的方式,主要 Queue 和 Pipe 这两种方式,Queue 用于多个进程间实现通信,Pipe 用于两个进程的通信;


1.使用Queue进程间通信

  •      put :以插入数据到队列中,他还有两个可选参数:blocked 和 timeout 。详情自行百度
  •      get :从队列读取并且删除一个元素。同样还有两个可选参数:blocked 和timeout , 详情自行百度
# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""

from multiprocessing import Process
from multiprocessing import Queue
import os,time,random

#写数据进程执行的代码
def proc_write(q,urls):
    print ('Process is write....')
    for url in urls:
        q.put(url)
        print ('put %s to queue... ' %url)
        time.sleep(random.random())

#读数据进程的代码
def proc_read(q):
    print('Process is reading...')
    while True:
        url = q.get(True)
        print('Get %s from queue' %url)

if __name__ == '__main__':
    #父进程创建Queue,并传给各个子进程
    q = Queue()
    proc_write1 = Process(target=proc_write,args=(q,['url_1','url_2','url_3']))
    proc_write2 = Process(target=proc_write,args=(q,['url_4','url_5','url_6']))
    proc_reader = Process(target=proc_read,args=(q,))
    #启动子进程,写入
    proc_write1.start()
    proc_write2.start()

    proc_reader.start()
    #等待proc_write1结束
    proc_write1.join()
    proc_write2.join()
    #proc_raader进程是死循环,强制结束
    proc_reader.terminate()
    print("mian")

'''
输出结果:

Process is write....
put url_1 to queue... 
Process is write....
put url_4 to queue... 
Process is reading...
Get url_1 from queue
Get url_4 from queue
put url_5 to queue... 
Get url_5 from queue
put url_2 to queue... 
Get url_2 from queue
put url_3 to queue... 
Get url_3 from queue
put url_6 to queue... 
Get url_6 from queue
mian
'''

2.使用 Pipe 进程间通信

      Pipe 常用于两个进程,两个进程分别位于管道的两端 Pipe 方法返回(conn1,conn2)代表一个管道的两个端,Pipe 方法有 duplex 参数,默认为True ,即全双工模式,若为 FALSE ,conn1 只负责接收信息,conn2 负责发送, Pipe 同样也包含两个方法:

send : 发送信息;

recv : 接收信息;

# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""

from multiprocessing import Process
from multiprocessing import Pipe
import os,time,random
#写数据进程执行的代码
def proc_send(pipe,urls):
    #print 'Process is write....'
    for url in urls:

        print ('Process is send :%s' %url)
        pipe.send(url)
        time.sleep(random.random())

#读数据进程的代码
def proc_recv(pipe):
    while True:
        print('Process rev:%s' %pipe.recv())
        time.sleep(random.random())

if __name__ == '__main__':
    #父进程创建pipe,并传给各个子进程
    pipe = Pipe()
    p1 = Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10) ]))
    p2 = Process(target=proc_recv,args=(pipe[1],))
    #启动子进程,写入
    p1.start()
    p2.start()

    p1.join()
    p2.terminate()
    print("mian")

'''
输出结果:

Process is send :url_0
Process rev:url_0
Process is send :url_1
Process rev:url_1
Process is send :url_2
Process rev:url_2
Process is send :url_3
Process rev:url_3
Process is send :url_4
Process rev:url_4
Process is send :url_5
Process is send :url_6
Process is send :url_7
Process rev:url_5
Process is send :url_8
Process is send :url_9
Process rev:url_6
mian
'''

三.测试 queue.Queue 来完成进程间通信能否成功?

      当然我们也可以尝试使用线程 threading 的 Queue 是否能完成线程间通信,示例代码如下:

# !usr/bin/env python
# -*- coding:utf-8 _*-
"""
@Author:猿说编程
@Blog(个人博客地址): www.codersrc.com
@File:Python 进程间通信 Queue / Pipe.py
@Time:2021/05/09 07:37
@Motto:不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
 
"""


from multiprocessing import Process
# from multiprocessing import Queue     # 进程间通信Queue,两者不要混淆
import queue                            # 线程间通信queue.Queue,两者不要混淆
import time

def p_put(q,*args):
    q.put(args)
    print('Has put %s' % args)


def p_get(q,*args):
    print('%s wait to get...' % args)

    print(q.get())
    print('%s got it' % args)


if __name__ == "__main__":
    q = queue.Queue()
    p1 = Process(target=p_put, args=(q,'p1', ))
    p2 = Process(target=p_get, args=(q,'p2', ))
    p1.start()
    p2.start()

'''
直接异常报错:

Traceback (most recent call last):
  File "E:/Project/python_project/untitled10/123.py", line 38, in <module>
    p1.start()
  File "G:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "G:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "G:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "G:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
'''

四.猜你喜欢

  1. Python 条件推导式
  2. Python 列表推导式
  3. Python 字典推导式
  4. Python 不定长参数 *argc/**kargcs
  5. Python 匿名函数 lambda
  6. Python return 逻辑判断表达式
  7. Python is 和 == 区别
  8. Python 可变数据类型和不可变数据类型
  9. Python 浅拷贝和深拷贝
  10. Python 异常处理
  11. Python 线程创建和传参
  12. Python 线程互斥锁 Lock
  13. Python 线程时间 Event
  14. Python 线程条件变量 Condition
  15. Python 线程定时器 Timer
  16. Python 线程信号量 Semaphore
  17. Python 线程障碍对象 Barrier
  18. Python 线程队列 Queue – FIFO
  19. Python 线程队列 LifoQueue – LIFO
  20. Python 线程优先队列 PriorityQueue
  21. Python 线程池 ThreadPoolExecutor(一)
  22. Python 线程池 ThreadPoolExecutor(二)
  23. Python 进程 Process 模块
  24. Python 进程 Process 与线程 threading 区别
  25. Python 进程间通信 Queue / Pipe

ChatGPT 3.5 国内中文镜像站免费使用啦
© 版权声明
THE END
喜欢就支持一下吧
点赞3 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容