您如何处理数据块中具有长路径的 blob 存储中的许多文件?

How do you process many files from a blob storage with long paths in databricks?

我已经为 API 管理服务启用了日志记录,并且日志存储在存储帐户中。现在我正尝试在 Azure Databricks 工作区中处理它们,但我正在努力访问这些文件。

问题似乎是自动生成的虚拟文件夹结构如下所示:

/insights-logs-gatewaylogs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json

我已将 insights-logs-gatewaylogs 容器安装在 /mnt/diags 下,并且 dbutils.fs.ls('/mnt/diags') 正确列出了 resourceId= 文件夹,但 dbutils.fs.ls('/mnt/diags/resourceId=') 声明文件未找到

如果我沿着虚拟文件夹结构创建空标记 blob,我可以列出每个后续级别,但该策略显然会失败,因为路径的最后一部分是由 year/month/day/hour 动态组织的。

例如一个

spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json")

此错误中的产量:

java.io.FileNotFoundException: File/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019 does not exist.

很明显,外卡已经找到了第一年的文件夹,但拒绝进一步向下。

我在 Azure 数据工厂中设置了一个复制作业,它成功地复制了同一 blob 存储帐户中的所有 json blob 并删除了 resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service> 前缀(因此根文件夹以年份组件开头) 并且可以一直成功访问而无需创建空标记 blob。

所以问题似乎与大部分为空的长虚拟文件夹结构有关。

有没有其他方法可以处理数据块中的此类文件夹结构?

更新:我也尝试在安装时提供路径作为 source 的一部分,但这也无济于事

尝试直接从 blob 中读取,而不是通过装载

您需要为此设置访问密钥或 sas,但我假设您知道

SAS

spark.conf.set(
   "fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net",
   "<complete-query-string-of-sas-for-the-container>")

访问密钥

spark.conf.set(
   "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
   "<storage-account-access-key>")

然后

val df = spark.read.json("wasbs://<container>@<account-name>.blob.core.windows.net/<path>")

我想我可能已经找到了根本原因。应该早点尝试这个,但我提供了现有 blob 的确切路径,如下所示:

spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019/m=08/d=20/h=06/m=00/PT1H.json")

我得到了一个更有意义的错误:

shaded.databricks.org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: Incorrect Blob type, please use the correct Blob type to access a blob on the server. Expected BLOCK_BLOB, actual APPEND_BLOB.

事实证明,开箱即用的日志记录会创建追加 blob(并且似乎没有办法更改它)并且从这张票证的外观来看,对追加 blob 的支持仍然是 WIP:https://issues.apache.org/jira/browse/HADOOP-13475

FileNotFoundException 可能是一个转移注意力的问题,这可能是由于在尝试扩展通配符并找到不受支持的 blob 类型时吞噬了内部异常。

更新

终于找到了合理的解决方法。我在我的工作区中安装了 azure-storage Python 包(如果你在家使用 Scala,它已经安装好了)并自己加载了 blob。下面的大部分代码是添加 globbing 支持,如果您愿意只匹配路径前缀,则不需要它:

%python

import re
import json
from azure.storage.blob import AppendBlobService


abs = AppendBlobService(account_name='<account>', account_key="<access_key>")

base_path = 'resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>'
pattern = base_path + '/*/*/*/*/m=00/*.json'
filter = glob2re(pattern)

spark.sparkContext \
     .parallelize([blob.name for blob in abs.list_blobs('insights-logs-gatewaylogs', prefix=base_path) if re.match(filter, blob.name)]) \
     .map(lambda blob_name: abs.get_blob_to_bytes('insights-logs-gatewaylogs', blob_name).content.decode('utf-8').splitlines()) \
     .flatMap(lambda lines: [json.loads(l) for l in lines]) \
     .collect()

glob2re 由 :

提供
def glob2re(pat):
    """Translate a shell PATTERN to a regular expression.

    There is no way to quote meta-characters.
    """

    i, n = 0, len(pat)
    res = ''
    while i < n:
        c = pat[i]
        i = i+1
        if c == '*':
            #res = res + '.*'
            res = res + '[^/]*'
        elif c == '?':
            #res = res + '.'
            res = res + '[^/]'
        elif c == '[':
            j = i
            if j < n and pat[j] == '!':
                j = j+1
            if j < n and pat[j] == ']':
                j = j+1
            while j < n and pat[j] != ']':
                j = j+1
            if j >= n:
                res = res + '\['
            else:
                stuff = pat[i:j].replace('\','\\')
                i = j+1
                if stuff[0] == '!':
                    stuff = '^' + stuff[1:]
                elif stuff[0] == '^':
                    stuff = '\' + stuff
                res = '%s[%s]' % (res, stuff)
        else:
            res = res + re.escape(c)
    return res + '\Z(?ms)'

不漂亮,但避免了数据的复制,可以包含在一个小实用程序中 class。

暂时不支持此操作。微软提供的技术不能相互工作是废话(例如 BYOML -> Log Analytics 和导出规则到存储帐户,然后从 Databricks 读取数据)。

有一个解决方法。您可以创建自己的自定义 class 并阅读。请查看在 BYOML git:

上读取 am-securityevent 数据的示例

https://github.com/Azure/Azure-Sentinel-BYOML