如何在 Spark 中为 kmeans 映射 MongoDB 数据?
How to map MongoDB data in Spark for kmeans?
我想 运行 在 Spark 中对 MongoDB 提供的数据进行 k-means。
我有一个针对平面文件的工作示例:
sc = SparkContext(appName="KMeansExample") # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
这是平面文件的格式:
0 0 1
1 1 1
2 2 2
9 9 6
现在我想用 MongoDB 替换平面文件:
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
我想了解如何从 df 映射数据,以便它可以用作 kmeans 的输入。
数据库的"layout"为:
根
|-- _id: 字符串 (nullable = true)
|-- 字段 0:二进制(可为空 = 真)
|-- field1: binary (nullable = true)
|-- field2: binary (nullable = true)
|-- 字段 3:二进制(可为空 = 真)
|-- 字段 4:二进制(可为空 = 真)
|-- field5: binary (nullable = true)
|-- 字段 6:二进制(可为空 = 真)
|-- field7: binary (nullable = true)
|-- field8: binary (nullable = true)
|-- 字段 9:二进制(可为空 = 真)
I like to understand how to map data from the df so that it can be used as input for kmeans.
根据您的代码片段,我假设您使用的是 PySpark。
如果你查看 clustering.KMeans Python API 文档,你可以看到第一个参数需要是 RDD of Vector or convertible sequence types
执行以下代码后,使用 MongoDB Spark Connector
从 MongoDB 加载数据
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri","mongodb://127.0.0.1/ycsb.usertable")
.load()
您在 df
中拥有的是一个 DataFrame,因此我们需要将其转换为可转换为 Vector 类型的内容。
由于您在文本文件示例中使用了 numpy.array,我们可以继续使用此数组类型来学习转换。
根据提供的 layout
,首先我们需要删除 _id
列,因为聚类训练不需要它。另请参阅 Vector 数据类型以获取更多信息。
有了以上信息,我们就进入正题:
# Drop _id column and get RDD representation of the DataFrame
rowRDD = df.drop("_id").rdd
# Convert RDD of Row into RDD of numpy.array
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
# Feed into KMeans
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
如果您想保留布尔值 (True/False) 而不是整数 (1/0),则可以删除 int
部分。如下:
parsedRdd = rowRDD.map(lambda row: array([x for x in row]))
将所有这些放在一起:
from numpy import array
from pyspark.mllib.clustering import KMeans
import org.apache.spark.sql.SparkSession
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
rowRDD = df.drop("_id").rdd
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
clusters.clusterCenters
我想 运行 在 Spark 中对 MongoDB 提供的数据进行 k-means。 我有一个针对平面文件的工作示例:
sc = SparkContext(appName="KMeansExample") # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
这是平面文件的格式:
0 0 1
1 1 1
2 2 2
9 9 6
现在我想用 MongoDB 替换平面文件:
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()
# <<<< Here I am missing the parsing >>>>>
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
我想了解如何从 df 映射数据,以便它可以用作 kmeans 的输入。
数据库的"layout"为:
根
|-- _id: 字符串 (nullable = true)
|-- 字段 0:二进制(可为空 = 真)
|-- field1: binary (nullable = true)
|-- field2: binary (nullable = true)
|-- 字段 3:二进制(可为空 = 真)
|-- 字段 4:二进制(可为空 = 真)
|-- field5: binary (nullable = true)
|-- 字段 6:二进制(可为空 = 真)
|-- field7: binary (nullable = true)
|-- field8: binary (nullable = true)
|-- 字段 9:二进制(可为空 = 真)
I like to understand how to map data from the df so that it can be used as input for kmeans.
根据您的代码片段,我假设您使用的是 PySpark。
如果你查看 clustering.KMeans Python API 文档,你可以看到第一个参数需要是 RDD of Vector or convertible sequence types
执行以下代码后,使用 MongoDB Spark Connector
从 MongoDB 加载数据df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("uri","mongodb://127.0.0.1/ycsb.usertable")
.load()
您在 df
中拥有的是一个 DataFrame,因此我们需要将其转换为可转换为 Vector 类型的内容。
由于您在文本文件示例中使用了 numpy.array,我们可以继续使用此数组类型来学习转换。
根据提供的 layout
,首先我们需要删除 _id
列,因为聚类训练不需要它。另请参阅 Vector 数据类型以获取更多信息。
有了以上信息,我们就进入正题:
# Drop _id column and get RDD representation of the DataFrame
rowRDD = df.drop("_id").rdd
# Convert RDD of Row into RDD of numpy.array
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
# Feed into KMeans
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
如果您想保留布尔值 (True/False) 而不是整数 (1/0),则可以删除 int
部分。如下:
parsedRdd = rowRDD.map(lambda row: array([x for x in row]))
将所有这些放在一起:
from numpy import array
from pyspark.mllib.clustering import KMeans
import org.apache.spark.sql.SparkSession
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
rowRDD = df.drop("_id").rdd
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
clusters.clusterCenters