使用 Spark 流将 protobuf 保存在 Hbase/HDFS 中

Saving protobuf in Hbase/HDFS using Spark streaming

我希望使用 spark 流将 protobuf 消息存储在 Hbase/HDFS 中。我有以下两个问题

  1. 存储大量protobuf的有效方法是什么 消息以及检索消息以执行某些操作的有效方法 分析?例如,它们应该存储为 Strings/byte[] 在 Hbase 中还是应该存储在 HDFS 等的镶木地板文件中
  2. 一个protobuf的层级结构应该如何 消息被存储?我的意思是,嵌套元素是否应该展平 在存储之前退出,或者是否有任何机制可以按原样存储它们? 如果嵌套元素是集合或映射,它们应该是 展开并存储为多行?

Protobuf消息的示例结构如下所示

>     +--MsgNode-1
>       +--Attribute1 - String
>       +--Attribute2 - Int
>       +--MsgNode-2
>         +--Attribute1 - String
>         +--Attribute2 - Double
>         +--MsgNode-3 - List of MsgNode-3's
>           +--Attribute1 - Int

我计划使用 Spark 流式处理以字节形式收集 protobuf 消息并将它们存储在 Hbase/HDFS。

问题一:

What is the efficient way of storing huge number of protobuf messages and the efficient way of retrieving them to do some analytics? For example, should they be stored as Strings/byte[] in Hbase or Should they be stored in parquet files in HDFS etc.

我会推荐 - 将 Proto-buf 存储为 Parquet AVRO 文件(使用 AVRO 模式拆分为有意义的消息)。

这可以使用数据帧 api spark 1.5 及更高版本(PartiotionBySaveMode.Append

来实现

看到这个a-powerful-big-data-trio

如果存储为字符串或字节数组,则无法直接进行数据分析(查询原始数据)。

如果你使用的是cloudera,impala(支持parquet-avro)可以用来查询rawdata

问题二:

How should the hierarchical structure of a protobuf messages be stored? I mean, should the nested elements be flattened out before storage, or is there any mechanism to store them as is? If the nested elements are collections or maps should they be exploded and stored as multiple rows?

如果你从spark streaming中以原始格式存储数据,如果业务想要查询并知道他们收到了什么样的数据,你将如何查询(这种需求很常见)。

首先,你必须了解你的数据(即protobuf中不同消息之间的关系,以便你可以决定单行或多行)然后开发protobuf解析器来解析protobuf的消息结构。根据您的数据,将其转换为 avro 通用记录以另存为 parquet 文件。

提示:

protobuf 解析器可以根据您的要求以不同的方式开发。 其中一种通用方式如下例所示。

public SortedMap<String, Object> convertProtoMessageToMap(GeneratedMessage src) {

    final SortedMap<String, Object> finalMap = new TreeMap<String, Object>();
    final Map<FieldDescriptor, Object> fields = src.getAllFields();

    for (final Map.Entry<FieldDescriptor, Object> fieldPair : fields.entrySet()) {

        final FieldDescriptor desc = fieldPair.getKey();

        if (desc.isRepeated()) {
            final List<?> fieldList = (List<?>) fieldPair.getValue();
            if (fieldList.size() != 0) {
                final List<String> arrayListOfElements = new ArrayList<String>();
                for (final Object o : fieldList) {
                    arrayListOfElements.add(o.toString());
                }
                finalMap.put(desc.getName(), arrayListOfElements);
            }
        } else {

            finalMap.put(desc.getName(), fieldPair.getValue().toString());

        }

    }
    return finalMap;
}