一、简介
多个线程之间的数据是共享的,多个线程进行数据交换的时候,不能够保证数据的安全性和一致性,所以当多个线程需要进行数据交换的时候,队列就出现了,队列可以完美解决线程间的数据交换,保证线程间数据的安全性和一致性。
二、队列类型
import randomqueue_data = [1,2,3,4,5,6]random.shuffle(queue_data)print('原数据:',queue_data)queue_test = queue.Queue()def queuePut(value): queue_test.put(value)for i in queue_data: queuePut(i)for i in range(len(queue_data)): print(queue_test.get())
import queueimport randomqueue_data = [1,2,3,4,5,6]random.shuffle(queue_data)print('原数据:',queue_data)queue_test = queue.LifoQueue()def queuePut(value): queue_test.put(value)for i in queue_data: queuePut(i)for i in range(len(queue_data)): print(queue_test.get())
import queueimport randomqueue_data = [1,2,3,4,5,6]random.shuffle(queue_data)print('原数据:',queue_data)queue_test = queue.PriorityQueue()def queuePut(value): queue_test.put(value)for i in queue_data: queuePut(i)for i in range(len(queue_data)): print(queue_test.get())
三、队列方法
四、队列+线程、队列+线程池
案例:简单的两人对话
import queueimport threadingimport timeclass ChatSend(threading.Thread): def __init__(self, name, message_queue): super().__init__() self.name = name self.message_queue = message_queue def run(self): '''使用队列中的任务跟踪''' message = input(f'{self.name}:') self.message_queue.put({ 'name': self.name, 'message': message, 'time': time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) }) self.message_queue.join()class ChatReceive(threading.Thread): def __init__(self, name, message_queue): super().__init__() self.name = name self.message_queue = message_queue def run(self): message_info = self.message_queue.get() print(f'\n{message_info['time']}') print(f'{self.name}接收到{message_info['name']}消息:{message_info['message']}\n') self.message_queue.task_done()def chat(chat_person1, chat_person2): message_queue = queue.Queue() print(f'----------{chat_person1}和{chat_person2}正在对话----------') while True: send1 = ChatSend(chat_person1, message_queue) receive1 = ChatReceive(chat_person2, message_queue) send1.start() receive1.start() receive1.join() send2 = ChatSend(chat_person2,message_queue) receive2 = ChatReceive(chat_person1,message_queue) send2.start() receive2.start() receive2.join()chat('张三', '李四')
import queueimport threadingimport timeclass ChatSend(threading.Thread): def __init__(self, name, message_queue, lock=None): super().__init__() self.name = name self.message_queue = message_queue self.lock = lock def run(self): self.lock.acquire() message = input(f'{self.name}:') self.message_queue.put({ 'name': self.name, 'message': message, 'time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) }) self.lock.notify() self.lock.release() class ChatReceive(threading.Thread): def __init__(self, name, message_queue, lock=None): super().__init__() self.name = name self.message_queue = message_queue self.lock = lock def run(self): self.lock.acquire() self.lock.wait() message_info = self.message_queue.get() print(f'\n{message_info['time']}') print(f'{self.name}接收到{message_info['name']}消息:{message_info['message']}\n') self.lock.release()def chat(chat_person1, chat_person2): message_queue = queue.Queue() lock = threading.Condition() print(f'----------{chat_person1}和{chat_person2}正在对话----------') while True: receive1 = ChatReceive(chat_person2, message_queue, lock) send1 = ChatSend(chat_person1, message_queue, lock) receive1.start() send1.start() receive1.join() receive2 = ChatReceive(chat_person1, message_queue, lock) send2 = ChatSend(chat_person2,message_queue, lock) receive2.start() send2.start() receive2.join()chat('张三', '李四')
import queueimport threadingimport timefrom concurrent.futures import ThreadPoolExecutor,waitlock = threading.Condition()class ChatSend: def __init__(self, name, message_queue): self.send_person = name self.message_queue = message_queue def run(self): message = input(f'{self.send_person}:') self.message_queue.put({ 'name': self.send_person, 'message': message, 'time': time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) }) self.message_queue.join()class ChatReceive: def __init__(self, name, message_queue): self.receive_person = name self.message_queue = message_queue def run(self): message_info = self.message_queue.get() print(f'\n{message_info['time']}') print(f'{message_info['name']}对{self.receive_person}说:{message_info['message']}\n') self.message_queue.task_done()class Chat(ThreadPoolExecutor): def __init__(self,person1, person2): ThreadPoolExecutor.__init__(self,max_workers=2) message_queue = queue.Queue() print(f'----------{person1}和{person2}正在对话----------') self.send1 = ChatSend(person1, message_queue) self.receive1 = ChatReceive(person2, message_queue) self.send2 = ChatSend(person2, message_queue) self.receive2 = ChatReceive(person1, message_queue) def start(self): while True: self.submit(self.send1.run) self.submit(self.receive1.run) self.submit(self.send2.run) self.submit(self.receive2.run)chat = Chat('张三', '李四')chat.start()
联系客服