如何使 Apache Spark mapPartition 正常工作?
How to make Apache Spark mapPartition work correctly?
我正在尝试根据每个分区做一些工作,我想 return 与输入相同的数据:
from urllib3 import HTTPConnectionPool
rdd = sc.parallelize(["peter", "john", "harris"])
def sendPartition(iterator):
pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
for record in iterator:
r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'})
return iterator
rdd.mapPartitions(sendPartition).count()
我收到这个错误:
TypeError: 'NoneType' object is not iterable
PS:这只是我想要实现的目标的简化。我想为每个元素向 ElasticSearch 发出复杂的地理搜索请求(因此我不能使用 Spark Elasticsearch 连接器)。在这个地图分区之前,我有一个巨大的过滤器,地图等管道
PPS:我已经重新启动了我的 spark,现在我得到“0”作为输出,这比错误要好,但我预计它是“3”。
关于类型错误,它看起来无法使用问题中包含的代码进行重现。我的猜测是在某些时候 None
值已被传递给 RDD
构造函数或 return 从 sendPartition
.
编辑
输出为空 RDD 的问题是您使用分区迭代器的方式造成的。 PySpark 使用 itertools.chain
将数据传递给 mapPartition
,它的行为或多或少与 Scala Iterator
.
相同
import itertools
iter = itertools.chain(range(10))
iter.next()
## 0
完成 for
循环后
for x in iter:
x
你最终得到一个空 chain
:
type(iter)
## itertools.chain
iter.nex()
## Traceback (most recent call last)
## ...
## StopIteration:
虽然 StopIteration
作为正常迭代逻辑的一部分处理,但 return 没有数据。
有几种方法可以解决这个问题,其中最干净的方法是提取一个函数并使用列表理解
def make_request(record, pool):
r = pool.request('GET', '/ajax/services/search/web',
fields={'q': 'urllib3', 'v': '1.0'})
return r.read() # Or any other data you need.
def sendPartition(iterator):
pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
return [make_request(record, pool) for record in iterator]
请注意,如果您想使用连接池,您必须在退出前读取数据mapPartitions
。这意味着没有惰性评估(如生成器)。就我个人而言,我会考虑在分区内进行异步请求(例如 3.5 中的 async/await
,其他地方的 RxPy)并在退出之前进行评估。
我正在尝试根据每个分区做一些工作,我想 return 与输入相同的数据:
from urllib3 import HTTPConnectionPool
rdd = sc.parallelize(["peter", "john", "harris"])
def sendPartition(iterator):
pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
for record in iterator:
r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'})
return iterator
rdd.mapPartitions(sendPartition).count()
我收到这个错误:
TypeError: 'NoneType' object is not iterable
PS:这只是我想要实现的目标的简化。我想为每个元素向 ElasticSearch 发出复杂的地理搜索请求(因此我不能使用 Spark Elasticsearch 连接器)。在这个地图分区之前,我有一个巨大的过滤器,地图等管道
PPS:我已经重新启动了我的 spark,现在我得到“0”作为输出,这比错误要好,但我预计它是“3”。
关于类型错误,它看起来无法使用问题中包含的代码进行重现。我的猜测是在某些时候 None
值已被传递给 RDD
构造函数或 return 从 sendPartition
.
输出为空 RDD 的问题是您使用分区迭代器的方式造成的。 PySpark 使用 itertools.chain
将数据传递给 mapPartition
,它的行为或多或少与 Scala Iterator
.
import itertools
iter = itertools.chain(range(10))
iter.next()
## 0
完成 for
循环后
for x in iter:
x
你最终得到一个空 chain
:
type(iter)
## itertools.chain
iter.nex()
## Traceback (most recent call last)
## ...
## StopIteration:
虽然 StopIteration
作为正常迭代逻辑的一部分处理,但 return 没有数据。
有几种方法可以解决这个问题,其中最干净的方法是提取一个函数并使用列表理解
def make_request(record, pool):
r = pool.request('GET', '/ajax/services/search/web',
fields={'q': 'urllib3', 'v': '1.0'})
return r.read() # Or any other data you need.
def sendPartition(iterator):
pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10)
return [make_request(record, pool) for record in iterator]
请注意,如果您想使用连接池,您必须在退出前读取数据mapPartitions
。这意味着没有惰性评估(如生成器)。就我个人而言,我会考虑在分区内进行异步请求(例如 3.5 中的 async/await
,其他地方的 RxPy)并在退出之前进行评估。