如何使用 Apache Flink 从 Cassandra 读取数据?

How to read from Cassandra using Apache Flink?

我的 flink 程序应该为每个输入记录执行 Cassandra 查找,并根据结果做一些进一步的处理。

但我目前无法从 Cassandra 读取数据。这是我到目前为止想出的代码片段。

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .build();
        }
    };

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        cassandraInputFormat.configure(null);
        cassandraInputFormat.open(null);
        Tuple2<String, String> out = new Tuple8<>();
        cassandraInputFormat.nextRecord(out);
        System.out.println(out);
    }

但问题是,每次查找需要将近 10 秒,换句话说,这个 for 循环需要 50 秒才能执行。

如何加快此操作?或者,还有其他方法可以在 Flink 中查找 Cassandra 吗?

我想出了一个解决方案,该解决方案在使用流数据查询 Cassandra 时速度相当快。对有同样问题的人有用。

首先,Cassandra 可以用最少的代码查询,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

但这样做的问题是,创建 Session 是一项非常耗时的操作,并且应该为每个键 space 完成一次。您创建 Session 一次并将其重新用于所有读取查询。

现在,由于 Session 不是 Java 可序列化的,它不能作为参数传递给 Flink 运算符,如 MapProcessFunction。有几种方法可以解决这个问题,您可以使用 RichFunction 并在其 Open 方法中对其进行初始化,或者使用 Singleton。我将使用第二种解决方案。

在我们创建 Session.

的地方创建一个单例 Class
public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();
    }

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;
    }

}

您以后可以使用此会话进行所有查询。这里我以ProcessFunction查询为例。

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;
    }

    @Override
    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;
    }
}

请注意,您可以将 ClusterBuilder 传递给 ProcessFunction,因为它是可序列化的。现在我们执行查询的 cassandraLookUp 方法。

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;
    }
}

只在第一次查询时创建单例对象运行,之后重复使用同一个对象,所以查找没有延迟。