Attempting to broadcast an RDD or reference an RDD from an action or transformation
I want to broadcast a hashmap in Python so that I can use it for lookups on the worker nodes.
class datatransform:
    # Constructor
    def __init__(self, lookupFileName, dataFileName):
        self.lookupFileName = lookupFileName
        self.dataFileName = dataFileName
        self.hamp = {}
        self.broadcastVar = None

    # Read lookup file from the filesystem and create a local hashmap
    # first and then create a broadcast variable.
    def create_dictionary(self):
        lookup_read = sc.textFile(self.lookupFileName)
        self.lookup_parsed = (lookup_read
            .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
        self.broadcastVar = sc.broadcast(self.lookup_parsed)

    # This function will map the given id to a new index using the broadcasted hashmap.
    def featurize(self):
        data_projected = sqlContext.sql("SELECT uid, prod_id FROM userprods ")
        data = data_projected.map(lambda p: [p.uid, p.prod_id])
        bcastmap = self.broadcastVar
        data_featurized = (data_projected
            .map(lambda p: [p.uid, bcastmap.value[p.prod_id]]))

datatransform = datatransform('/path/to/lookupfile', '/path/to/datafile')
datatransform.create_dictionary()
datatransform.read_data()
I get the following error message:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not
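If you want to broadcast a dictionary, you have to collect it first. sc.broadcast ships a local object from the driver to the workers, but an RDD is only a driver-side handle to distributed data, so it cannot be serialized into a broadcast or referenced from code running on the workers. That means create_dictionary should look more or less like this: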
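def create_dictionary(self):
    lookup_read = sc.textFile(self.lookupFileName)
    lookup_parsed = (lookup_read
        .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
    # collectAsMap() returns an ordinary Python dict on the driver,
    # and that dict (not the RDD) is what gets broadcast
    self.broadcastVar = sc.broadcast(lookup_parsed.collectAsMap())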
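To make it concrete what the broadcast carries after the fix, here is a minimal local sketch that runs without Spark. It assumes the lookup file consists of tab-separated "key<TAB>value" lines and that prod_id values are strings. The helper names parse_lookup and featurize are hypothetical: parse_lookup builds the same plain dict that collectAsMap() would return on the driver, and featurize performs the per-row lookup the workers would do against bcastmap.value. Using dict.get with a default for unknown ids is an added choice, not part of the original code.

```python
# Local, Spark-free emulation of the corrected flow (illustration only).
# Assumption: lookup lines look like "key\tvalue" and prod_id is a string.


def parse_lookup(lines):
    """Build the plain dict that collectAsMap() would return on the driver."""
    pairs = (line.split('\t') for line in lines)
    # Same projection as the answer's map(): first column -> second column.
    return dict((fields[0], fields[1]) for fields in pairs)


def featurize(rows, lookup, default=None):
    """Replace each prod_id with its mapped index.

    Mirrors bcastmap.value[p.prod_id], but uses dict.get so a prod_id that
    is missing from the lookup file yields `default` instead of a KeyError.
    """
    return [[uid, lookup.get(prod_id, default)] for uid, prod_id in rows]


if __name__ == "__main__":
    lookup_lines = ["p1\t0", "p2\t1", "p3\t2"]
    rows = [("u1", "p2"), ("u2", "p3"), ("u3", "p9")]
    table = parse_lookup(lookup_lines)
    print(featurize(rows, table))  # [['u1', '1'], ['u2', '2'], ['u3', None]]
```

In the Spark version, the lambda inside featurize would call bcastmap.value.get(p.prod_id) the same way; the point is that the broadcast wraps an ordinary dict rather than an RDD. Note also that on Spark 2.x and later a DataFrame no longer has a map method, so the projection would go through data_projected.rdd.map(...).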