用Python实现优先级队列

1. heapq模块

heapq 是一个二叉堆的实现,其内部使用内置的 list 对象,对于列表中的每一个元素,其满足 a[k] <= a[2k+1] and a[k] <= a[2k+2] ,因此,该方法默认的是一个最小堆,a[0] 为队列中的最小元素。
和教科书上不一样的点在于:1. 使用了基于零开始的索引, 2. pop 方法返回了最小的元素,而不是最大的。
在这里插入图片描述

最小优先队列实现

由于 heapq 模块默认的就是最小堆结构,所以可以直接利用其中的函数操作来实现最小优先队列的功能。
注意heapq是函数,要修改的list是其中一个输入

import heapq
arr = [3,1,5,7,2,6,9,3,5]
print("原数组:{0}".format(arr))
# 将给定的列表转化为最小堆,线性时间
heapq.heapify(arr)
print("最小堆数组:{0}".format(arr))

# 插入元素
heapq.heappush(arr, 2)
print("插入新元素后:{0}".format(arr))

# 弹出最小元素
item0 = heapq.heappop(arr)
print("弹出的元素后:{0}".format(arr))

# 返回最小元素
item1 = arr[0]
print("获取最小元素的值:{0}".format(item1))

# 弹出最小元素,并插入一个新的元素,相当于先 heappop, 再 heappush
item2 = heapq.heapreplace(arr, -2)
print("弹出的元素为:{0}".format(item2))
print("现在的堆结构为:{0}".format(arr))
# 原数组:[3, 1, 5, 7, 2, 6, 9, 3, 5]
# 最小堆数组:[1, 2, 5, 3, 3, 6, 9, 7, 5]
# 插入新元素后:[1, 2, 5, 3, 2, 6, 9, 7, 5, 3]
# 弹出的元素后:[2, 2, 5, 3, 3, 6, 9, 7, 5]
# 获取最小元素的值:2
# 弹出的元素为:2
# 现在的堆结构为:[-2, 2, 5, 3, 3, 6, 9, 7, 5]

最大优先队列的实现

# 阅读 heapq 模块的源代码可以发现,其内置了最大优先队列的实现函数和操作函数,但没有内置新插入元素的函数
arr = [3,1,5,7,2,6,9,3,5]
print("原数组:{0}".format(arr))
# 将给定的列表转化为最大堆,线性时间
heapq._heapify_max(arr)
print("最大堆数组:{0}".format(arr))

# 弹出最大元素
item0 = heapq._heappop_max(arr)
print("弹出的元素为:{0}".format(item0))
print("弹出的元素后:{0}".format(arr))

# 弹出最大元素,并插入一个新的元素
item1 = heapq._heapreplace_max(arr, 9)
print("弹出的元素为:{0}".format(item1))
print("现在的堆结构为:{0}".format(arr))

# 原数组:[3, 1, 5, 7, 2, 6, 9, 3, 5]
# 最大堆数组:[9, 7, 6, 5, 2, 3, 5, 3, 1]
# 弹出的元素为:9
# 弹出的元素后:[7, 5, 6, 3, 2, 3, 5, 1]
# 弹出的元素为:7
# 现在的堆结构为:[9, 5, 6, 3, 2, 3, 5, 1]

复杂结构的优先队列

元组类型

q = []
heapq.heappush(q, (2, 'code'))
heapq.heappush(q, (1, 'eat'))
heapq.heappush(q, (3, 'sleep'))
heapq.heappush(q, (2, 'play'))
heapq.heappush(q, (3, "debug"))
q1 = [x for x in q]
while q:
    next_item = heapq.heappop(q)
    print(next_item)
# ---- result -----
# (1, 'eat')
# (2, 'code')
# (2, 'play')
# (3, 'debug')
# (3, 'sleep')

# 返回最小的 n 个元素,相当于 sorted(iterable, key=key)[:n]
n_smallest = heapq.nsmallest(3, q1, key=lambda x: x[0])
print("最小的3个元素:{0}".format(n_smallest))
# 返回最大的 n 个元素,相当于 sorted(iterable, key=key, reverse=True)[:n]
n_largest = heapq.nlargest(3, q1, key=lambda x: x[1])
print("最大的3个元素:{0}".format(n_largest))

# 最小的3个元素:[(1, 'eat'), (2, 'code'), (2, 'play')]
# 最大的3个元素:[(3, 'sleep'), (2, 'play'), (1, 'eat')]

链表指针等类型

