Load an Avro file in BigQuery - Unexpected type for default value. Expected null, but found string: "null"
I need to send the result of this query to BigQuery. As you can see, I decode the data I get from Cloud Storage and create an Avro file to load into a BigQuery table, but I get this error:
BadRequest Traceback (most recent call last)
<ipython-input-8-78860f4800c4> in <module>
110 bucket_name1 = 'gs://new_bucket/insert_transfer/*.avro'
111
--> 112 insert_bigquery_avro(bucket_name1, dataset1, tabela1)
<ipython-input-8-78860f4800c4> in insert_bigquery_avro(target_uri, dataset_id, table_id)
103 )
104 print('Starting job {}'.format(load_job.job_id))
--> 105 load_job.result()
106 print('Job finished.')
107
c:\users\me\appdata\local\programs\python\python37\lib\site-packages\google\cloud\bigquery\job.py in result(self, timeout)
695 self._begin()
696 # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 697 return super(_AsyncJob, self).result(timeout=timeout)
698
699 def cancelled(self):
c:\users\me\appdata\local\programs\python\python37\lib\site-packages\google\api_core\future\polling.py in result(self, timeout)
125 # pylint: disable=raising-bad-type
126 # Pylint doesn't recognize that this is valid in this case.
--> 127 raise self._exception
128
129 return self._result
BadRequest: 400 Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Unexpected type for default value. Expected null, but found string: "null"
Here is the script flow:
import csv
import base64
import json
import io
import avro.schema
import avro.io
from avro.datafile import DataFileReader, DataFileWriter
import math
import os
import gcloud
from gcloud import storage
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from datetime import datetime, timedelta
import numpy as np
try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = r"C:\Users\me\Documents\Keys\key.json"
#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path
folder = str((datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'))
bucket_name = 'gs://new_bucket/table/*.csv'
dataset = 'dataset'
tabela = 'table'
schema = avro.schema.Parse(open(r"C:\Users\me\schema_table.avsc", "rb").read())
writer = DataFileWriter(open(r"C:\Users\me\table_register.avro", "wb"), avro.io.DatumWriter(), schema)
def insert_bigquery(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id', 'STRING', mode='REQUIRED')
    ]
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
    )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')
#insert_bigquery(bucket_name, dataset, tabela)
def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    # fetch the IDs
    query = """SELECT id FROM dataset.base64_data"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows
a = get_data_from_bigquery()
length = len(a)
line_count = 0
for row in range(length):
    bytes = base64.b64decode(str(a[row][0]))
    bytes = bytes[5:]
    buf = io.BytesIO(bytes)
    decoder = avro.io.BinaryDecoder(buf)
    rec_reader = avro.io.DatumReader(avro.schema.Parse(open(r"C:\Users\me\schema_table.avsc").read()))
    out = rec_reader.read(decoder)
    writer.append(out)
writer.close()
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob("insert_transfer/" + destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}'.format(
        source_file_name,
        destination_blob_name
    ))
upload_blob('new_bucket', r'C:\Users\me\table_register.avro', 'table_register.avro')
def insert_bigquery_avro(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    time_partitioning = bigquery.table.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="date")
    job_config.time_partitioning = time_partitioning
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
    )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')
dataset1 = 'dataset'
tabela1 = 'table'
bucket_name1 = 'gs://new_bucket/insert_transfer/*.avro'
insert_bigquery_avro(bucket_name1, dataset1, tabela1)
I receive CSV files like this in Cloud Storage:
The script decodes the records like this:
I want to create a routine that puts the decoded information into BigQuery.
Schema file:
{
  "namespace": "transfers",
  "type": "record",
  "name": "Transfer",
  "doc": "Represents the The transfer request",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "the transfer request id"
    },
    {
      "name": "date",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "the date where the transaction happend"
    },
    {
      "name": "merchant",
      "type": "string",
      "doc": "the merchant who owns the payment"
    },
    {
      "name": "amount",
      "type": ["null", {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 4,
        "scale": 2
      }],
      "default": "null",
      "doc": "the foreign amount for the payment"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "transfer_status",
        "symbols": [
          "RECEIVED",
          "WAITING_TRANSFER",
          "ON_PROCESSING",
          "EXECUTED",
          "DENIED"
        ]
      },
      "default": "DENIED"
    },
    {
      "name": "correlation_id",
      "type": ["null", "string"],
      "default": "null",
      "doc": "the correlation id of the request"
    },
    {
      "name": "transfer_period",
      "type": ["null", "string"],
      "default": "null",
      "doc": "The transfer period spec"
    },
    {
      "name": "payments",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "metadata",
      "type": {
        "type": "map",
        "values": "string"
      }
    },
    {
      "name": "events",
      "type": {
        "type": "array",
        "items": {
          "name": "event",
          "type": "record",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "type",
              "type": {
                "type": "enum",
                "name": "event_type",
                "symbols": [
                  "REQUEST",
                  "VALIDATION",
                  "TRANSFER_SCHEDULE",
                  "TRANSFERENCE"
                ]
              }
            },
            {
              "name": "amount",
              "type": ["null", {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 4,
                "scale": 2
              }],
              "doc": "the original currency amount",
              "default": "null"
            },
            {
              "name": "date",
              "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
              },
              "doc": "the moment where this request was received by the platform"
            },
            {
              "name": "status",
              "type": {
                "type": "enum",
                "name": "event_status",
                "symbols": [
                  "SUCCESS",
                  "DENIED",
                  "ERROR",
                  "TIMEOUT",
                  "PENDING"
                ]
              }
            },
            {
              "name": "metadata",
              "type": {
                "type": "map",
                "values": "string"
              }
            },
            {
              "name": "internal_metadata",
              "type": {
                "type": "map",
                "values": "string"
              }
            },
            {
              "name": "error",
              "type": {
                "type": "record",
                "name": "Error",
                "fields": [
                  {
                    "name": "code",
                    "type": ["null", "string"],
                    "default": "null"
                  },
                  {
                    "name": "message",
                    "type": ["null", "string"],
                    "default": "null"
                  }
                ]
              }
            },
            {
              "name": "message",
              "type": ["null", "string"],
              "default": "null"
            }
          ]
        }
      }
    }
  ]
}
Try changing the "default" values from "null" to null. For a union field, Avro requires the default to match the first type in the union; because these unions start with "null", the default has to be the JSON literal null rather than the string "null", which is exactly what BigQuery's Avro parser complains about when it reads the file header.
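As an illustration, here is the same amount field from the schema above with only the default corrected (the same edit applies to correlation_id, transfer_period, message, and to code and message inside the Error record):

{
  "name": "amount",
  "type": ["null", {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 4,
    "scale": 2
  }],
  "default": null,
  "doc": "the foreign amount for the payment"
}

After fixing the schema, re-run the script so the .avro file under gs://new_bucket/insert_transfer/ is regenerated with the corrected defaults before loading again; the Python avro library does not appear to validate union defaults, which would explain why the file could be written even though BigQuery rejects it.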