Kafka 连接 属性 partition.duration.ms 和刷新大小之间的关系?
Kafka connect property relation between partition.duration.ms and flush size?
谁能解释一下下面配置中partition.duration.ms和flushsize的意义。
设置这些属性背后的想法是什么?
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"partition.duration.ms": "1000",
"topics.dir": "root_bucket",
"flush.size": "10",
"topics": "TEST_SRV",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"locale": "US",
"key.converter.schemas.enable": "true",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "events-dev-s3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_data.created_at"
分区持续时间决定了基于时间的分区程序创建新 "path.format" 的频率。在您的情况下,1 秒的分区持续时间没有意义,因为您已将分区程序设置为仅按小时进行分区。
然后刷新大小是任何给定文件中将存在多少 Kafka 记录的上限
这些值背后的想法取决于您的主题的吞吐量以及在您从 S3 读取记录而不是直接从 Kafka 读取记录之前您愿意容忍多少延迟。
请注意,您为每次 S3 扫描付费,因此更高的刷新率和更少的整体文件将有助于节省资金
a 1 second partition duration doesn't make sense because you've set the partitioner to only make hourly partitions.
分区程序未设置为仅按小时进行分区。
"path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH"
这会将 目录结构粒度 设置为 小时
"partition.duration.ms": "1000"
这会将连接器配置为为每个 'second' 数据输出一个文件(..每个输入分区)
文件将写入 'hourly' 目录,其中包含 'second' 生成文件的目录。
即每小时目录将包含该小时的所有数据(在本例中,所有 per-second 文件)
谁能解释一下下面配置中partition.duration.ms和flushsize的意义。 设置这些属性背后的想法是什么?
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"partition.duration.ms": "1000",
"topics.dir": "root_bucket",
"flush.size": "10",
"topics": "TEST_SRV",
"tasks.max": "1",
"s3.part.size": "5242880",
"timezone": "UTC",
"locale": "US",
"key.converter.schemas.enable": "true",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "events-dev-s3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH",
"timestamp.extractor": "RecordField",
"timestamp.field": "event_data.created_at"
分区持续时间决定了基于时间的分区程序创建新 "path.format" 的频率。在您的情况下,1 秒的分区持续时间没有意义,因为您已将分区程序设置为仅按小时进行分区。
然后刷新大小是任何给定文件中将存在多少 Kafka 记录的上限
这些值背后的想法取决于您的主题的吞吐量以及在您从 S3 读取记录而不是直接从 Kafka 读取记录之前您愿意容忍多少延迟。
请注意,您为每次 S3 扫描付费,因此更高的刷新率和更少的整体文件将有助于节省资金
a 1 second partition duration doesn't make sense because you've set the partitioner to only make hourly partitions.
分区程序未设置为仅按小时进行分区。
"path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH"
这会将 目录结构粒度 设置为 小时
"partition.duration.ms": "1000"
这会将连接器配置为为每个 'second' 数据输出一个文件(..每个输入分区)
文件将写入 'hourly' 目录,其中包含 'second' 生成文件的目录。
即每小时目录将包含该小时的所有数据(在本例中,所有 per-second 文件)