如何生成具有随机内容和 N 行的 DataFrame?
How to generate a DataFrame with random content and N rows?
如何在 Scala 中创建具有 100 行和 3 列且随机整数值在 (1, 100) 范围内的 Spark DataFrame?
我知道如何手动创建 DataFrame,但我无法自动创建它:
val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3")
您可以简单地使用scala.util.Random
生成范围内的随机数并循环100行,最后使用createDataFrame
api
import scala.util.Random
val data = 1 to 100 map(x => (1+Random.nextInt(100), 1+Random.nextInt(100), 1+Random.nextInt(100)))
sqlContext.createDataFrame(data).toDF("col1", "col2", "col3").show(false)
给你,Seq.fill
是你的朋友:
def randomInt1to100 = scala.util.Random.nextInt(100)+1
val df = sc.parallelize(
Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
).toDF("col1", "col2", "col3")
您可以使用下面的通用代码
//no of rows required
val rows = 15
//no of columns required
val cols = 10
val spark = SparkSession.builder
.master("local[*]")
.appName("testApp")
.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
.getOrCreate()
import spark.implicits._
val columns = 1 to cols map (i => "col" + i)
// create the DataFrame schema with these columns (in that order)
val schema = StructType(columns.map(StructField(_, IntegerType)))
val lstrows = Seq.fill(rows * cols)(Random.nextInt(100) + 1).grouped(cols).toList.map { x => Row(x: _*) }
val rdd = spark.sparkContext.makeRDD(lstrows)
val df = spark.createDataFrame(rdd, schema)
在本地生成数据然后将其并行化是完全可以的,尤其是当您不必生成大量数据时。
但是,如果您需要生成庞大的数据集,您始终可以实现一个 RDD 来并行执行此操作,如下例所示。
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
def values: Iterator[A] = Iterator.fill(numValues)(random)
}
// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {
// Based on the item and executor count, determine how many values are
// computed in each executor. Distribute the rest evenly (if any).
private val valuesPerSlice = numValues / numSlices
private val slicesWithExtraItem = numValues % numSlices
// Just ask the partition for the data
override def compute(split: Partition, context: TaskContext): Iterator[A] =
split.asInstanceOf[RandomPartition[A]].values
// Generate the partitions so that the load is as evenly spread as possible
// e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
override protected def getPartitions: Array[Partition] =
((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
(slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray
}
一旦你有了这个,你就可以使用它传递你自己的 运行dom 数据生成器来获得 RDD[Int]
val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
* outputs:
* 30
* 86
* 75
* 20
* ...
*/
或 RDD[(Int, Int, Int)]
def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
* outputs:
* (33,22,15)
* (65,24,64)
* (41,81,44)
* (58,7,18)
* ...
*/
当然你也可以很容易地将它包裹在 DataFrame
中:
spark.createDataFrame(rdd).show()
/*
* outputs:
* +---+---+---+
* | _1| _2| _3|
* +---+---+---+
* |100| 48| 92|
* | 34| 40| 30|
* | 98| 63| 61|
* | 95| 17| 63|
* | 68| 31| 34|
* .............
*/
请注意,在这种情况下,每次对 RDD
/DataFrame
执行操作时生成的数据都是不同的。通过更改 RandomPartition
的实现以实际存储值而不是动态生成它们,您可以获得一组稳定的 运行dom 项目,同时仍然保留这种方法的灵活性和可扩展性。
无状态方法的一个好处 属性 是您甚至可以在本地生成庞大的数据集。几秒钟后在我的笔记本电脑上出现以下 运行:
new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647
如果您需要创建大量的随机数据,Spark 提供了一个名为 RandomRDDs 的对象,它可以生成填充了服从均匀、正态或各种其他分布的随机数的数据集。
https://spark.apache.org/docs/latest/mllib-statistics.html#random-data-generation
来自他们的例子:
import org.apache.spark.mllib.random.RandomRDDs._
// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following `N(1, 4)`.
val v = u.map(x => 1.0 + 2.0 * x)
如何在 Scala 中创建具有 100 行和 3 列且随机整数值在 (1, 100) 范围内的 Spark DataFrame?
我知道如何手动创建 DataFrame,但我无法自动创建它:
val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3")
您可以简单地使用scala.util.Random
生成范围内的随机数并循环100行,最后使用createDataFrame
api
import scala.util.Random
val data = 1 to 100 map(x => (1+Random.nextInt(100), 1+Random.nextInt(100), 1+Random.nextInt(100)))
sqlContext.createDataFrame(data).toDF("col1", "col2", "col3").show(false)
给你,Seq.fill
是你的朋友:
def randomInt1to100 = scala.util.Random.nextInt(100)+1
val df = sc.parallelize(
Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
).toDF("col1", "col2", "col3")
您可以使用下面的通用代码
//no of rows required
val rows = 15
//no of columns required
val cols = 10
val spark = SparkSession.builder
.master("local[*]")
.appName("testApp")
.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
.getOrCreate()
import spark.implicits._
val columns = 1 to cols map (i => "col" + i)
// create the DataFrame schema with these columns (in that order)
val schema = StructType(columns.map(StructField(_, IntegerType)))
val lstrows = Seq.fill(rows * cols)(Random.nextInt(100) + 1).grouped(cols).toList.map { x => Row(x: _*) }
val rdd = spark.sparkContext.makeRDD(lstrows)
val df = spark.createDataFrame(rdd, schema)
在本地生成数据然后将其并行化是完全可以的,尤其是当您不必生成大量数据时。
但是,如果您需要生成庞大的数据集,您始终可以实现一个 RDD 来并行执行此操作,如下例所示。
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
def values: Iterator[A] = Iterator.fill(numValues)(random)
}
// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {
// Based on the item and executor count, determine how many values are
// computed in each executor. Distribute the rest evenly (if any).
private val valuesPerSlice = numValues / numSlices
private val slicesWithExtraItem = numValues % numSlices
// Just ask the partition for the data
override def compute(split: Partition, context: TaskContext): Iterator[A] =
split.asInstanceOf[RandomPartition[A]].values
// Generate the partitions so that the load is as evenly spread as possible
// e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
override protected def getPartitions: Array[Partition] =
((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
(slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray
}
一旦你有了这个,你就可以使用它传递你自己的 运行dom 数据生成器来获得 RDD[Int]
val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
* outputs:
* 30
* 86
* 75
* 20
* ...
*/
或 RDD[(Int, Int, Int)]
def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
* outputs:
* (33,22,15)
* (65,24,64)
* (41,81,44)
* (58,7,18)
* ...
*/
当然你也可以很容易地将它包裹在 DataFrame
中:
spark.createDataFrame(rdd).show()
/*
* outputs:
* +---+---+---+
* | _1| _2| _3|
* +---+---+---+
* |100| 48| 92|
* | 34| 40| 30|
* | 98| 63| 61|
* | 95| 17| 63|
* | 68| 31| 34|
* .............
*/
请注意,在这种情况下,每次对 RDD
/DataFrame
执行操作时生成的数据都是不同的。通过更改 RandomPartition
的实现以实际存储值而不是动态生成它们,您可以获得一组稳定的 运行dom 项目,同时仍然保留这种方法的灵活性和可扩展性。
无状态方法的一个好处 属性 是您甚至可以在本地生成庞大的数据集。几秒钟后在我的笔记本电脑上出现以下 运行:
new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647
如果您需要创建大量的随机数据,Spark 提供了一个名为 RandomRDDs 的对象,它可以生成填充了服从均匀、正态或各种其他分布的随机数的数据集。
https://spark.apache.org/docs/latest/mllib-statistics.html#random-data-generation
来自他们的例子:
import org.apache.spark.mllib.random.RandomRDDs._
// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following `N(1, 4)`.
val v = u.map(x => 1.0 + 2.0 * x)