如何计算 Spark JavaRDD 中当前行和上一行之间的差异
How to calculate difference between current and previous row in Spark JavaRDD
我将 .log
文件解析为 JavaRDD,在对这个 JavaRDD 进行排序之后,现在我有了,例如 oldJavaRDD
:
2016-03-28 | 11:00 | X | object1 | region1
2016-03-28 | 11:01 | Y | object1 | region1
2016-03-28 | 11:05 | X | object1 | region1
2016-03-28 | 11:09 | X | object1 | region1
2016-03-28 | 11:00 | X | object2 | region1
2016-03-28 | 11:01 | Z | object2 | region1
如何获得 newJavaRDD
以将其保存到数据库?
新的 JavaRDD 结构必须是:
2016-03-28 | 9 | object1 | region1
2016-03-28 | 1 | object2 | region1
所以,我必须计算当前行和上一行之间的时间(在某些情况下也使用标志 X, Y, Z
来定义,是否向结果添加时间)并在更改 date, objectName
或 [= 后向 JavaRDD 添加新元素24=].
我可以使用这种类型的代码(map),但我认为这种方法不好而且不是最快的方法
JavaRDD<NewObject> newJavaRDD = oldJavaRDD.map { r ->
String datePrev[] = ...
if (datePrev != dateCurr ...) {
return newJavaRdd;
} else {
return null;
}
}
首先,您的代码示例从 创建 newJavaRDD
的转换中引用 newJavaRDD
- 这在几个不同的层面上是不可能的:
- 您不能在变量声明的右侧引用变量...
- 您不能在 RDD 的转换中使用 RDD(同一个或另一个 - 这无关紧要)- 转换中的任何内容都必须由 Spark 序列化,而 Spark 无法序列化自己的RDD(这毫无意义)
那么,你应该怎么做呢?
假设:
- 您的目的是为
date
+ object
+ region
的每个组合获取一条记录
- 每个这样的组合不应该有太多记录,所以
groupBy
这些字段作为键是安全的
您可以 groupBy
关键字段,然后 mapValues
获取第一条和最后一条记录之间的 "minute distnace" (传递给 mapValues
的函数可以更改为如果我没弄对,请包含您的确切逻辑)。我将使用 Joda Time 库进行时间计算:
public static void main(String[] args) {
// some setup code for this test:
JavaSparkContext sc = new JavaSparkContext("local", "test");
// input:
final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
// date time ? object region
new String[]{"2016-03-28", "11:00", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"},
new String[]{"2016-03-28", "11:05", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:09", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:00", "X", "object2", "region1"},
new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"}
));
// grouping by key:
final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() {
@Override
public String call(String[] record) throws Exception {
return record[0] + record[3] + record[4]; // date, object, region
}
});
// mapping each "value" (all record matching key) to result
final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
@Override
public String[] call(Iterable<String[]> records) throws Exception {
final Iterator<String[]> iterator = records.iterator();
String[] previousRecord = iterator.next();
int diffMinutes = 0;
for (String[] record : records) {
if (record[2].equals("X")) { // if I got your intention right...
final LocalDateTime prev = getLocalDateTime(previousRecord);
final LocalDateTime curr = getLocalDateTime(record);
diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes();
}
previousRecord = record;
}
return new String[]{
previousRecord[0],
Integer.toString(diffMinutes),
previousRecord[3],
previousRecord[4]
};
}
}).values();
// do whatever with "result"...
}
// extracts a Joda LocalDateTime from a "record"
static LocalDateTime getLocalDateTime(String[] record) {
return LocalDateTime.parse(record[0] + " " + record[1], formatter);
}
static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm");
P.S。在 Scala 中,这大约需要 8 行...:/
我将 .log
文件解析为 JavaRDD,在对这个 JavaRDD 进行排序之后,现在我有了,例如 oldJavaRDD
:
2016-03-28 | 11:00 | X | object1 | region1
2016-03-28 | 11:01 | Y | object1 | region1
2016-03-28 | 11:05 | X | object1 | region1
2016-03-28 | 11:09 | X | object1 | region1
2016-03-28 | 11:00 | X | object2 | region1
2016-03-28 | 11:01 | Z | object2 | region1
如何获得 newJavaRDD
以将其保存到数据库?
新的 JavaRDD 结构必须是:
2016-03-28 | 9 | object1 | region1
2016-03-28 | 1 | object2 | region1
所以,我必须计算当前行和上一行之间的时间(在某些情况下也使用标志 X, Y, Z
来定义,是否向结果添加时间)并在更改 date, objectName
或 [= 后向 JavaRDD 添加新元素24=].
我可以使用这种类型的代码(map),但我认为这种方法不好而且不是最快的方法
JavaRDD<NewObject> newJavaRDD = oldJavaRDD.map { r ->
String datePrev[] = ...
if (datePrev != dateCurr ...) {
return newJavaRdd;
} else {
return null;
}
}
首先,您的代码示例从 创建 newJavaRDD
的转换中引用 newJavaRDD
- 这在几个不同的层面上是不可能的:
- 您不能在变量声明的右侧引用变量...
- 您不能在 RDD 的转换中使用 RDD(同一个或另一个 - 这无关紧要)- 转换中的任何内容都必须由 Spark 序列化,而 Spark 无法序列化自己的RDD(这毫无意义)
那么,你应该怎么做呢?
假设:
- 您的目的是为
date
+object
+region
的每个组合获取一条记录
- 每个这样的组合不应该有太多记录,所以
groupBy
这些字段作为键是安全的
您可以 groupBy
关键字段,然后 mapValues
获取第一条和最后一条记录之间的 "minute distnace" (传递给 mapValues
的函数可以更改为如果我没弄对,请包含您的确切逻辑)。我将使用 Joda Time 库进行时间计算:
public static void main(String[] args) {
// some setup code for this test:
JavaSparkContext sc = new JavaSparkContext("local", "test");
// input:
final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
// date time ? object region
new String[]{"2016-03-28", "11:00", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"},
new String[]{"2016-03-28", "11:05", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:09", "X", "object1", "region1"},
new String[]{"2016-03-28", "11:00", "X", "object2", "region1"},
new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"}
));
// grouping by key:
final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() {
@Override
public String call(String[] record) throws Exception {
return record[0] + record[3] + record[4]; // date, object, region
}
});
// mapping each "value" (all record matching key) to result
final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
@Override
public String[] call(Iterable<String[]> records) throws Exception {
final Iterator<String[]> iterator = records.iterator();
String[] previousRecord = iterator.next();
int diffMinutes = 0;
for (String[] record : records) {
if (record[2].equals("X")) { // if I got your intention right...
final LocalDateTime prev = getLocalDateTime(previousRecord);
final LocalDateTime curr = getLocalDateTime(record);
diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes();
}
previousRecord = record;
}
return new String[]{
previousRecord[0],
Integer.toString(diffMinutes),
previousRecord[3],
previousRecord[4]
};
}
}).values();
// do whatever with "result"...
}
// extracts a Joda LocalDateTime from a "record"
static LocalDateTime getLocalDateTime(String[] record) {
return LocalDateTime.parse(record[0] + " " + record[1], formatter);
}
static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm");
P.S。在 Scala 中,这大约需要 8 行...:/