如何在 Java 中将 String 映射到 Spark 中的 Seq<String>

How to map String to Seq<String> in Spark in Java

我想使用自己的分词器对存储为 Dataset<String> 的文本进行分词,并获取 Dataset<Seq<String>>(这样我就可以将其传递给 CountVectorizer)。

预期输入(/tmp/fulltext.txt):

t1 t2 t3
t4 t5

预期输出:

[t1, t2, t3]
[t4, t5]

我写的tokenizer是(基本上它现在做的事情和Spark附带的Tokenizer是一样的,但是我需要重写它以支持中文文本的分词,所以我不能使用官方 Tokenizer):

public class Utils {

  public static Seq<String> segment(String text) {
    String[] array = text.split(" ");
    List<String> tokens = new ArrayList<>();
    for (String term : array) {
      tokens.add(term.toLowerCase());
    }
    return JavaConverters
        .asScalaIteratorConverter(tokens.iterator())
        .asScala()
        .toSeq();
  }

}

我要制作的 Spark 应用程序是

public class TokenizeTest {

  public static void main(String[] args) {

    SparkSession spark = SparkSession
        .builder()
        .appName("Tokenize Test")
        .getOrCreate();

    Dataset<String> rawText = spark
        .read()
        .textFile("/tmp/fulltext.txt")
        .cache();

    Encoder<Seq> listEncoder = Encoders.bean(Seq.class);

    // Compilation error
    Dataset<Seq<String>> newText = rawText
        .map((MapFunction<String, Seq<String>>) s -> Utils.segment(s), listEncoder);

    newText.show();
    spark.stop();
  }
}

我是Spark的初学者,以上代码只是我认为可行的(阅读官方指南后)。但事实证明 TokenizeTest 的代码根本无法编译。你认为有办法解决它吗?

像这样使用 Scala 集合是行不通的。第一次 Seq 与 Bean 不兼容,第二次它是通用的。

如果要拆分,只需使用 segement 定义为的数组:

public class Utils {

  public static String[] segment(String text) {
    return text.split(" ");
  }

}

TokenizeTest定义为:

public class TokenizeTest {

  public static void main(String[] args) {

    SparkSession spark = SparkSession
        .builder()
        .appName("Tokenize Test")
        .getOrCreate();

    Dataset<String> rawText = spark
        .read()
        .textFile("/path/to/file")
        .cache();

    Encoder<String []> listEncoder = spark.implicits().newStringArrayEncoder();


    Dataset<String []> newText = rawText
        .map((MapFunction<String, String []>) s -> Utils.segment(s), listEncoder);

    newText.show();
    spark.stop();
  }
}

但实际上,您可能会考虑 org.apache.spark.sql.functions.splitorg.apache.spark.ml.feature.Tokenizer 而不是重新发明轮子。