在带有 PySpark 的单个多核机器中使用大型查找 Table

Using Large Lookup Table in Single MultiCore Machine with PySpark

我有一个大型查找 table,它将整数作为键,将字符串列表作为值。我需要这个查找 table 来对我通过 spark 加载的数据进行一些过滤和转换。

import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")

sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)

在启动 pyspark 时,我什至使用了 --driver-memory 20g 选项。

我的机器有 500 GB 内存和 27 个内核。我首先在内存中加载一个名为 lookup_tbl 的字典,它有 17457954 行。

当我尝试 运行 以下代码时,超过 10 分钟我没有得到任何输出。等了这么久,我关闭了进程。我需要查找 table 功能。我什至尝试过使用 broadcast 功能。

sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
  x = x.split('\t')
  return transform(x)


def check_self(x):
  from_id = x[0]
  to_id = x[1]
  self_ = 1
  try:
    common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
  except KeyError:
    common_items = set()
  if len(common_items ) < 1:
    common_items = set("-")
    self_ = 0
  return (((from_id, to_id, k, self_) for k in common_items ))

pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")

这是 spark 的问题还是我 运行 设置不正确?此外,我尝试为执行程序和驱动程序内存设置各种值 (~20g),但没有任何改进。

据我了解,spark 在将其发送到所有本地进程之前首先尝试序列化该字典。有没有办法从公共位置使用这本词典?

首先,要访问广播变量,您必须使用其 value 属性:

# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)

为了避免广播,您可以在 mapPartitions:

中本地加载 lookup_tbl
def check_partition(iter):
   lookup_tbl = ...
   for x in iter:
       yield check_self

identity = lambda x: x
pair = (sc.textFile(...)
    .map(lambda x: clean_data(...)
    .mapPartitions(check_partition)
    .flatMap(identity))

如果 lookup_tbl 相对较大,它仍然可以 expensive.There 您可以通过多种方式处理此问题:

  1. 使用 SQLite 连接而不是局部变量。

    import sqlite3
    conn = sqlite3.connect('path/to/lookup.db')
    
    c.execute("SELECT key FROM lookup WHERE id = '%s'" % from_id)
    s1 = {x[0] for x in c.fetchall()}
    c.execute("SELECT key FROM lookup WHERE id = '%s'" % to_id)
    s2 = {x[0] for x in c.fetchall()}
    common_items = s1.intersection(s2)
    

    它很容易设置,如果数据被正确索引,应该足够快

  2. 使用单个数据库服务器进行查找。 MongoDB 应该可以正常工作,并且通过适当的内存映射可以显着减少总体内存占用量

  3. 使用join代替广播

    swap = lambda x: (x[1], x[0])
    
    def reshape1(record):
       (k1, (items, k2)) = record
       return (k2, (k1, items))
    
    def reshape2(record):
       (k1, (items1, (k2, items2))) = record
       return (k1, k2, set(items1) & set(items2))
    
    pairs = sc.textFile(...).map(lambda x: clean_data(...))
    
    n = ... # Number of partitions
    lookup_rdd = sc.parallelize(lookup_tbl.items()).partitionBy(n)
    
    lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)