本文共 2886 字,大约阅读时间需要 9 分钟。
消息中间件 --->就是消息队列
异步方式:不需要立马得到结果,需要排队
同步方式:需要实时获得数据,坚决不能排队
例子:
#多进程模块multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
def write(q):
    """Put the letters "a" through "d" on the queue, announcing each one.

    Args:
        q: Queue-like object exposing ``put(item)``.
    """
    for letter in "abcd":
        q.put(letter)
        print("put {0} to queue".format(letter))
def read(q):
    """Print every item taken from the queue, without ever returning.

    The loop has no exit condition of its own: it stops only when
    ``q.get()`` raises or when the caller stops whatever is running it.

    Args:
        q: Queue-like object exposing ``get()``.
    """
    while True:
        item = q.get()
        print("get {0} from queue".format(item))
# Entry point of the Queue example.
def main(grace_seconds=1.0):
    """Run one writer and one reader process connected by a Queue.

    Args:
        grace_seconds: How long to let the reader keep draining the queue
            after the writer has finished, before it is terminated.
    """
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    # read() loops forever, so this join is expected to time out. The wait
    # exists only to give the reader a chance to print what is still queued;
    # terminating it right after the writer exits can kill it before it has
    # printed anything.
    pr.join(grace_seconds)
    # Terminate the reader process, then reap it.
    pr.terminate()
    pr.join()
if __name__ == "__main__":
    # Run the example only when executed as a script.
    main()
输出:
put a to queue
put b to queue
put c to queue
put d to queue
使用多进程模块 multiprocessing 中的 Pipe 实现消息队列
例子:
from multiprocessing import Pipe, Process
import time
def proce1(pipe, interval=1):
    """Send the integers 1 through 9 over a connection, pausing after each.

    Args:
        pipe: Connection-like object exposing ``send(obj)``.
        interval: Seconds to sleep after each send.
    """
    # range() instead of xrange() so the example runs on Python 2 and 3.
    for i in range(1, 10):
        pipe.send(i)
        print("send {0} to pipe".format(i))
        time.sleep(interval)
def proce2(pipe, count=9):
    """Receive and print a fixed number of messages from a connection.

    Args:
        pipe: Connection-like object exposing ``recv()``.
        count: Number of messages to read before returning.
    """
    # A bounded loop replaces the earlier countdown, whose counter was never
    # decremented and therefore never let the function return.
    for _ in range(count):
        result = pipe.recv()
        print("recv {0} from pipe".format(result))
