Apache Ignite + Spark Dataframes:客户端与服务器的疑问

Apache Ignite + Spark Dataframes: Client vs Server Doubts

我一直在尝试整合 ignite 和 spark。我的应用程序的目标是写入和读取 spark 数据帧 to/from 点燃。但是,我面临着更大数据集(> 200 000 000 行)的几个问题。

我在 YARN 上有一个 6 节点 Ignite 集群 运行。它有 160Gb 内存和 12 个内核。我正在尝试在 Ignite 缓存(分区 1 备份)中使用 spark(大约 20Gb 的原始文本数据)保存数据帧:

def main(args: Array[String]) {
    val ignite = setupIgnite

    closeAfter(ignite) { _ ⇒

      implicit val spark: SparkSession = SparkSession.builder
        .appName("Ignite Benchmark")
        .getOrCreate()

      val customer = readDF("csv", "|", Schemas.customerSchema, "hdfs://master.local:8020/apps/hive/warehouse/ssbplus100/customer")
      val part = readDF("csv", "|", Schemas.partSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/part")
      val supplier = readDF("csv", "|", Schemas.supplierSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/supplier")
      val dateDim = readDF("csv", "|", Schemas.dateDimSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/date_dim")
      val lineorder = readDF("csv", "|", Schemas.lineorderSchema, "hdfs:// master.local:8020/apps/hive/warehouse/ssbplus100/lineorder")

      writeDF(customer, "customer", List("custkey"), TEMPLATES.REPLICATED)
      writeDF(part, "part", List("partkey"), TEMPLATES.REPLICATED)
      writeDF(supplier, "supplier", List("suppkey"), TEMPLATES.REPLICATED)
      writeDF(dateDim, "date_dim", List("datekey"), TEMPLATES.REPLICATED)
      writeDF(lineorder.limit(200000000), "lineorder", List("orderkey, linenumber"), TEMPLATES.NO_BACKUP)

    }
  }

在某些时候,spark 应用程序检索到此错误:

    class org.apache.ignite.internal.mem.IgniteOutOfMemoryException: Out of memory in data region [name=default, initSize=256.0 MiB, maxSize=12.6 GiB, persistenceEnabled=false] Try the following:
  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)
  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)
  ^-- Enable eviction or expiration policies
        at org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl.allocatePage(PageMemoryNoStoreImpl.java:304)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.allocateDataPage(AbstractFreeList.java:463)
        at org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList.insertDataRow(AbstractFreeList.java:501)
        at org.apache.ignite.internal.processors.cache.persistence.RowStore.addRow(RowStore.java:97)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.createRow(IgniteCacheOffheapManagerImpl.java:1302)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4426)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry$UpdateClosure.call(GridCacheMapEntry.java:4371)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3083)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree$Invoke.access00(BPlusTree.java:2977)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1726)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1703)
        at org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1610)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1249)
        at org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:352)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.storeValue(GridCacheMapEntry.java:3602)
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.initialValue(GridCacheMapEntry.java:2774)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$IsolatedUpdater.receive(DataStreamerImpl.java:2125)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:140)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:400)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:305)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access[=11=]0(DataStreamProcessor.java:60)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.onMessage(DataStreamProcessor.java:90)
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1556)
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1184)
        at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:125)
        at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:1091)
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:511)
        at java.lang.Thread.run(Thread.java:748)

我认为问题在于在 spark 会话之前启动了 ignite 服务器,就像在官方的 ignite 示例中一样。该服务器开始缓存我正在写入 ignite 缓存的数据,并超过其默认区域大小最大值(12Gb,这与我为 yarn 集群定义的 20GB 不同)。但是,我不明白示例和文档如何告诉我们在 spark 上下文(和我假设的会话)之前创建一个 ignite 服务器。我知道如果没有这个,应用程序将在所有 spark 作业终止后挂起,但我不明白在 spark 应用程序上有一个服务器开始缓存数据的逻辑。我对这个概念很困惑,现在我已经将 spark 中的这个 ignite 实例设置为客户端。

这是一个奇怪的行为,因为我的所有点燃节点(YARN 上的运行)都为默认区域定义了 20GB(我更改并验证了它)。这表明错误必须来自在 Spark 上启动的 ignite 服务器(我认为它是驱动程序上的一个,每个工作人员一个),因为我没有更改 ignite-config.xml 中的默认区域大小spark 应用程序(如错误所示,默认为 12GB)。然而,这有意义吗? Spark 是否应该抛出这个错误作为它读取和写入数据 from/to 点燃的唯一目标? Spark 是否参与缓存任何数据,这是否意味着我应该在我的应用程序的 ignite-config.xml 中设置客户端模式,尽管官方示例没有使用客户端模式?

此致, 卡洛斯

首先,Spark-Ignite 连接器已经 connects in client mode

我假设您有足够的内存,但您可以按照 Capacity Planning 指南中的示例来确定。

但是,我认为问题在于您过于关注示例应用程序 (!)。该示例——为了自包含——包括一个服务器和一个 Spark 客户端。如果您已经有一个 Ignite 集群,您不需要在您的 Spark 客户端中启动服务器

这是一个来自真实应用程序的略微改动的示例(在 Java 中,抱歉):

    try (SparkSession spark = SparkSession
        .builder()
        .appName("AppName")
        .master(sparkMaster)
        .config("spark.executor.extraClassPath", igniteClassPath())
        .getOrCreate()) {

        // Get source DataFrame
        DataSet<Row> results = ....

        results.write()
            .outputMode("append")
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), igniteCfgFile)
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "Results")
            .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE(), true)
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "name")
            .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "backups=1")
            .write();
    }

我没有测试,但你应该明白了:你需要为 Ignite 配置文件提供一个 URL;它在后台创建客户端以连接到该服务器。