您如何处理数据块中具有长路径的 blob 存储中的许多文件?
How do you process many files from a blob storage with long paths in databricks?
我已经为 API 管理服务启用了日志记录,并且日志存储在存储帐户中。现在我正尝试在 Azure Databricks 工作区中处理它们,但我正在努力访问这些文件。
问题似乎是自动生成的虚拟文件夹结构如下所示:
/insights-logs-gatewaylogs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json
我已将 insights-logs-gatewaylogs
容器安装在 /mnt/diags
下,并且 dbutils.fs.ls('/mnt/diags')
正确列出了 resourceId=
文件夹,但 dbutils.fs.ls('/mnt/diags/resourceId=')
声明文件未找到
如果我沿着虚拟文件夹结构创建空标记 blob,我可以列出每个后续级别,但该策略显然会失败,因为路径的最后一部分是由 year/month/day/hour 动态组织的。
例如一个
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json")
此错误中的产量:
java.io.FileNotFoundException: File/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019 does not exist.
很明显,外卡已经找到了第一年的文件夹,但拒绝进一步向下。
我在 Azure 数据工厂中设置了一个复制作业,它成功地复制了同一 blob 存储帐户中的所有 json blob 并删除了 resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>
前缀(因此根文件夹以年份组件开头) 并且可以一直成功访问而无需创建空标记 blob。
所以问题似乎与大部分为空的长虚拟文件夹结构有关。
有没有其他方法可以处理数据块中的此类文件夹结构?
更新:我也尝试在安装时提供路径作为 source
的一部分,但这也无济于事
尝试直接从 blob 中读取,而不是通过装载
您需要为此设置访问密钥或 sas,但我假设您知道
SAS
spark.conf.set(
"fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net",
"<complete-query-string-of-sas-for-the-container>")
或访问密钥
spark.conf.set(
"fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
"<storage-account-access-key>")
然后
val df = spark.read.json("wasbs://<container>@<account-name>.blob.core.windows.net/<path>")
我想我可能已经找到了根本原因。应该早点尝试这个,但我提供了现有 blob 的确切路径,如下所示:
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019/m=08/d=20/h=06/m=00/PT1H.json")
我得到了一个更有意义的错误:
shaded.databricks.org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: Incorrect Blob type, please use the correct Blob type to access a blob on the server. Expected BLOCK_BLOB, actual APPEND_BLOB.
事实证明,开箱即用的日志记录会创建追加 blob(并且似乎没有办法更改它)并且从这张票证的外观来看,对追加 blob 的支持仍然是 WIP:https://issues.apache.org/jira/browse/HADOOP-13475
FileNotFoundException
可能是一个转移注意力的问题,这可能是由于在尝试扩展通配符并找到不受支持的 blob 类型时吞噬了内部异常。
更新
终于找到了合理的解决方法。我在我的工作区中安装了 azure-storage
Python 包(如果你在家使用 Scala,它已经安装好了)并自己加载了 blob。下面的大部分代码是添加 globbing 支持,如果您愿意只匹配路径前缀,则不需要它:
%python
import re
import json
from azure.storage.blob import AppendBlobService
abs = AppendBlobService(account_name='<account>', account_key="<access_key>")
base_path = 'resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>'
pattern = base_path + '/*/*/*/*/m=00/*.json'
filter = glob2re(pattern)
spark.sparkContext \
.parallelize([blob.name for blob in abs.list_blobs('insights-logs-gatewaylogs', prefix=base_path) if re.match(filter, blob.name)]) \
.map(lambda blob_name: abs.get_blob_to_bytes('insights-logs-gatewaylogs', blob_name).content.decode('utf-8').splitlines()) \
.flatMap(lambda lines: [json.loads(l) for l in lines]) \
.collect()
glob2re
由 :
提供
def glob2re(pat):
"""Translate a shell PATTERN to a regular expression.
There is no way to quote meta-characters.
"""
i, n = 0, len(pat)
res = ''
while i < n:
c = pat[i]
i = i+1
if c == '*':
#res = res + '.*'
res = res + '[^/]*'
elif c == '?':
#res = res + '.'
res = res + '[^/]'
elif c == '[':
j = i
if j < n and pat[j] == '!':
j = j+1
if j < n and pat[j] == ']':
j = j+1
while j < n and pat[j] != ']':
j = j+1
if j >= n:
res = res + '\['
else:
stuff = pat[i:j].replace('\','\\')
i = j+1
if stuff[0] == '!':
stuff = '^' + stuff[1:]
elif stuff[0] == '^':
stuff = '\' + stuff
res = '%s[%s]' % (res, stuff)
else:
res = res + re.escape(c)
return res + '\Z(?ms)'
不漂亮,但避免了数据的复制,可以包含在一个小实用程序中 class。
暂时不支持此操作。微软提供的技术不能相互工作是废话(例如 BYOML -> Log Analytics 和导出规则到存储帐户,然后从 Databricks 读取数据)。
有一个解决方法。您可以创建自己的自定义 class 并阅读。请查看在 BYOML git:
上读取 am-securityevent 数据的示例
我已经为 API 管理服务启用了日志记录,并且日志存储在存储帐户中。现在我正尝试在 Azure Databricks 工作区中处理它们,但我正在努力访问这些文件。
问题似乎是自动生成的虚拟文件夹结构如下所示:
/insights-logs-gatewaylogs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json
我已将 insights-logs-gatewaylogs
容器安装在 /mnt/diags
下,并且 dbutils.fs.ls('/mnt/diags')
正确列出了 resourceId=
文件夹,但 dbutils.fs.ls('/mnt/diags/resourceId=')
声明文件未找到
如果我沿着虚拟文件夹结构创建空标记 blob,我可以列出每个后续级别,但该策略显然会失败,因为路径的最后一部分是由 year/month/day/hour 动态组织的。
例如一个
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json")
此错误中的产量:
java.io.FileNotFoundException: File/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019 does not exist.
很明显,外卡已经找到了第一年的文件夹,但拒绝进一步向下。
我在 Azure 数据工厂中设置了一个复制作业,它成功地复制了同一 blob 存储帐户中的所有 json blob 并删除了 resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>
前缀(因此根文件夹以年份组件开头) 并且可以一直成功访问而无需创建空标记 blob。
所以问题似乎与大部分为空的长虚拟文件夹结构有关。
有没有其他方法可以处理数据块中的此类文件夹结构?
更新:我也尝试在安装时提供路径作为 source
的一部分,但这也无济于事
尝试直接从 blob 中读取,而不是通过装载
您需要为此设置访问密钥或 sas,但我假设您知道
SAS
spark.conf.set(
"fs.azure.sas.<container-name>.<storage-account-name>.blob.core.windows.net",
"<complete-query-string-of-sas-for-the-container>")
或访问密钥
spark.conf.set(
"fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
"<storage-account-access-key>")
然后
val df = spark.read.json("wasbs://<container>@<account-name>.blob.core.windows.net/<path>")
我想我可能已经找到了根本原因。应该早点尝试这个,但我提供了现有 blob 的确切路径,如下所示:
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019/m=08/d=20/h=06/m=00/PT1H.json")
我得到了一个更有意义的错误:
shaded.databricks.org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: Incorrect Blob type, please use the correct Blob type to access a blob on the server. Expected BLOCK_BLOB, actual APPEND_BLOB.
事实证明,开箱即用的日志记录会创建追加 blob(并且似乎没有办法更改它)并且从这张票证的外观来看,对追加 blob 的支持仍然是 WIP:https://issues.apache.org/jira/browse/HADOOP-13475
FileNotFoundException
可能是一个转移注意力的问题,这可能是由于在尝试扩展通配符并找到不受支持的 blob 类型时吞噬了内部异常。
更新
终于找到了合理的解决方法。我在我的工作区中安装了 azure-storage
Python 包(如果你在家使用 Scala,它已经安装好了)并自己加载了 blob。下面的大部分代码是添加 globbing 支持,如果您愿意只匹配路径前缀,则不需要它:
%python
import re
import json
from azure.storage.blob import AppendBlobService
abs = AppendBlobService(account_name='<account>', account_key="<access_key>")
base_path = 'resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>'
pattern = base_path + '/*/*/*/*/m=00/*.json'
filter = glob2re(pattern)
spark.sparkContext \
.parallelize([blob.name for blob in abs.list_blobs('insights-logs-gatewaylogs', prefix=base_path) if re.match(filter, blob.name)]) \
.map(lambda blob_name: abs.get_blob_to_bytes('insights-logs-gatewaylogs', blob_name).content.decode('utf-8').splitlines()) \
.flatMap(lambda lines: [json.loads(l) for l in lines]) \
.collect()
glob2re
由 :
def glob2re(pat):
"""Translate a shell PATTERN to a regular expression.
There is no way to quote meta-characters.
"""
i, n = 0, len(pat)
res = ''
while i < n:
c = pat[i]
i = i+1
if c == '*':
#res = res + '.*'
res = res + '[^/]*'
elif c == '?':
#res = res + '.'
res = res + '[^/]'
elif c == '[':
j = i
if j < n and pat[j] == '!':
j = j+1
if j < n and pat[j] == ']':
j = j+1
while j < n and pat[j] != ']':
j = j+1
if j >= n:
res = res + '\['
else:
stuff = pat[i:j].replace('\','\\')
i = j+1
if stuff[0] == '!':
stuff = '^' + stuff[1:]
elif stuff[0] == '^':
stuff = '\' + stuff
res = '%s[%s]' % (res, stuff)
else:
res = res + re.escape(c)
return res + '\Z(?ms)'
不漂亮,但避免了数据的复制,可以包含在一个小实用程序中 class。
暂时不支持此操作。微软提供的技术不能相互工作是废话(例如 BYOML -> Log Analytics 和导出规则到存储帐户,然后从 Databricks 读取数据)。
有一个解决方法。您可以创建自己的自定义 class 并阅读。请查看在 BYOML git:
上读取 am-securityevent 数据的示例