Apache Beam - Streaming Join on Temporal Relation
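I want to use the Apache Beam Python SDK to build an event-driven trading system for backtesting. Many of the PTransform operations in this system are of the "temporal validity window" / "temporal join" type.
For example, take the worked example in Streaming Systems, the book by the Beam folks, concerning currency quotes and trades. A similar example appears in an earlier paper.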
Right Table: quotes.txt (Currency Pair, Price, Event Time, Processing Time)
USD/JPY,102.0000,12:00:00,12:04:13
EUR/JPY,114.0000,12:00:30,12:06:23
EUR/JPY,119.0000,12:06:00,12:07:33
EUR/JPY,116.0000,12:03:00,12:09:07
Left Table: orders.txt (Currency Pair, Quantity, Event Time, Processing Time)
USD/JPY,1,12:03:00,12:03:44
EUR/JPY,2,12:02:00,12:05:07
EUR/JPY,5,12:05:00,12:08:00
EUR/JPY,3,12:08:00,12:09:33
USD/JPY,5,12:10:00,12:10:59
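Assume both samples are stand-ins for unbounded collections (e.g., two Kafka topics keyed by currency pair). I have no idea how to use the Apache Beam API to left-join these two (potentially unbounded) collections to produce the following output.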
Output Table With Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,000.0000,12:02:00,False,12:05:07
EUR/JPY,000.0000,12:02:00,True,12:06:23
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,570.0000,12:05:00,False,12:08:00
EUR/JPY,570.0000,12:05:00,True,12:09:07
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
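To pin down the semantics this table encodes, here is a framework-free sketch in plain Python (not Beam API) that replays the rows of quotes.txt and orders.txt in processing-time order. For each order it looks up the latest quote whose event time is at or before the order's event time, using 0 when there is none (the left-join case); whenever a late quote changes that lookup, it emits a retraction of the old value followed by the corrected one. The names `temporal_left_join` and `price_as_of` are hypothetical, and prices are floats instead of `Decimal` for brevity.

```python
import bisect
from collections import defaultdict

# Rows copied from quotes.txt and orders.txt: (pair, price or quantity, event time, processing time).
QUOTES = [
    ("USD/JPY", 102.0, "12:00:00", "12:04:13"),
    ("EUR/JPY", 114.0, "12:00:30", "12:06:23"),
    ("EUR/JPY", 119.0, "12:06:00", "12:07:33"),
    ("EUR/JPY", 116.0, "12:03:00", "12:09:07"),
]
ORDERS = [
    ("USD/JPY", 1, "12:03:00", "12:03:44"),
    ("EUR/JPY", 2, "12:02:00", "12:05:07"),
    ("EUR/JPY", 5, "12:05:00", "12:08:00"),
    ("EUR/JPY", 3, "12:08:00", "12:09:33"),
    ("USD/JPY", 5, "12:10:00", "12:10:59"),
]


def price_as_of(quotes, event_time):
    """Price of the latest quote with event time <= event_time, or 0.0 if none (left join)."""
    times = [t for t, _ in quotes]  # zero-padded HH:MM:SS strings sort chronologically
    i = bisect.bisect_right(times, event_time)
    return quotes[i - 1][1] if i else 0.0


def temporal_left_join(quotes, orders):
    """Replay both inputs by processing time and emit (pair, total, order time, retraction?, proc time)."""
    arrivals = [(q[3], "quote", q) for q in quotes] + [(o[3], "order", o) for o in orders]
    arrivals.sort(key=lambda a: a[0])
    known_quotes = defaultdict(list)  # pair -> [(event_time, price)], kept sorted by event time
    known_orders = defaultdict(list)  # pair -> [[event_time, quantity, last_emitted_total]]
    out = []
    for proc_time, kind, (pair, value, event_time, _) in arrivals:
        if kind == "order":
            total = price_as_of(known_quotes[pair], event_time) * value
            known_orders[pair].append([event_time, value, total])
            out.append((pair, total, event_time, False, proc_time))
        else:
            bisect.insort(known_quotes[pair], (event_time, value))
            # A (possibly late) quote can change the as-of price of orders already emitted.
            for order in known_orders[pair]:
                new_total = price_as_of(known_quotes[pair], order[0]) * order[1]
                if new_total != order[2]:
                    out.append((pair, order[2], order[0], True, proc_time))  # retract old value
                    out.append((pair, new_total, order[0], False, proc_time))
                    order[2] = new_total
    return out


for row in temporal_left_join(QUOTES, ORDERS):
    print(row)
```

On the sample data this reproduces every EUR/JPY row of the table above and the last USD/JPY row. For the USD/JPY order at 12:03:00 it instead emits 0 at 12:03:44 and corrects it to 102 at 12:04:13, because the only USD/JPY quote has processing time 12:04:13, later than the order's 12:03:44. In Beam terms, `known_quotes` and `known_orders` are roughly what a stateful DoFn keyed by currency pair would have to keep in per-key state.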
"Final" Output Table Without Retractions: trades.txt (Currency Pair, Price*Quantity, Order Event Time, Retraction?, Trade Processing Time)
USD/JPY,102.0000,12:03:00,False,12:03:44
EUR/JPY,228.0000,12:02:00,False,12:06:23
EUR/JPY,580.0000,12:05:00,False,12:09:07
EUR/JPY,357.0000,12:08:00,False,12:09:33
USD/JPY,510.0000,12:10:00,False,12:10:59
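The "final" table is what a downstream consumer ends up with after applying the retractions: keyed by (currency pair, order event time), a retraction deletes the value previously emitted for that key, and any other row replaces it. A minimal plain-Python sketch of that materialization, using a hypothetical `apply_retractions` helper on the rows of the table with retractions, ordering the surviving rows by trade processing time:

```python
# Rows of the retraction table: (pair, price*quantity, order event time, retraction?, trade processing time).
STREAM = [
    ("USD/JPY", 102.0, "12:03:00", False, "12:03:44"),
    ("EUR/JPY", 0.0, "12:02:00", False, "12:05:07"),
    ("EUR/JPY", 0.0, "12:02:00", True, "12:06:23"),
    ("EUR/JPY", 228.0, "12:02:00", False, "12:06:23"),
    ("EUR/JPY", 570.0, "12:05:00", False, "12:08:00"),
    ("EUR/JPY", 570.0, "12:05:00", True, "12:09:07"),
    ("EUR/JPY", 580.0, "12:05:00", False, "12:09:07"),
    ("EUR/JPY", 357.0, "12:08:00", False, "12:09:33"),
    ("USD/JPY", 510.0, "12:10:00", False, "12:10:59"),
]


def apply_retractions(stream):
    """Materialize the current view of a retraction stream keyed by (pair, order event time)."""
    current = {}  # (pair, order_time) -> (value, trade_processing_time)
    for pair, value, order_time, is_retraction, proc_time in stream:
        key = (pair, order_time)
        if is_retraction:
            # A retraction must cancel exactly the value emitted earlier for this key.
            if current.get(key, (None, None))[0] != value:
                raise ValueError(f"retraction of {value} for {key} does not match current value")
            del current[key]
        else:
            current[key] = (value, proc_time)
    rows = [(pair, value, order_time, False, proc_time)
            for (pair, order_time), (value, proc_time) in current.items()]
    return sorted(rows, key=lambda row: row[4])  # order by trade processing time


for row in apply_retractions(STREAM):
    print(row)
```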
How can I implement the above PTransform using Windows, Triggers, and CoGroupByKey?
Current code: just some boilerplate with placeholders
"""Testing Apache Beam joins."""
import datetime
import decimal
import logging
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Schema and Transforms

class Quote(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    price: decimal.Decimal
    time_ms: int

class ConvertQuote(beam.DoFn):
    """Parses a 'PAIR,PRICE,EVENT_TIME,PROCESSING_TIME' line into a Quote."""

    def process(self, element):
        pair, price_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        price = decimal.Decimal(price_str)
        time_ms = int(self._time_ms(time_str))
        yield Quote(base_currency, quote_currency, price, time_ms)

    def _time_ms(self, time):
        # strptime fills in the date 1900-01-01, so the result is negative
        # relative to the Unix epoch.
        epoch = datetime.datetime(1970, 1, 1)  # naive epoch; avoids deprecated utcfromtimestamp
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000

class AddQuoteTimestamp(beam.DoFn):
    def process(self, element):
        # Beam interprets a numeric timestamp as seconds, so convert from milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)

class Order(typing.NamedTuple):
    base_currency: str
    quote_currency: str
    quantity: int
    time_ms: int

class ConvertOrder(beam.DoFn):
    """Parses a 'PAIR,QUANTITY,EVENT_TIME,PROCESSING_TIME' line into an Order."""

    def process(self, element):
        pair, quantity_str, time_str, _ = element.rstrip().split(",")
        base_currency, quote_currency = pair.split("/")
        quantity = int(quantity_str)
        time_ms = int(self._time_ms(time_str))
        yield Order(base_currency, quote_currency, quantity, time_ms)

    def _time_ms(self, time):
        # Same conversion as ConvertQuote._time_ms.
        epoch = datetime.datetime(1970, 1, 1)
        dt = datetime.datetime.strptime(time, "%H:%M:%S")
        return (dt - epoch).total_seconds() * 1000

class AddOrderTimestamp(beam.DoFn):
    def process(self, element):
        # Beam interprets a numeric timestamp as seconds, so convert from milliseconds.
        yield beam.window.TimestampedValue(element, element.time_ms / 1000)

PAIRS = ["EUR/JPY", "USD/JPY"]  # Maybe pass this in as an option?

def by_pair(item, num_pairs):
    # Partition function: index of the item's currency pair in PAIRS
    # (num_pairs is required by beam.Partition's signature but unused here).
    return PAIRS.index(f"{item.base_currency}/{item.quote_currency}")

# Administrative
LOGGING_MSG_FMT = "%(asctime)s - %(levelname)s: %(message)s"
LOGGING_DATE_FMT = "%Y-%m-%d %H:%M:%S%z"
logging.basicConfig(format=LOGGING_MSG_FMT, datefmt=LOGGING_DATE_FMT, level=logging.INFO)

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--quotes-file", dest="quotes_file", default="quotes.txt")
        parser.add_argument("--orders-file", dest="orders_file", default="orders.txt")
        parser.add_argument("--trades-file", dest="trades_file", default="trades")

options = PipelineOptions()
my_options = options.view_as(MyOptions)

# Main
with beam.Pipeline(options=my_options) as p:
    eurjpy_quotes, usdjpy_quotes = (
        p
        | "ReadQuotes" >> beam.io.ReadFromText(my_options.quotes_file)
        | "ConvertQuotes" >> beam.ParDo(ConvertQuote())
        | "AddQuoteTimestamps" >> beam.ParDo(AddQuoteTimestamp())
        | "PartitionQuotes" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    eurjpy_orders, usdjpy_orders = (
        p
        | "ReadOrders" >> beam.io.ReadFromText(my_options.orders_file)
        | "ConvertOrders" >> beam.ParDo(ConvertOrder())
        | "AddOrderTimestamps" >> beam.ParDo(AddOrderTimestamp())
        | "PartitionOrders" >> beam.Partition(by_pair, len(PAIRS))
        # Some kind of windowing/triggering?
    )
    # Something here using CoGroupByKey on eurjpy_quotes and eurjpy_orders
    # This is just a placeholder for now.
    eurjpy_quotes | "WriteEURJPYTrades" >> beam.io.WriteToText(my_options.trades_file)
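`beam.window.TimestampedValue` interprets a plain number as seconds since the Unix epoch, not milliseconds. A small, hypothetical helper that turns the HH:MM:SS columns into seconds after midnight (assuming every row belongs to the same, arbitrary UTC day) sidesteps both the millisecond scaling and the 1900-01-01 date that `strptime` fills in:

```python
import datetime


def hms_to_seconds(hms: str) -> int:
    """Seconds after midnight for an 'HH:MM:SS' string (hypothetical helper).

    Assumes all timestamps fall on the same, arbitrary day, so the date part
    that strptime fills in (1900-01-01) is simply discarded.
    """
    t = datetime.datetime.strptime(hms, "%H:%M:%S").time()
    return t.hour * 3600 + t.minute * 60 + t.second


print(hms_to_seconds("12:03:00"))
```

The returned integer could be passed directly as the second argument of `beam.window.TimestampedValue`.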
For time-series processing, it is usually best to use the State and Timers API.
Original Blog on State and Timers
State and Timers Documentation
There is also a current WIP temporal example in Java that deals with temporal joins.