def main():
    """Connect proce1 and proce2 with a one-way Pipe and run both processes.

    ``Pipe(duplex=False)`` returns a pair of connections; the first is handed
    to the receiving process and the second to the sending one.
    """
    pipe = Pipe(duplex=False)
    print(type(pipe))
    receiver, sender = pipe
    workers = [
        Process(target=proce1, args=(sender,)),
        Process(target=proce2, args=(receiver,)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # Close the parent's copies of both ends once the children are done.
    receiver.close()
    sender.close()
if __name__ == "__main__":
    # Run the Pipe example only when executed as a script.
    main()
输出:
<type 'tuple'>
send 1 to pipe
recv 1 from pipe
recv 2 from pipe
send 2 to pipe
recv 3 from pipe
send 3 to pipe
recv 4 from pipe
send 4 to pipe
send 5 to pipe
recv 5 from pipe
recv 6 from pipe
send 6 to pipe
send 7 to pipe
recv 7 from pipe
send 8 to pipe
recv 8 from pipe
send 9 to pipe
recv 9 from pipe
模仿生产者和消费者的多线程消息队列练习
例子:
from threading import Thread
from multiprocessing import Queue
import time
class Proceduer(Thread):
    """Producer thread that puts the integers 1 through 9 on a queue."""

    def __init__(self, queue):
        """Store the queue this producer writes to.

        Args:
            queue: Queue-like object exposing ``put(item)``.
        """
        super(Proceduer, self).__init__()
        self.queue = queue

    def run(self):
        """Put 1..9 on the queue, announcing each value before it is queued.

        Raises:
            Exception: Whatever ``put`` raised, re-raised after reporting it.
        """
        try:
            # range() instead of xrange() so this runs on Python 2 and 3.
            for i in range(1, 10):
                print("put data is {0} to queue".format(i))
                self.queue.put(i)
        except Exception:
            print("put data error")
            # Bare raise keeps the original traceback intact.
            raise
class Consumer_odd(Thread):
    """Consumer thread that prints the odd numbers it finds on a queue.

    Even numbers are put back on the queue for someone else to take. The
    thread stops once no item arrives within ``timeout`` seconds. If nothing
    else removes the even numbers they keep cycling through this consumer,
    so it is meant to run alongside a consumer of even numbers.
    """

    def __init__(self, queue, interval=1, timeout=1):
        """Store the queue and the pacing settings.

        Args:
            queue: Queue-like object exposing ``get(timeout=...)`` and
                ``put(item)``.
            interval: Seconds to sleep after handling each item.
            timeout: Seconds to wait for an item before giving up.
        """
        super(Consumer_odd, self).__init__()
        self.queue = queue
        self.poll_interval = interval
        self.get_timeout = timeout

    def run(self):
        """Drain the queue, printing odd numbers and re-queueing even ones."""
        # The "queue is empty" exception lives in `queue` on Python 3 and in
        # `Queue` on Python 2; import locally so the block is self-contained.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while True:
            # A timed get replaces the earlier `not self.queue.empty` test:
            # that expression never called empty(), so it was always False
            # and the loop body never ran. Waiting with a timeout also avoids
            # blocking forever when another consumer takes the last item.
            try:
                number = self.queue.get(timeout=self.get_timeout)
            except Empty:
                break
            if number % 2 != 0:
                print("get {0} from queue odd. thread name is {1}".format(number, self.name))
            else:
                # Not ours: hand it back for the even-number consumer.
                self.queue.put(number)
            time.sleep(self.poll_interval)
class Consumer_even(Thread):
    """Consumer thread that prints the even numbers it finds on a queue.

    Odd numbers are put back on the queue for someone else to take. The
    thread stops once no item arrives within ``timeout`` seconds. If nothing
    else removes the odd numbers they keep cycling through this consumer,
    so it is meant to run alongside a consumer of odd numbers.
    """

    def __init__(self, queue, interval=1, timeout=1):
        """Store the queue and the pacing settings.

        Args:
            queue: Queue-like object exposing ``get(timeout=...)`` and
                ``put(item)``.
            interval: Seconds to sleep after handling each item.
            timeout: Seconds to wait for an item before giving up.
        """
        super(Consumer_even, self).__init__()
        self.queue = queue
        self.poll_interval = interval
        self.get_timeout = timeout

    def run(self):
        """Drain the queue, printing even numbers and re-queueing odd ones."""
        # The "queue is empty" exception lives in `queue` on Python 3 and in
        # `Queue` on Python 2; import locally so the block is self-contained.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        while True:
            # A timed get replaces the earlier `not self.queue.empty` test:
            # that expression never called empty(), so it was always False
            # and the loop body never ran. Waiting with a timeout also avoids
            # blocking forever when another consumer takes the last item.
            try:
                number = self.queue.get(timeout=self.get_timeout)
            except Empty:
                break
            if number % 2 == 0:
                print("get {0} from queue even.thread name is{1}".format(number, self.name))
            else:
                # Not ours: hand it back for the odd-number consumer.
                self.queue.put(number)
            time.sleep(self.poll_interval)
def main():
    """Fill a queue with the producer, then drain it with both consumers."""
    shared_queue = Queue()

    producer = Proceduer(queue=shared_queue)
    producer.start()
    producer.join()
    # Pause between producing and consuming; presumably this lets the queued
    # items become visible to readers -- TODO confirm.
    time.sleep(1)

    consumers = (
        Consumer_odd(queue=shared_queue),
        Consumer_even(queue=shared_queue),
    )
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    print("ALL thread terminate")
if __name__ == "__main__":
    # Run the producer/consumer example only when executed as a script.
    main()