Spark Java: How to move data from HTTP source to Couchbase sink?
I have a .gz file available on a web server that I want to consume in a streaming fashion and insert the data into Couchbase. The .gz archive contains a single file, which holds one JSON object per line.
Since Spark doesn't have an HTTP receiver, I wrote one myself (shown below). I'm using the Couchbase Spark connector to do the insertion. However, when running, the job doesn't actually insert anything. I suspect this is due to my inexperience with Spark and not knowing how to start and await termination. As you can see below, there are two places where such calls could be made.
The receiver:
public class HttpReceiver extends Receiver<String> {

    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        // Receive on a separate thread so onStart() returns immediately.
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            // Decompress the gzip response and read it line by line; each line is one JSON object.
            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json);
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {
    }
}
Data loading:
public void load(String url) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

    lines.foreachRDD(rdd ->
            sql.read().json(rdd)
                    .select(new Column("id"),
                            new Column("name"),
                            new Column("rating"),
                            new Column("review_count"),
                            new Column("hours"),
                            new Column("attributes"))
                    .writeStream()
                    .option("idField", "id")
                    .format("com.couchbase.spark.sql")
                    .start()
            // .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
    );

    // ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
The commented-out lines show my confusion about starting and terminating the job. Also, feel free to comment on the receiver if it has problems or could be improved.
Using Spark v2.1.0 with Java.
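For reference, this is the pattern I understand from the Spark Streaming docs, as a rough sketch rather than working code (it assumes the same sc, sql, and HttpReceiver as above and the couchbaseWriter helper from the connector's Java API; inside foreachRDD each micro-batch is a plain RDD, so the batch write() API applies rather than writeStream()):

JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

// 1. Define the whole streaming graph first.
lines.foreachRDD(rdd -> {
    Dataset<Row> docs = sql.read().json(rdd)
            .select("id", "name", "rating", "review_count", "hours", "attributes");
    couchbaseWriter(docs.write()
            .option("idField", "id")
            .format("com.couchbase.spark.sql"))
            .couchbase();
});

// 2. Only after the graph is defined, start receiving and block the driver.
ssc.start();
ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());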
Edit 1:
Also tried this implementation:
lines.foreachRDD(rdd ->
        couchbaseWriter(sql.read().json(rdd)
                .select(new Column("id"),
                        new Column("name"),
                        new Column("rating"),
                        new Column("review_count"),
                        new Column("hours"),
                        new Column("attributes"))
                .write()
                .option("idField", "id")
                .format("com.couchbase.spark.sql"))
                .couchbase()
);

ssc.start();
ssc.awaitTermination();
However, it throws IllegalStateException: SparkContext has been shutdown:
11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
at org.apache.spark.rdd.RDD$$anonfun$fold.apply(RDD.scala:1088)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)
Edit 2:
It turns out the error from Edit 1 was caused by my @PostDestruct method, which closes the context. I'm using Spring, and the bean is supposed to be a singleton, but somehow Spark causes it to be destroyed before the job finishes. I've now removed the @PostDestruct and made some changes; the following seems to work, but with open questions (a sketch of the shutdown arrangement I moved to follows after the code):
public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                    .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                    .option("idField", "id")
                    .format(format))
                    .couchbase();
        } catch (Exception e) {
            // Time to time throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
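For completeness, this is roughly the shutdown arrangement I moved to after removing the @PostDestruct (the run method and its name are hypothetical, not my exact code): the SparkContext is only stopped after the streaming work has returned, rather than from a Spring destroy callback that may fire earlier.

public void run(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    try {
        load(dataDirURL, format);   // starts the StreamingContext and waits for the timeout
    } finally {
        sc.stop();                  // stop the SparkContext only once streaming has finished
    }
}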
Open questions:
- From time to time it throws AnalysisException: cannot resolve '`id`' given input columns: [];. Is this a problem with my receiver? (See the guard sketch after these questions.)
- When a document already exists, the task fails with the exception below. In my case I just want to overwrite the document if it exists, not blow up.
Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException
at com.couchbase.client.java.CouchbaseAsyncBucket.call(CouchbaseAsyncBucket.java:475)
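Regarding the first question, my current guess: when a batch interval produces no lines, the micro-batch RDD is empty, JSON schema inference finds no columns, and select("id", ...) then fails with exactly that message. A small guard inside foreachRDD should avoid it (a sketch only, not verified):

lines.foreachRDD(rdd -> {
    if (rdd.isEmpty()) {
        return;   // nothing arrived during this batch interval, so there is no schema to infer
    }
    Dataset<Row> select = sql.read().json(rdd)
            .select("id", "name", "rating", "review_count", "hours", "attributes");
    couchbaseWriter(select.write()
            .option("idField", "id")
            .format(format))
            .couchbase();
});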
Answering my own question, this is what I finally got working without any exceptions:
public void load(String dataDirURL, String format) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    ObjectMapper objectMapper = new ObjectMapper();

    lines.foreachRDD(rdd -> {
        JavaRDD<RawJsonDocument> docRdd = rdd
                .filter(content -> !isEmpty(content))
                .map(content -> {
                    String id = "";
                    String modifiedContent = "";
                    try {
                        ObjectNode node = objectMapper.readValue(content, ObjectNode.class);
                        if (node.has("id")) {
                            id = node.get("id").textValue();
                            modifiedContent = objectMapper.writeValueAsString(node.retain(ALLOWED_FIELDS));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        return RawJsonDocument.create(id, modifiedContent);
                    }
                })
                .filter(doc -> !isEmpty(doc.id()));

        couchbaseDocumentRDD(docRdd)
                .saveToCouchbase(UPSERT);
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}