一个高效的迭代器,用于获取列表的前 k 个最小值

an efficient iterator for getting the top k minimum of a list

我有一个包含许多未排序数字的列表,例如:

N=1000000
x = [random.randint(0,N) for i in range(N)]

我只想要前 k 个最小值,目前这是我的方法

def f1(x,k): # O(nlogn)
    return sorted(x)[:k]

这会执行很多冗余操作,因为我们还要对剩余的 N-k 个元素进行排序。枚举也不起作用:

def f2(x,k): # O(nlogn)
    y = []
    for idx,val in enumerate( sorted(x) ):
        if idx == k: break
        y.append(val)
    return y

验证枚举没有帮助:

if 1 : ## Time taken = 0.6364126205444336
    st1 = time.time()
    y = f1(x,3)
    et1 = time.time()
    print('Time taken = ', et1-st1)

if 1 : ## Time taken = 0.6330435276031494
    st2 = time.time()
    y = f2(x,3)
    et2 = time.time()
    print('Time taken = ', et2-st2)

可能我需要一个生成器连续 returns 列表的下一个最小值,并且由于获得下一个最小值应该是 O(1) 操作,函数 f3() 应该只是 O(k) 对吧? 在这种情况下,哪种 GENERATOR 函数最有效?

def f3(x,k): # O(k)
    y = []
    for idx,val in enumerate( GENERATOR ):
        if idx == k: break
        y.append(val)
    return y

编辑 1

The analysis shown here are wrong, please ignore and jump to Edit 3

可能的最低限度:就时间复杂度而言,我认为这是可实现的下限,但因为它将增加原始列表,所以它是 不是我的问题的解决方案。

def f3(x,k): # O(k) Time
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min) # This removes from the original list
        y.append(curr_min)
        idx += 1
    return y

if 1 : ## Time taken = 0.07096505165100098
    st3 = time.time()
    y = f3(x,3)
    et3 = time.time()
    print('Time taken = ', et3-st3)

O(N) 次 | O(N) 存储:迄今为止的最佳解决方案,但是它需要原始列表的副本,因此导致 O(N) 时间和存储,具有获得下一个最小值的迭代器,k 次,将是 O(1) 存储和 O(k) 时间。

def f3(x,k): # O(N) Time | O(N) Storage
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y

if 1 : ## Time taken = 0.0814204216003418
    st3 = time.time()
    y = f3(x,3)
    et3 = time.time()
    print('Time taken = ', et3-st3)

编辑 2

Thanks for pointing out my above mistakes, getting minimum of a list should be O(n), not O(1).

编辑 3

Here's a full script of analysis after using the recommended solution. Now this raised more questions

1) Constructing x as a heap using heapq.heappush is slower than using list.append x to a list, then to heapq.heapify it ?

2) heapq.nsmallest slows down if x is already a heap?

3) Current conclusion : don't heapq.heapify the current list, then use heapq.nsmallest.

import time, random, heapq
import numpy as np

class Timer:
    def __init__(self, description):
        self.description = description
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    def __exit__(self, *args):
        end = time.perf_counter()
        print(f"The time for '{self.description}' took: {end - self.start}.")


def f3(x,k):
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y

def f_sort(x, k):
    y = []
    for idx,val in enumerate( sorted(x) ):
        if idx == k: break
        y.append(val)
    return y

def f_heapify_pop(x, k):
    heapq.heapify(x)
    return [heapq.heappop(x) for _ in range(k)]
def f_heap_pop(x, k):
    return [heapq.heappop(x) for _ in range(k)]

def f_heap_nsmallest(x, k):
    return heapq.nsmallest(k, x)

def f_np_partition(x, k):
    return np.partition(x, k)[:k]

if True : ## Constructing list vs heap
    N=1000000
    # N= 500000
    x_main = [random.randint(0,N) for i in range(N)]
    with Timer('constructing list') as t:
        x=[]
        for curr_val in x_main:
            x.append(curr_val)
    with Timer('constructing heap') as t:
        x_heap=[]
        for curr_val in x_main:
            heapq.heappush(x_heap, curr_val)
    with Timer('heapify x from a list') as t:
        x_heapify=[]
        for curr_val in x_main:
            x_heapify.append(curr_val)
        heapq.heapify(x_heapify)
    with Timer('x list to numpy') as t:
        x_np = np.array(x)
    """
    N=1000000
        The time for 'constructing list' took: 0.2717265225946903.
        The time for 'constructing heap' took: 0.45691753178834915.
        The time for 'heapify x from a list' took: 0.4259336367249489.
        The time for 'x list to numpy' took: 0.14815033599734306. 
    """

