How can I delete a row in Cassandra using Apache Flink?
In Apache Flink it is easy to insert a row into Cassandra via a CassandraSink, but I can't find a way to delete a row. I also tried writing a custom sink, but I get a NotSerializableException.
How should I structure the code for a delete operation?
public class MyCassandraSink implements SinkFunction&lt;String&gt; {
    private Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1")
            .build();
    private Session cassandra = cluster.connect("mykeyspace");

    @Override
    public void invoke(String value, Context context) throws Exception {
        cassandra.execute("SOME DELETE QUERY");
    }
}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [com.datastax.driver.core.SessionManager@3b0fe47a] is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
at com.meshkan.streaming.entry.EventListener.main(EventListener.java:42)
Caused by: java.io.NotSerializableException: com.datastax.driver.core.SessionManager
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.concurrent.CopyOnWriteArrayList.writeObject(CopyOnWriteArrayList.java:973)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 9 more
To implement your own insert and delete logic, create a sink that extends CassandraSinkBase and implement the send() method. See AbstractCassandraTupleSink for an example of how to do this. Note how CassandraSinkBase avoids serialization problems with the Cassandra client by making it transient and creating it in the open() call.
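The same transient-client pattern also works with a plain RichSinkFunction, without extending the connector's internal base classes. The sketch below is a minimal illustration, not the connector's own implementation; the keyspace, table, and column names (mykeyspace, users, id) are hypothetical, and it assumes the DataStax driver's Cluster/Session API used in the question. Because the driver objects are transient and built in open(), Flink's closure cleaner never tries to serialize them, which is exactly what caused the NotSerializableException above.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// A sink that deletes the row whose key arrives on the stream.
// The Cluster and Session are transient: they are never serialized with the
// job graph, but created per task in open() and released in close().
public class CassandraDeleteSink extends RichSinkFunction<String> {
    private transient Cluster cluster;
    private transient Session session;

    // Kept as a separate pure helper so the CQL text is easy to test.
    static String deleteCql(String keyspace, String table) {
        return "DELETE FROM " + keyspace + "." + table + " WHERE id = ?";
    }

    @Override
    public void open(Configuration parameters) {
        cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .build();
        session = cluster.connect("mykeyspace");
    }

    @Override
    public void invoke(String id, Context context) {
        // Bind the key as a parameter instead of concatenating it into CQL.
        session.execute(deleteCql("mykeyspace", "users"), id);
    }

    @Override
    public void close() {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
    }
}
```

Attaching it is then the usual `stream.addSink(new CassandraDeleteSink())`; since only the (empty) serializable shell of the function crosses the wire, the closure cleaner no longer complains.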
I found a solution, but I don't like it: CassandraPojoInputFormat can be used to delete and update rows. (I also use it for SELECT, which, as the name suggests, seems to be what it is actually intended for.)
IMHO, the fact that it works is its only redeeming virtue. I've been using it until I find an elegant solution. Still looking...
CassandraPojoInputFormat&lt;MyThingyConnector&gt; myThingyCassandraPojoInputFormat =
        new CassandraPojoInputFormat&lt;MyThingyConnector&gt;(
                "DELETE FROM " + dbKeyspace + ".<table_name> <where clause>",
                clusterBuilder,
                MyThingyConnector.class);
myThingyCassandraPojoInputFormat.configure(null);
myThingyCassandraPojoInputFormat.open(cassandraInputSplit);
myThingyCassandraPojoInputFormat.close();