如何使 Apache Spark mapPartition 正常工作?

How to make Apache Spark mapPartition work correctly?

我正在尝试根据每个分区做一些工作,我想 return 与输入相同的数据:

from urllib3 import HTTPConnectionPool

rdd = sc.parallelize(["peter", "john", "harris"])
def sendPartition(iterator):
    pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)

    for record in iterator:
        r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'})

    return iterator


rdd.mapPartitions(sendPartition).count()

我收到这个错误:

TypeError: 'NoneType' object is not iterable

PS:这只是我想要实现的目标的简化。我想为每个元素向 ElasticSearch 发出复杂的地理搜索请求(因此我不能使用 Spark Elasticsearch 连接器)。在这个地图分区之前,我有一个巨大的过滤器,地图等管道

PPS:我已经重新启动了我的 spark,现在我得到“0”作为输出,这比错误要好,但我预计它是“3”。

关于类型错误,它看起来无法使用问题中包含的代码进行重现。我的猜测是在某些时候 None 值已被传递给 RDD 构造函数或 return 从 sendPartition.

编辑

输出为空 RDD 的问题是您使用分区迭代器的方式造成的。 PySpark 使用 itertools.chain 将数据传递给 mapPartition,它的行为或多或少与 Scala Iterator.

相同
import itertools

iter = itertools.chain(range(10))
iter.next()
## 0

完成 for 循环后

for x in iter:
    x

你最终得到一个空 chain:

type(iter)
## itertools.chain

iter.nex()
## Traceback (most recent call last)
##     ...
## StopIteration:

虽然 StopIteration 作为正常迭代逻辑的一部分处理,但 return 没有数据。

有几种方法可以解决这个问题,其中最干净的方法是提取一个函数并使用列表理解

def make_request(record, pool):
    r = pool.request('GET', '/ajax/services/search/web',
        fields={'q': 'urllib3', 'v': '1.0'})
    return r.read() # Or any other data you need.

def sendPartition(iterator):
    pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
    return [make_request(record, pool) for record in iterator]

请注意,如果您想使用连接池,您必须在退出前读取数据mapPartitions。这意味着没有惰性评估(如生成器)。就我个人而言,我会考虑在分区内进行异步请求(例如 3.5 中的 async/await,其他地方的 RxPy)并在退出之前进行评估。