if True : ## Performing experiments on list vs heap
    TRIALS = 10
    ## Experiments on x as list : 
    with Timer('f3') as t:
        for _ in range(TRIALS):
            y = f3(x.copy(), 30)
        print(y)
    with Timer('f_sort') as t:
        for _ in range(TRIALS):
            y = f_sort(x.copy(), 30)
        print(y)
    with Timer('f_np_partition on x') as t:
        for _ in range(TRIALS):
            y = f_np_partition(x.copy(), 30)
        print(y)
    ## Experiments on x as list, but converted to heap in place : 
    with Timer('f_heapify_pop on x') as t:
        for _ in range(TRIALS):
            y = f_heapify_pop(x.copy(), 30)
        print(y)
    with Timer('f_heap_nsmallest on x') as t:
        for _ in range(TRIALS):
            y = f_heap_nsmallest(x.copy(), 30)
        print(y)
    ## Experiments on x_heap as heap : 
    with Timer('f_heap_pop on x_heap') as t:
        for _ in range(TRIALS):
            y = f_heap_pop(x_heap.copy(), 30)
        print(y)
    with Timer('f_heap_nsmallest on x_heap') as t:
        for _ in range(TRIALS):
            y = f_heap_nsmallest(x_heap.copy(), 30)
        print(y)
    ## Experiments on x_np as numpy array : 
    with Timer('f_np_partition on x_np') as t:
        for _ in range(TRIALS):
            y = f_np_partition(x_np.copy(), 30)
        print(y)
    # 
    """
    Experiments on x as list : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f3' took: 10.180440502241254.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_sort' took: 9.054768254980445.
        [ 1  5  5  1  0  4  5  6  7  6  7  7 12 12 11 13 11 12 13 18 10 14 10 18 19 19 21 22 24 25]
        The time for 'f_np_partition on x' took: 1.2620676811784506.

    Experiments on x as list, but converted to heap in place : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heapify_pop on x' took: 0.8628390356898308.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_nsmallest on x' took: 0.5187360178679228.

    Experiments on x_heap as heap : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_pop on x_heap' took: 0.2054140530526638.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_nsmallest on x_heap' took: 0.6638103127479553.
        [ 1  5  5  1  0  4  5  6  7  6  7  7 12 12 11 13 11 12 13 18 10 14 10 18 19 19 21 22 24 25]
        The time for 'f_np_partition on x_np' took: 0.2107151597738266.
    """

这是一个经典问题,普遍接受的解决方案是一种称为 heap 的数据结构。下面我对每个算法 f3f_heap 做了 10 次试验。随着第二个参数 k 的值变大,两个性能之间的差异变得更大。对于 k = 3,我们的算法 f3 耗时 0.76 秒,算法 f_heap 耗时 0.54 秒。但是对于 k = 30,这些值分别变为 6.33 秒和 .54 秒。

import time, random, heapq

class Timer:
    def __init__(self, description):
        self.description = description

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        end = time.perf_counter()
        print(f"The time for {self.description} took: {end - self.start}.")


def f3(x,k): # O(N) Time | O(N) Storage
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y


def f_heap(x, k): # O(nlogn)
    # if you do not need to retain a heap and just need the k smallest, then:
    #return heapq.nsmallest(k, x)

    heapq.heapify(x)
    return [heapq.heappop(x) for _ in range(k)]



N=1000000
x = [random.randint(0,N) for i in range(N)]

TRIALS = 10

with Timer('f3') as t:
    for _ in range(TRIALS):
        y = f3(x.copy(), 30)
print(y)

print()

with Timer('f_heap') as t:
    for _ in range(TRIALS):
        y = f_heap(x.copy(), 30)
print(y)

打印:

The time for f3 took: 6.3301973.
[0, 1, 1, 7, 9, 11, 11, 13, 13, 14, 17, 18, 18, 18, 19, 20, 20, 21, 23, 24, 25, 25, 26, 27, 28, 28, 29, 30, 30, 31]

The time for f_heap took: 0.5372357999999995.
[0, 1, 1, 7, 9, 11, 11, 13, 13, 14, 17, 18, 18, 18, 19, 20, 20, 21, 23, 24, 25, 25, 26, 27, 28, 28, 29, 30, 30, 31]

A Python Demo

更新

按照@user2357112supportsMonica 的建议,使用 numpy.partition 选择最小的 k 确实非常快 如果 您已经在处理 numpy 数组。但是如果你从一个普通的列表开始,并且考虑到转换为 numpy 数组的时间只是为了使用 numpy.partition 方法,那么它比使用 hepaq 方法慢:

def f_np_partition(x, k):
    return sorted(np.partition(x, k)[:k])


with Timer('f_np_partition') as t:
    for _ in range(TRIALS):
        x_np = np.array(x)
        y = f_np_partition(x_np.copy(), 30) # don't really need to copy
print(y)

相对时间:

The time for f3 took: 7.2039111.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]

The time for f_heap took: 0.35521280000000033.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]

The time for f_np_partition took: 0.8379164999999995.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]