无法从 teradata 的结果集中可靠地写入 avro
Cannot reliably write to avro from resultset of teradata
我在 java 中的代码正在使用结果集读取 teradata 并打印终端中的每一行,但它没有正确写入 avro,即很少有行无故丢失。没有发生行丢失的模式,我在插入 avro 文件时将每种数据类型转换为字符串。我觉得我这边的 Avro 编写过程有错误,但无法弄清楚。
任何帮助表示赞赏。
这是代码。
`
static avroWriter(Schema schema, OutputStream outStream, ResultSet rs) {
final GenericRecord rec = new GenericData.Record(schema)
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema)
final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)
dataFileWriter.create(schema, outStream)
final int nrOfColumns = rs.getMetaData().getColumnCount()
while (rs.next()) {
for (int i = 1; i <= nrOfColumns; i++) {
final Object colValue = rs.getObject(i)
String value
if(colValue == null){
value = "null"
}else{
value = colValue.toString()
}
//println("i :" + i + " value : "+ value)
rec.put(i - 1,value)
}
try {
dataFileWriter.append(rec)
println(rec)
}
catch (IOException e) {
log.error("Record :{} couldn't be read properly", rec, e)
}
}
}
`
终于解决了。每次写记录时刷新数据对我来说似乎很好。
dataFileWriter.append(rec)
dataFileWriter.flush()
我在 java 中的代码正在使用结果集读取 teradata 并打印终端中的每一行,但它没有正确写入 avro,即很少有行无故丢失。没有发生行丢失的模式,我在插入 avro 文件时将每种数据类型转换为字符串。我觉得我这边的 Avro 编写过程有错误,但无法弄清楚。 任何帮助表示赞赏。 这是代码。
`
static avroWriter(Schema schema, OutputStream outStream, ResultSet rs) {
final GenericRecord rec = new GenericData.Record(schema)
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema)
final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)
dataFileWriter.create(schema, outStream)
final int nrOfColumns = rs.getMetaData().getColumnCount()
while (rs.next()) {
for (int i = 1; i <= nrOfColumns; i++) {
final Object colValue = rs.getObject(i)
String value
if(colValue == null){
value = "null"
}else{
value = colValue.toString()
}
//println("i :" + i + " value : "+ value)
rec.put(i - 1,value)
}
try {
dataFileWriter.append(rec)
println(rec)
}
catch (IOException e) {
log.error("Record :{} couldn't be read properly", rec, e)
}
}
}
`
终于解决了。每次写记录时刷新数据对我来说似乎很好。
dataFileWriter.append(rec)
dataFileWriter.flush()