Optimizing Spark Structured Streaming performance with Kafka

The test application runs on two servers (4 cores + 16 GB RAM).

My idea is to read data from Kafka, process it with multiple threads, and then save it to Elasticsearch.

spark-submit --class com.yizhisec.bigdata.TrafficEs \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 512M \
  --executor-cores 2 \
  --conf spark.streaming.concurrentJobs=5 \
  --num-executors 5 \
  --supervise bigdata-1.0.jar

However, only one task is running across the executors.

Code

I already set numPartitions when reading the data:

spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", prop.getProperty("kafka.broker.list"))
                .option("kafka.ssl.truststore.location", prop.getProperty("kafka.jks.path"))
                .option("kafka.ssl.truststore.password", prop.getProperty("kafka.jks.passwd"))
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("startingOffsets", "earliest")
                .option("numPartitions", prop.getProperty("kafka.partition"))
                .option("subscribe", topic)
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(value AS STRING)");

Processing code

Dataset<Traffic> df = StreamSparkUtils.steamToTraffic(rawDf);
String[] appProtoFilter = properties.getProperty("appproto").split(",");

Dataset<TrafficNode> nodeDataset = df
        .filter(df.col("appproto").isin(appProtoFilter))
        .map(new MapFunction<Traffic, TrafficNode>() {
            @Override
            public TrafficNode call(Traffic traffic) throws Exception {
                TrafficNode n = new TrafficNode();
                n.setDestport(traffic.getDestport());
                n.setSrcip(traffic.getSrcip());
                n.setDestip(traffic.getDestip());
                n.setAppproto(traffic.getAppproto());
                n.setEndtime(traffic.getEnd_time());
                return n;
            }
        }, Encoders.bean(TrafficNode.class));

StreamingQuery query = null;
try {
    query = StreamSparkUtils.streamSinkEs(nodeDataset, "loh_traffic");
    query.awaitTermination();
} catch (IOException | StreamingQueryException e) {
    e.printStackTrace();
}

How can I optimize this?

Saving to ES

public static StreamingQuery streamSinkEs(Dataset<?> dataSet, String index) throws IOException {
        Properties properties = readProp();
        return dataSet.writeStream()
                .option("es.nodes", properties.getProperty("es.nodes"))
                .option("es.port", properties.getProperty("es.port"))
                .option("checkpointLocation", properties.getProperty("es.checkpoint"))
                .format("es")
                .start(index);
    }

steamToTraffic

    public static Dataset<Traffic> steamToTraffic(Dataset<Row> df) {
        if (df == null) {
            return null;
        }

        StructType trafficSchema = new StructType()
                .add("guid", DataTypes.LongType)
                ...
                .add("downsize", DataTypes.LongType);

        Dataset<Row> ds = df.select(functions.from_json(df.col("value").cast(DataTypes.StringType), trafficSchema).as("data")).select("data.*");
        return ds.as(ExpressionEncoder.javaBean(Traffic.class));
    }

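An application that reads from Kafka scales with the number of partitions in the topic. As long as your data is stored in only one partition, your application can fetch it with only one consumer.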

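In your case, it does not matter how many executors your Spark application has, because a single partition can be read by only one executor. This follows from the concept of a consumer group in Kafka. If you want to improve performance, you should increase the number of partitions in the Kafka topic.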
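To make the arithmetic concrete, here is a minimal sketch. It assumes the Kafka source's default behaviour of creating one Spark task per topic partition, and it assumes your topic currently has a single partition. The class and method names are hypothetical; the executor numbers are taken from the spark-submit command in the question.

```java
// Hypothetical helper, not part of Spark: estimates how many tasks can
// run at the same time when a micro-batch is read from Kafka.
class ParallelismEstimate {

    // Total task slots = number of executors * cores per executor.
    static int taskSlots(int numExecutors, int executorCores) {
        return numExecutors * executorCores;
    }

    // Assumption: one Spark task per Kafka partition (the source's default),
    // so the number of busy slots can never exceed the partition count.
    static int concurrentTasks(int kafkaPartitions, int numExecutors, int executorCores) {
        return Math.min(kafkaPartitions, taskSlots(numExecutors, executorCores));
    }

    public static void main(String[] args) {
        // Values from the question: --num-executors 5 --executor-cores 2
        System.out.println("task slots: " + taskSlots(5, 2));
        System.out.println("1 partition:   " + concurrentTasks(1, 5, 2) + " concurrent task(s)");
        System.out.println("10 partitions: " + concurrentTasks(10, 5, 2) + " concurrent task(s)");
    }
}
```

Under these assumptions, adding executors or cores changes nothing until the partition count goes up, which matches the single task you are seeing.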

Here is an excerpt from the Kafka documentation describing the interaction between consumer groups and partitions:

By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.
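The rule in that excerpt can be illustrated with a toy model. This is only a sketch: it hands partitions out round-robin, which is not Kafka's real assignor, and the class and consumer names are made up for illustration. It shows that each partition goes to exactly one consumer, and that consumers beyond the partition count stay idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a consumer group (not the real Kafka assignor):
// every partition is handed to exactly one consumer, round-robin.
class ConsumerGroupSketch {

    static Map<String, List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get("consumer-" + (p % consumers)).add(p);
        }
        return assignment;
    }

    // Counts the consumers that received at least one partition.
    static long activeConsumers(Map<String, List<Integer>> assignment) {
        return assignment.values().stream().filter(ps -> !ps.isEmpty()).count();
    }

    public static void main(String[] args) {
        // One partition, five consumers: only consumer-0 gets work.
        System.out.println(assign(1, 5));
        // Six partitions, five consumers: every consumer gets work.
        System.out.println(assign(6, 5));
    }
}
```

With one partition and five consumers, four consumers receive nothing, which is the situation described in the question.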