Partition by using ID, year and month in Glue

I am trying to partition by merchant_id, year and month, as you can see in the data sink.

The partitioning process that uses only merchant_id runs fine, since that column already exists in my data source.

But I don't have year and month columns. So I am trying to take created_at, split it, and add 'year' and 'month' columns to the same table, so that I can partition by (merchant_id, year, month).

Can someone help me? Here is the code in Glue:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "bills", transformation_ctx = "DataSource0")


df0 = DataSource0.toDF()
dataframe1 = df0.withColumn("filedate", df0.created_at)

dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")

def map_function(dynamicRecord):
    date = dynamicRecord["filedate"].split("-")[0][-8:]
    dynamicRecord["year"] = date[0:4]
    dynamicRecord["month"] = date[4:6]
    dynamicRecord["day"]= date[6:8]
    return dynamicRecord
    
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")

Transform2 = ApplyMapping.apply(frame = mapping3, mappings = [("op", "string", "bills_op", "string"), ("timestamp", "string", "bills_timestamp", "string"), ("id", "int", "bills_id", "int"), ("subscription_id", "int", "bills_subscription_id", "int"), ("customer_id", "int", "bills_customer_id", "int"), ("amount", "decimal", "bills_amount", "decimal"), ("created_at", "timestamp", "bills_created_at", "timestamp"), ("updated_at", "timestamp", "bills_updated_at", "timestamp"), ("status", "int", "bills_status", "int"), ("payment_method_id", "int", "bills_payment_method_id", "int"), ("due_at", "timestamp", "bills_due_at", "timestamp"), ("billing_at", "timestamp", "bills_billing_at", "timestamp"), ("installments", "int", "bills_installments", "int"), ("merchant_id", "int", "bills_merchant_id", "int"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "Transform2")

DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "clientes_ativos_enterprise", transformation_ctx = "DataSource1")

Transform0 = ApplyMapping.apply(frame = DataSource1, mappings = [("meta_id", "int", "filter_meta_id", "int"), ("meta_value", "string", "filter_meta_value", "string"), ("merc_id", "int", "filter_merc_id", "int")], transformation_ctx = "Transform0")

Transform1 = Join.apply(frame1 = Transform0, frame2 = Transform2, keys2 = ["bills_merchant_id"], keys1 = ["filter_merc_id"], transformation_ctx = "Transform1")

DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/", "compression": "gzip", "partitionKeys": ["bills_merchant_id","year","month"]}, transformation_ctx = "DataSink0")
job.commit()

Here is the full error message:

 Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o151.pyWriteDynamicFrame.
: org.apache.spark.sql.AnalysisException: 
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
         ;
    at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$validateSchema(DataSource.scala:733)
    at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:523)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame.apply(DataSink.scala:535)
    at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame.apply(DataSink.scala:522)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme.apply(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
    at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/bills_partition_filtered.py", line 71, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = 
{
    "path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/",
    "compression": "gzip",
    "partitionKeys": [
        "bills_merchant_id",
        "year",
        "month"
    ]
}
, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: '\nDatasource does not support writing empty or nested empty schemas.\nPlease make sure the data schema has at least one or more column(s).\n 

Thank you all!!

  1. The error trace says there is a problem with the dataframe schema. Before writing the data, you should take a look at df.printSchema() to check whether your schema is what you expect.

  2. You should cast the created_at column to date/datetime.

  3. Use the withColumn function to parse the year and month out of the created_at column instead of taking the values statically, which can cause inconsistencies later on (see the sketch at the end of this answer).

dynamicRecord["year"] = date[0:4]

This is not a good way to parse a date.

To apply #3, follow the answer here:
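As a rough sketch of points 1-3 (untested, and assuming created_at in the bills table is either a timestamp or a string that to_timestamp can parse; the created_at_ts helper column is just an illustrative name), you could derive the partition columns directly on the DataFrame and drop the Map.apply / filedate step entirely:

from pyspark.sql.functions import col, to_timestamp, year, month, dayofmonth, lpad

df0 = DataSource0.toDF()

# Make sure created_at is a real timestamp; if it is stored as a string,
# pass the matching format mask to to_timestamp (assumption about your data).
df0 = df0.withColumn("created_at_ts", to_timestamp(col("created_at")))

# Derive the partition columns with withColumn instead of slicing strings.
# lpad keeps month/day zero-padded ("01".."12") so the S3 folders sort correctly.
df1 = (df0
       .withColumn("year", year(col("created_at_ts")).cast("string"))
       .withColumn("month", lpad(month(col("created_at_ts")).cast("string"), 2, "0"))
       .withColumn("day", lpad(dayofmonth(col("created_at_ts")).cast("string"), 2, "0")))

dynamicframe2 = DynamicFrame.fromDF(df1, glueContext, "dynamicframe2")

# Point 1: inspect the schema before writing to confirm year/month/day are present.
dynamicframe2.printSchema()

The downstream ApplyMapping, Join and S3 sink with partitionKeys can stay as in the posted job, since year, month and day are already mapped there as strings.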