如何使用 Apache Flink 从 Cassandra 读取数据?
How to read from Cassandra using Apache Flink?
我的 flink 程序应该为每个输入记录执行 Cassandra 查找,并根据结果做一些进一步的处理。
但我目前无法从 Cassandra 读取数据。这是我到目前为止想出的代码片段。
ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.build();
}
};
for (int i=1; i<5; i++) {
CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<String, String> out = new Tuple8<>();
cassandraInputFormat.nextRecord(out);
System.out.println(out);
}
但问题是,每次查找需要将近 10 秒,换句话说,这个 for
循环需要 50 秒才能执行。
如何加快此操作?或者,还有其他方法可以在 Flink 中查找 Cassandra 吗?
我想出了一个解决方案,该解决方案在使用流数据查询 Cassandra 时速度相当快。对有同样问题的人有用。
首先,Cassandra 可以用最少的代码查询,
Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");
但这样做的问题是,创建 Session
是一项非常耗时的操作,并且应该为每个键 space 完成一次。您创建 Session
一次并将其重新用于所有读取查询。
现在,由于 Session
不是 Java 可序列化的,它不能作为参数传递给 Flink 运算符,如 Map
或 ProcessFunction
。有几种方法可以解决这个问题,您可以使用 RichFunction 并在其 Open
方法中对其进行初始化,或者使用 Singleton。我将使用第二种解决方案。
在我们创建 Session
.
的地方创建一个单例 Class
public class CassandraSessionSingleton {
private static CassandraSessionSingleton cassandraSessionSingleton = null;
public Session session;
private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
Cluster cluster = clusterBuilder.getCluster();
session = cluster.connect();
}
public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
if (cassandraSessionSingleton == null)
cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
return cassandraSessionSingleton;
}
}
您以后可以使用此会话进行所有查询。这里我以ProcessFunction
查询为例。
public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
ClusterBuilder secureCassandraSinkClusterBuilder;
// Constructor
public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
}
@Override
public void ProcessElement (Object obj) throws Exception {
ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
return resultSet;
}
}
请注意,您可以将 ClusterBuilder
传递给 ProcessFunction
,因为它是可序列化的。现在我们执行查询的 cassandraLookUp
方法。
public class CassandraLookUp {
public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
Session session = cassandraSessionSingleton.session;
ResultSet resultSet = session.execute(query);
return resultSet;
}
}
只在第一次查询时创建单例对象运行,之后重复使用同一个对象,所以查找没有延迟。
我的 flink 程序应该为每个输入记录执行 Cassandra 查找,并根据结果做一些进一步的处理。
但我目前无法从 Cassandra 读取数据。这是我到目前为止想出的代码片段。
ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
.withPort(props.getCassandraPort())
.withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
.build();
}
};
for (int i=1; i<5; i++) {
CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);
Tuple2<String, String> out = new Tuple8<>();
cassandraInputFormat.nextRecord(out);
System.out.println(out);
}
但问题是,每次查找需要将近 10 秒,换句话说,这个 for
循环需要 50 秒才能执行。
如何加快此操作?或者,还有其他方法可以在 Flink 中查找 Cassandra 吗?
我想出了一个解决方案,该解决方案在使用流数据查询 Cassandra 时速度相当快。对有同样问题的人有用。
首先,Cassandra 可以用最少的代码查询,
Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");
但这样做的问题是,创建 Session
是一项非常耗时的操作,并且应该为每个键 space 完成一次。您创建 Session
一次并将其重新用于所有读取查询。
现在,由于 Session
不是 Java 可序列化的,它不能作为参数传递给 Flink 运算符,如 Map
或 ProcessFunction
。有几种方法可以解决这个问题,您可以使用 RichFunction 并在其 Open
方法中对其进行初始化,或者使用 Singleton。我将使用第二种解决方案。
在我们创建 Session
.
public class CassandraSessionSingleton {
private static CassandraSessionSingleton cassandraSessionSingleton = null;
public Session session;
private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
Cluster cluster = clusterBuilder.getCluster();
session = cluster.connect();
}
public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
if (cassandraSessionSingleton == null)
cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
return cassandraSessionSingleton;
}
}
您以后可以使用此会话进行所有查询。这里我以ProcessFunction
查询为例。
public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
ClusterBuilder secureCassandraSinkClusterBuilder;
// Constructor
public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
}
@Override
public void ProcessElement (Object obj) throws Exception {
ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
return resultSet;
}
}
请注意,您可以将 ClusterBuilder
传递给 ProcessFunction
,因为它是可序列化的。现在我们执行查询的 cassandraLookUp
方法。
public class CassandraLookUp {
public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
Session session = cassandraSessionSingleton.session;
ResultSet resultSet = session.execute(query);
return resultSet;
}
}
只在第一次查询时创建单例对象运行,之后重复使用同一个对象,所以查找没有延迟。