在带有 PySpark 的单个多核机器中使用大型查找 Table
Using Large Lookup Table in Single MultiCore Machine with PySpark
我有一个大型查找 table,它将整数作为键,将字符串列表作为值。我需要这个查找 table 来对我通过 spark 加载的数据进行一些过滤和转换。
import numpy as np
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")
sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)
在启动 pyspark 时,我什至使用了 --driver-memory 20g
选项。
我的机器有 500 GB 内存和 27 个内核。我首先在内存中加载一个名为 lookup_tbl
的字典,它有 17457954 行。
当我尝试 运行 以下代码时,超过 10 分钟我没有得到任何输出。等了这么久,我关闭了进程。我需要查找 table 功能。我什至尝试过使用 broadcast
功能。
sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
x = x.split('\t')
return transform(x)
def check_self(x):
from_id = x[0]
to_id = x[1]
self_ = 1
try:
common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
except KeyError:
common_items = set()
if len(common_items ) < 1:
common_items = set("-")
self_ = 0
return (((from_id, to_id, k, self_) for k in common_items ))
pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")
这是 spark 的问题还是我 运行 设置不正确?此外,我尝试为执行程序和驱动程序内存设置各种值 (~20g
),但没有任何改进。
据我了解,spark 在将其发送到所有本地进程之前首先尝试序列化该字典。有没有办法从公共位置使用这本词典?
首先,要访问广播变量,您必须使用其 value
属性:
# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)
为了避免广播,您可以在 mapPartitions
:
中本地加载 lookup_tbl
def check_partition(iter):
lookup_tbl = ...
for x in iter:
yield check_self
identity = lambda x: x
pair = (sc.textFile(...)
.map(lambda x: clean_data(...)
.mapPartitions(check_partition)
.flatMap(identity))
如果 lookup_tbl
相对较大,它仍然可以 expensive.There 您可以通过多种方式处理此问题:
使用 SQLite 连接而不是局部变量。
import sqlite3
conn = sqlite3.connect('path/to/lookup.db')
c.execute("SELECT key FROM lookup WHERE id = '%s'" % from_id)
s1 = {x[0] for x in c.fetchall()}
c.execute("SELECT key FROM lookup WHERE id = '%s'" % to_id)
s2 = {x[0] for x in c.fetchall()}
common_items = s1.intersection(s2)
它很容易设置,如果数据被正确索引,应该足够快
使用单个数据库服务器进行查找。 MongoDB 应该可以正常工作,并且通过适当的内存映射可以显着减少总体内存占用量
使用join
代替广播
swap = lambda x: (x[1], x[0])
def reshape1(record):
(k1, (items, k2)) = record
return (k2, (k1, items))
def reshape2(record):
(k1, (items1, (k2, items2))) = record
return (k1, k2, set(items1) & set(items2))
pairs = sc.textFile(...).map(lambda x: clean_data(...))
n = ... # Number of partitions
lookup_rdd = sc.parallelize(lookup_tbl.items()).partitionBy(n)
lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)
我有一个大型查找 table,它将整数作为键,将字符串列表作为值。我需要这个查找 table 来对我通过 spark 加载的数据进行一些过滤和转换。
import numpy as np
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")
sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)
在启动 pyspark 时,我什至使用了 --driver-memory 20g
选项。
我的机器有 500 GB 内存和 27 个内核。我首先在内存中加载一个名为 lookup_tbl
的字典,它有 17457954 行。
当我尝试 运行 以下代码时,超过 10 分钟我没有得到任何输出。等了这么久,我关闭了进程。我需要查找 table 功能。我什至尝试过使用 broadcast
功能。
sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
x = x.split('\t')
return transform(x)
def check_self(x):
from_id = x[0]
to_id = x[1]
self_ = 1
try:
common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
except KeyError:
common_items = set()
if len(common_items ) < 1:
common_items = set("-")
self_ = 0
return (((from_id, to_id, k, self_) for k in common_items ))
pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")
这是 spark 的问题还是我 运行 设置不正确?此外,我尝试为执行程序和驱动程序内存设置各种值 (~20g
),但没有任何改进。
据我了解,spark 在将其发送到所有本地进程之前首先尝试序列化该字典。有没有办法从公共位置使用这本词典?
首先,要访问广播变量,您必须使用其 value
属性:
# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)
为了避免广播,您可以在 mapPartitions
:
lookup_tbl
def check_partition(iter):
lookup_tbl = ...
for x in iter:
yield check_self
identity = lambda x: x
pair = (sc.textFile(...)
.map(lambda x: clean_data(...)
.mapPartitions(check_partition)
.flatMap(identity))
如果 lookup_tbl
相对较大,它仍然可以 expensive.There 您可以通过多种方式处理此问题:
使用 SQLite 连接而不是局部变量。
import sqlite3 conn = sqlite3.connect('path/to/lookup.db') c.execute("SELECT key FROM lookup WHERE id = '%s'" % from_id) s1 = {x[0] for x in c.fetchall()} c.execute("SELECT key FROM lookup WHERE id = '%s'" % to_id) s2 = {x[0] for x in c.fetchall()} common_items = s1.intersection(s2)
它很容易设置,如果数据被正确索引,应该足够快
使用单个数据库服务器进行查找。 MongoDB 应该可以正常工作,并且通过适当的内存映射可以显着减少总体内存占用量
使用
join
代替广播swap = lambda x: (x[1], x[0]) def reshape1(record): (k1, (items, k2)) = record return (k2, (k1, items)) def reshape2(record): (k1, (items1, (k2, items2))) = record return (k1, k2, set(items1) & set(items2)) pairs = sc.textFile(...).map(lambda x: clean_data(...)) n = ... # Number of partitions lookup_rdd = sc.parallelize(lookup_tbl.items()).partitionBy(n) lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)