如果是链表指针,需要靠指针指向元素的值来比较大小的话,可以用如下的方法.
方法一:直接加入链表的元素的值,这种方法有个缺点,就是完全抛弃了指针,如果想靠指针取得元素其他的性质就行不通了,比如员工按工资排序,然后想获得姓名, 当然了,也可以通过都加到元组里,不过很麻烦

class ListNode:
    def __init__(self, x):
        self.val = x
        self.next = None

heap = []
arr = [ListNode(1), ListNode(5), ListNode(2), ListNode(4),ListNode(8), ListNode(5), ListNode(3), ListNode(4)]
for i in range(len(arr)):
    if arr[i] :
        #先按链表的值排序,如果值相同就按插入先后顺序排序
        heapq.heappush(heap, (arr[i].val, i))
print(heap)
# [(1, 0), (4, 3), (2, 2), (4, 7), (8, 4), (5, 5), (3, 6), (5, 1)]

方法二: 直接动class 定义.

class ListNode:
    def __init__(self, x):
        self.val = x
        self.next = None
    def __repr__(self):
        return str(self.val)
    def __lt__(self, other):
        return self.val < other.val
heap = []
arr = [ListNode(1), ListNode(5), ListNode(2), ListNode(4),ListNode(8), ListNode(5), ListNode(3), ListNode(4)]
for ln in arr:
    heapq.heappush(heap,ln)
print(heap)
# [1, 4, 2, 4, 8, 5, 3, 5]

3. PriorityQueue模块

该模块定义的优先级队列,其内部使用了 heapq 模块,所以它的时间复杂度和heapq是相同的。

当一个对象的所有元素都是可比较的时,默认情况下是根据队列中的对象的第一个元素进行排序,越小的优先级越高,排在越前面。当第一个元素相同时,依次比较后续的元素的大小来进行排序。

由于 PriorityQueue 是继承自 Queue 类,所以很多函数的用法可以直接参照于 Queue 类中的函数。

from queue import PriorityQueue as PQ
pq = PQ()
pq.put((1, 'a'))
pq.put((2, 'c'))
pq.put((2, 'b'))
pq.put((2, 'b'))
print(pq.queue) # [(1, 'a'), (2, 'b'), (2, 'b'), (2, 'c')]
item0 = pq.get() 
print(item0) # (1, 'a')
print(pq.queue) # [(2, 'b'), (2, 'b'), (2, 'c')]

print(pq.qsize()) # 优先队列的尺寸 3

while not pq.empty():
    print(pq.get())
# (2, 'b')
# (2, 'b')
# (2, 'c')
print(pq.queue) # []

4. 实现自己的优先队列

在面向对象的编程过程中,我们通常是将一些单独的函数或变量组合成一个对象,然后在进行优先级排列。例如我们现在有很多种汽车,汽车有名字和价格,以及一些操作方法。当我们对汽车对象来按照价格进行优先级排列时,由于自定义的对象是不可比较的,所以在进行优先级排列时会报错。因此对于那些自定义的对象,我们需要重写优先级队列的方法来进行实现。

由于 PriorityQueue 也是基于 heapq 实现的,所以我们自定义的优先级队列可以直接基于 heapq 模块来实现。

import heapq

class My_PriorityQueue(object):
    def __init__(self):
        self._queue = []
        self._index = 0

    def push(self, item, priority):
        """
        队列由 (priority, index, item) 形式组成
        priority 增加 "-" 号是因为 heappush 默认是最小堆
        index 是为了当两个对象的优先级一致时,按照插入顺序排列
        """
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1

    def pop(self):
        """
        弹出优先级最高的对象
        """
        return heapq.heappop(self._queue)[-1]

    def qsize(self):
        return len(self._queue)

    def empty(self):
        return True if not self._queue else False

class Car(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def __repr__(self):
        return "{0} -- {1}".format(self.name, self.value)

if __name__ == "__main__":
    car1 = Car("BMW", 45)
    car2 = Car("Maybach", 145)
    car3 = Car("Bugatti", 85)
    car4 = Car("Cadillac", 78)
    car5 = Car("Maserati", 85)
    pq = My_PriorityQueue()
    pq.push(car1, car1.value)
    pq.push(car2, car2.value)
    pq.push(car3, car3.value)
    pq.push(car4, car4.value)
    pq.push(car5, car5.value)
    print("队列大小:{0}".format(pq.qsize()))
    # 弹出元素
    while not pq.empty():
        print(pq.pop())
        
# 队列大小:5
# Maybach -- 145
# Bugatti -- 85
# Maserati -- 85
# Cadillac -- 78
# BMW -- 45