Azure Blob 存储和 Azure 数据块之间的高效数据检索过程

Efficient data retrieval process between Azure Blob storage and Azure databricks

我正在设计一个新的数据格局,目前正在开发我的概念证明。在这里我使用以下架构: Azure 函数 --> Azure 事件中心 --> Azure Blob 存储 --> Azure 工厂 --> Azure databricks --> Azure SQL 服务器。

我目前正在思考的是如何优化“数据检索”以在 Azure Databricks 上提供我的 ETL 过程。

我正在处理通过前面的通道按分钟提交到 Azure blob 存储的事务性工厂数据。因此,我最终每天需要处理 86000 个文件。实际上,这是要处理的大量单独文件。目前,我使用以下代码来构建当前存在于 azure blob 存储中的文件名列表。接下来,我通过循环读取每个文件来检索它们。

我面临的问题是这个过程花费的时间。当然,我们在这里谈论的是需要读取的大量小文件。所以我不希望这个过程在几分钟内完成。

我知道升级 databricks 集群可能会解决问题,但我不确定只有这样才能解决问题,看看在这种情况下我需要传输的文件数量。我正在 运行 遵循 databricks 的代码。

# Define function to list content of mounted folder
def get_dir_content(ls_path):
  dir_paths = ""
  dir_paths = dbutils.fs.ls(ls_path)
  subdir_paths = [get_dir_content(p.path) for p in dir_paths if p.isDir() and p.path != ls_path]
  flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
  return list(map(lambda p: p.path, dir_paths)) + flat_subdir_paths
filenames = []
paths = 0

mount_point = "PATH"

paths = get_dir_content(mount_point)
for p in paths:
#   print(p)
  filenames.append(p)

avroFile = pd.DataFrame(filenames)
avroFileList = avroFile[(avroFile[0].str.contains('.avro')) & (avroFile[0].str.contains('dbfs:/mnt/PATH'))]
avro_result = []
# avro_file = pd.DataFrame()
avro_complete = pd.DataFrame()
for i in avroFileList[0]:
  avro_file = spark.read.format("avro").load(i)
  avro_result.append(avro_file)

最后,我正在为所有这些文件做一个联合,以创建它们的一个数据框。

# Schema definiëren op basis van 
avro_df = avro_result[0]

# Union all dataframe
for i in avro_result:
  avro_df = avro_df.union(i)

display(avro_df)

我想知道如何优化这个过程。按分钟输出的原因是,一旦我们有了分析报告架构(为此我们只需要一个日常流程),我们计划稍后构建一个“近乎实时的洞察力”。

有多种方法可以做到这一点,但我会这样做:

每当在您的 Azure 存储帐户中创建新的 Blob 时,使用 Azure Functions 触发您的 python 代码。这将删除代码的轮询部分,并在存储帐户上有可用文件后立即将数据发送到数据块

例如,对于近乎实时的报告,您可以在事件中心使用 Azure 流分析和 运行 查询并输出到 Power Bi。

与其列出文件,然后单独阅读它们,我建议您查看 Azure Databricks Autoloader。它可能会使用通知来查找上传到 blob 存储的新文件,而不是列出文件。

它也可以同时处理多个文件,而不是一个一个地读取它们并进行合并。

如果不需要连续处理数据,那么可以使用.trigger(once=True)来模拟数据的批量加载。