Attempting to broadcast an RDD or reference an RDD from an action or transformation

I want to broadcast a hashmap in Python and use it for lookups on the worker nodes.

class datatransform:

    # Constructor

    def __init__(self, lookupFileName, dataFileName):
        self.lookupFileName = lookupFileName
        self.dataFileName = dataFileName
        self.hamp = {}
        self.broadcastVar = None

    # Read lookup file from the filesystem and create a local hashmap
    # first and then create a broadcast variable.

    def create_dictionary(self):
        lookup_read = sc.textFile(self.lookupFileName)
        self.lookup_parsed = (lookup_read
            .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
        self.broadcastVar = sc.broadcast(self.lookup_parsed)

    # This function will map the given id to a new index using the broadcasted hashmap.

    def featurize(self):
        data_projected = sqlContext.sql("SELECT uid, prod_id FROM userprods ")
        data = data_projected.map(lambda p: [p.uid, p.prod_id])
        bcastmap = self.broadcastVar
        data_featurized = (data_projected
            .map(lambda p: [p.uid, bcastmap.value[p.prod_id]]))

datatransform = datatransform('/path/to/lookupfile', '/path/to/datafile')

datatransform.create_dictionary()

datatransform.featurize()

I get the following error message:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not

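If you want to broadcast a dictionary, you should collect it first. sc.broadcast needs a local Python object that lives on the driver, whereas an RDD is only a handle to distributed data and cannot be shipped to the workers. This means create_dictionary should look more or less like this: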

def create_dictionary(self):
    lookup_read = sc.textFile(self.lookupFileName)
    lookup_parsed = (lookup_read
        .map(lambda line: [line.split('\t')[0], line.split('\t')[1]]))
    self.broadcastVar = sc.broadcast(lookup_parsed.collectAsMap())
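For completeness, here is a minimal sketch of how the collected dictionary might then be used in the lookup step. The helper names (parse_lookup_line, build_broadcast_lookup, featurize) and the default argument are hypothetical and not part of the original code; sc is assumed to be an existing SparkContext, and rows is assumed to be an RDD of (uid, prod_id) pairs. In Spark 2.x and later a DataFrame has no .map, so it would need to be converted with .rdd first.

```python
def parse_lookup_line(line):
    """Split one tab-separated line into a (key, value) pair."""
    fields = line.split('\t')
    return fields[0], fields[1]


def build_broadcast_lookup(sc, lookup_path):
    """Collect the lookup file to the driver as a dict, then broadcast it.

    `sc` is assumed to be an existing SparkContext.
    """
    lookup = sc.textFile(lookup_path).map(parse_lookup_line).collectAsMap()
    # `lookup` is a plain Python dict here, so it is safe to broadcast.
    return sc.broadcast(lookup)


def featurize(rows, bcast_lookup, default=None):
    """Map each (uid, prod_id) pair to [uid, index] via the broadcast dict.

    `rows` is assumed to be an RDD of (uid, prod_id) pairs. Using .get()
    returns `default` instead of raising KeyError for product ids that
    are missing from the lookup file.
    """
    return rows.map(lambda p: [p[0], bcast_lookup.value.get(p[1], default)])
```

Only the Broadcast handle is captured by the lambda, and the dictionary is read through .value on the workers. This approach assumes the lookup table is small enough to fit in memory on the driver and on each executor; for a large table, a join between the two datasets is likely the better fit.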