How to get an Avro-serialised message from an F# record?

I have the following in F#:

type HitType = {
  hitHostname                                   : string;
  hitMemoryUsed                                 : float;
  hitPage                                       : string;
}

I am trying to serialise it with Avro:

  let specificDatumWriter : SpecificDatumWriter<HitType> =
    SpecificDatumWriter(avroSchema)

  let getAvroMsg msg =
    let memStream =
      new MemoryStream(256)
    let encoder =
      BinaryEncoder(memStream)
    specificDatumWriter.Write(msg, encoder)
    encoder.Flush()
    memStream.ToArray()

When I try to get the Avro bytes like this:

let hit : HitType = { hitHostname = "abc"; hitMemoryUsed = 0.123; hitPage = "None" }
getAvroMsg hit

I get the following exception:

Unable to cast object of type 'TrckFsharp.HitType' to type 'Avro.Specific.ISpecificRecord'.: InvalidCastException
   at Avro.Specific.SpecificDatumWriter`1.WriteRecordFields(Object recordObj, RecordFieldWriter[] writers, Encoder encoder)
   at TrckFsharp.Avro.getAvroMsg(HitType msg) in project-dev/Avro.fs:line 40
   at TrckFsharp.Handler.trckGet(APIGatewayProxyRequest request) in project/Handler.fs:line 77
   at TrckFsharp.Handler.handler(APIGatewayProxyRequest request) in project/Handler.fs:line 137
   at lambda_method(Closure , Stream , Stream , LambdaContextInternal )

I am not sure why. Isn't this how you are supposed to produce a message with Avro?

Update 1:

Avro schema:

{
  "namespace": "com.lambdainsight.hit",
  "type": "record",
  "name": "Hit",
  "fields": [
     {"name": "hitHostname",                                     "type": "string" },
     {"name": "hitMemoryUsed",                                   "type": "float"  },
     {"name": "hitPage",                                         "type": "string" }
  ]
 }

I have uploaded everything to a repository:

https://github.com/LambdaInsight/avro-test/

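The Avro SDK expects your HitType to implement ISpecificRecord, which is unfortunate, because it would be nicer to use a record type. Judging by your stack trace, SpecificDatumWriter casts the datum to ISpecificRecord inside WriteRecordFields, and a plain F# record does not implement that interface, hence the InvalidCastException.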

For now, you can use this HitType:

open Avro           // Schema, AvroRuntimeException
open Avro.Specific  // ISpecificRecord

type HitType() =
    let schema = Schema.Parse("""{
  "namespace": "com.lambdainsight.hit",
  "type": "record",
  "name": "Hit",
  "fields": [
     {"name": "hitHostname",   "type": "string" },
     {"name": "hitMemoryUsed", "type": "float"  },
     {"name": "hitPage",       "type": "string" }
  ]
 }""")

    member val HitHostname = "" with get, set
    // Avro "float" is 32-bit, so this is an F# single (float32), not a float (64-bit).
    member val HitMemoryUsed = 0.f with get, set
    member val HitPage = "" with get, set

    interface ISpecificRecord with
        member this.Schema with get() = schema
        member this.Get(fieldPos : int) = 
            match fieldPos with
            | 0 -> this.HitHostname :> obj
            | 1 -> this.HitMemoryUsed :> obj
            | 2 -> this.HitPage :> obj
            | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Get()" fieldPos))
        member this.Put(fieldPos : int, fieldValue : obj) = 
            match fieldPos with
            | 0 -> this.HitHostname <- string fieldValue
            | 1 -> this.HitMemoryUsed <- fieldValue :?> single
            | 2 -> this.HitPage <- string fieldValue
            | _ -> raise (AvroRuntimeException(sprintf "Bad index %i in Put()" fieldPos))
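
If you would still like to keep a record as your domain type, one option is to convert it to the class above just before writing. The following is only a sketch, assuming the HitType class above is in scope and the Apache.Avro package is referenced; `HitRecord` and `toAvroHit` are hypothetical names I made up for illustration, and I have not run this against your repository.

```fsharp
open System.IO
open Avro.IO
open Avro.Specific

// Hypothetical record type: the original record, renamed so that it does not
// clash with the HitType class above. hitMemoryUsed is float32 here because
// the schema declares an Avro "float" (32-bit).
type HitRecord =
    { hitHostname   : string
      hitMemoryUsed : float32
      hitPage       : string }

// Copy the record's fields into the ISpecificRecord class defined above.
let toAvroHit (r : HitRecord) : HitType =
    let hit = HitType()
    hit.HitHostname <- r.hitHostname
    hit.HitMemoryUsed <- r.hitMemoryUsed
    hit.HitPage <- r.hitPage
    hit

let getAvroMsg (r : HitRecord) : byte[] =
    let hit = toAvroHit r
    // Reuse the schema that the class already exposes through ISpecificRecord.
    let schema = (hit :> ISpecificRecord).Schema
    let writer = SpecificDatumWriter<HitType>(schema)
    use memStream = new MemoryStream(256)
    let encoder = BinaryEncoder(memStream)
    writer.Write(hit, encoder)
    encoder.Flush()
    memStream.ToArray()

let bytes =
    getAvroMsg { hitHostname = "abc"; hitMemoryUsed = 0.123f; hitPage = "None" }
```

If you prefer to keep `hitMemoryUsed : float` in the record, convert it at the boundary with `float32 r.hitMemoryUsed`, or change the schema field to "double" and the class property to a float.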