Insert data into MySQL using Dataflow
The following code builds the pipeline and generates the DAG, but execution fails with RuntimeError: NotImplementedError [while running 'generatedPtransform-438']. Please let me know whether there is a direct MySQL connector for Beam in Python.
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import mysql.connector
import apache_beam as beam
import logging
import argparse
import sys
import re

PROJECT = "12344"
TOPIC = "projects/12344/topics/mytopic"

class insertfn(beam.DoFn):
    def insertdata(self, data):
        db_conn = mysql.connector.connect(host="localhost", user="abc", passwd="root", database="new")
        db_cursor = db_conn.cursor()
        emp_sql = "INSERT INTO emp(ename,eid,dept) VALUES (%s,%s,%s)"
        db_cursor.executemany(emp_sql, (data[0], data[1], data[2]))
        db_conn.commit()
        print(db_cursor.rowcount, "record inserted")
class Split(beam.DoFn):
    def process(self, data):
        data = data.split(",")
        return [{
            'ename': data[0],
            'eid': data[1],
            'dept': data[2]
        }]
def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)
    p = beam.Pipeline(options=PipelineOptions())
    (p
        | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
        | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'WriteToMySQL' >> beam.ParDo(insertfn())
    )
    result = p.run()
    result.wait_until_finish()
Following our discussion in the comments section, I noticed that you are not using the proper command to execute the Dataflow pipeline.
According to the documentation, there are mandatory flags which must be defined in order to run the pipeline on the Dataflow managed service. These flags are described below (they can also be set in code, as sketched after the list):

job_name - The name of the Dataflow job being executed.
project - The ID of your Google Cloud project.
runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.
staging_location - A Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.
temp_location - A Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
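For reference, the same mandatory options can also be set programmatically on the PipelineOptions object instead of being passed on the command line. A minimal sketch, using placeholder project, bucket and job names rather than values from your setup:

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions

options = PipelineOptions()

# Mandatory Dataflow settings, mirroring the flag list above (all values are placeholders).
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.job_name = 'jobname'
gcp_options.project = 'my-project-id'
gcp_options.staging_location = 'gs://my-bucket/staging'
gcp_options.temp_location = 'gs://my-bucket/temp'

# Select the Dataflow managed service as the runner.
options.view_as(StandardOptions).runner = 'DataflowRunner'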
In addition to these flags, there are others you can use; in your case, since you are using a Pub/Sub topic:
- --input_topic: sets the input Pub/Sub topic to read messages from.
Therefore, an example of running the Dataflow pipeline would be:
python RunPipelineDataflow.py \
    --job_name=jobName \
    --project=$PROJECT_NAME \
    --runner=DataflowRunner \
    --staging_location=gs://$BUCKET_NAME/staging \
    --temp_location=gs://$BUCKET_NAME/temp \
    --input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME
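On the script side, the usual pattern from the Beam examples is to separate your own arguments from the pipeline arguments with parse_known_args and pass the latter to PipelineOptions, so the flags above are picked up. A minimal sketch of that wiring, assuming the same --input_topic flag and the Split DoFn from your code (streaming is enabled because Pub/Sub is an unbounded source):

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic", required=True)
    # parse_known_args returns (known args, leftover args); the leftovers
    # carry --runner, --project, --staging_location, --temp_location, etc.
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (p
            | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
            | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'ParseCSV' >> beam.ParDo(Split()))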
I would like to point out the importance of using DataflowRunner: it lets you run on the Cloud Dataflow managed service, which provides fully managed execution, autoscaling and dynamic work rebalancing. However, it is also possible to use DirectRunner, which executes the pipeline on your own machine and is designed to validate the pipeline.
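For a quick local check with DirectRunner, one option is to swap the Pub/Sub source for a few in-memory records; a rough sketch, reusing the Split DoFn from your code (the sample rows are made up and only the parsing step is exercised):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'

with beam.Pipeline(options=options) as p:
    (p
        | 'SampleRows' >> beam.Create(['alice,1,sales', 'bob,2,hr'])  # made-up test data
        | 'ParseCSV' >> beam.ParDo(Split())
        | 'Print' >> beam.Map(print))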