Top.smallestPerKey() 不适用于排序时间戳
Top.smallestPerKey() not working for sorting timestamp
我有一个用例,其中 PCollection 包含键值对,键是用户 ID,值是用户与应用程序交互的时间戳。
出于 ETL 目的,我想创建一个包含键值对的 PCollection,其中键是用户 ID,值是用户首次与应用程序交互的时间戳。
我正在使用 Top.smallestPerKey() 转换来获取唯一用户 ID 和最早时间戳的 PCollection。
代码片段如下 -
PCollection<KV<String, Timestamp>> keyedUserAndTimestamp =
a.apply(ParDo.named("Getting minimum timestamp for a user.").of(
new DoFn<TableRow, KV<String, Long>>(){
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element().get("user_id").toString(),
Timestamp.valueOf(c.element().get("time_stamp").toString())));
}
}));
PCollection<KV<String, List<Timestamp>>> minTimestampPerUser =
keyedFromUserAndTimestamp.apply(Top.smallestPerKey(1));
这似乎对我不起作用。我收到以下错误 -
The method apply(PTransform<? super PCollection<KV<String,Long>>,OutputT>)
in the type PCollection<KV<String,Long>>
is not applicable for the arguments
(PTransform<
PCollection<KV<Object,Comparable<Comparable<V>>>>,
PCollection<KV<Object,List<Comparable<Comparable<V>>>>>>)
我是 Google 云数据流和 Java 的新手,所以我可能还遗漏了一些非常明显的东西。
有几个问题我希望社区能提供意见 -
- 这是找到时间戳最小值的正确方法吗?
- 我使用的转换是否正确?如果不是,这里的最佳做法是什么?
这是一个 Java 类型的错误,并非特定于 Dataflow。在这种情况下,我相信您被 Java 的类型推断限制所困扰,您必须向 Top.smallestPerKey
提供显式类型参数,特别是 Top.<String, Long>smallestPerKey(1)
.
既然你提到你是 Java 的新手,我会更详细地介绍。 Top.smallestPerKey
的签名是:
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
smallestPerKey(int count)
其中 K
和 V
是调用函数时确定的类型。 Java 未能推断出 K
和 V
应该适合您。在这种情况下,Java 回落到类型的上限:
K
没有上限,所以选择Object
.
V
的上限为 Comparable<V>
(尽管看起来 self-reference,但这是完全合理且常见的事情)所以 Java 选择 Comparable<V>
,即使 V
在出现错误消息的上下文中甚至不是有效类型。
我有一个用例,其中 PCollection 包含键值对,键是用户 ID,值是用户与应用程序交互的时间戳。
出于 ETL 目的,我想创建一个包含键值对的 PCollection,其中键是用户 ID,值是用户首次与应用程序交互的时间戳。
我正在使用 Top.smallestPerKey() 转换来获取唯一用户 ID 和最早时间戳的 PCollection。
代码片段如下 -
PCollection<KV<String, Timestamp>> keyedUserAndTimestamp =
a.apply(ParDo.named("Getting minimum timestamp for a user.").of(
new DoFn<TableRow, KV<String, Long>>(){
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element().get("user_id").toString(),
Timestamp.valueOf(c.element().get("time_stamp").toString())));
}
}));
PCollection<KV<String, List<Timestamp>>> minTimestampPerUser =
keyedFromUserAndTimestamp.apply(Top.smallestPerKey(1));
这似乎对我不起作用。我收到以下错误 -
The method apply(PTransform<? super PCollection<KV<String,Long>>,OutputT>)
in the type PCollection<KV<String,Long>>
is not applicable for the arguments
(PTransform<
PCollection<KV<Object,Comparable<Comparable<V>>>>,
PCollection<KV<Object,List<Comparable<Comparable<V>>>>>>)
我是 Google 云数据流和 Java 的新手,所以我可能还遗漏了一些非常明显的东西。
有几个问题我希望社区能提供意见 -
- 这是找到时间戳最小值的正确方法吗?
- 我使用的转换是否正确?如果不是,这里的最佳做法是什么?
这是一个 Java 类型的错误,并非特定于 Dataflow。在这种情况下,我相信您被 Java 的类型推断限制所困扰,您必须向 Top.smallestPerKey
提供显式类型参数,特别是 Top.<String, Long>smallestPerKey(1)
.
既然你提到你是 Java 的新手,我会更详细地介绍。 Top.smallestPerKey
的签名是:
public static <K, V extends Comparable<V>>
PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>>
smallestPerKey(int count)
其中 K
和 V
是调用函数时确定的类型。 Java 未能推断出 K
和 V
应该适合您。在这种情况下,Java 回落到类型的上限:
K
没有上限,所以选择Object
.V
的上限为Comparable<V>
(尽管看起来 self-reference,但这是完全合理且常见的事情)所以 Java 选择Comparable<V>
,即使V
在出现错误消息的上下文中甚至不是有效类型。