Insert data into MySQL using Dataflow

The following code builds the pipeline and generates the DAG, but fails with RuntimeError: NotImplementedError [while running 'generatedPtransform-438']. Please let me know whether there is a direct MySQL connector for Beam in Python.

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import  mysql.connector
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="12344"
TOPIC = "projects/12344/topics/mytopic"    

class insertfn(beam.DoFn):
    def insertdata(self, data):
        db_conn = mysql.connector.connect(host="localhost", user="abc", passwd="root", database="new")
        db_cursor = db_conn.cursor()
        emp_sql = "INSERT INTO emp(ename,eid,dept) VALUES (%s,%s,%s)"
        db_cursor.executemany(emp_sql, (data[0], data[1], data[2]))
        db_conn.commit()
        print(db_cursor.rowcount, "record inserted")
class Split(beam.DoFn):
    def process(self, data):
        data = data.split(",")
        return [{
            'ename': data[0],
            'eid': data[1],
            'dept': data[2]
        }]

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output")
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
     | 'ParseCSV' >> beam.ParDo(Split())
     | 'WriteToMySQL' >> beam.ParDo(insertfn())
    )

    result = p.run()
    result.wait_until_finish()

After our discussion in the comments section, I noticed that you were not using the correct command to execute the Dataflow pipeline.

According to the documentation, there are mandatory flags which must be defined in order to run the pipeline on the Dataflow managed service. These flags are described below:

  • job_name - The name of the Dataflow job being executed.

  • project - The ID of your Google Cloud project.

  • runner - The pipeline runner that will parse your program and construct your pipeline. For cloud execution, this must be DataflowRunner.

  • staging_location - A Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.

  • temp_location - A Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.

In addition to these flags, there are other flags you can use; in your case, since you are reading from a Pub/Sub topic:

  • --input_topic: sets the input Pub/Sub topic to read messages from.

Therefore, an example of running a Dataflow pipeline would be:

  python RunPipelineDataflow.py \
  --job_name=jobName \
  --project=$PROJECT_NAME \
  --runner=DataflowRunner \
  --staging_location=gs://$BUCKET_NAME/staging \
  --temp_location=gs://$BUCKET_NAME/temp \
  --input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME
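
For these flags to actually reach the pipeline, the Python program has to hand the unparsed command-line arguments to PipelineOptions; in the code you posted, the result of parse_known_args is never passed on. A minimal sketch of the usual pattern (the streaming setting is my assumption, since ReadFromPubSub is an unbounded source, and only the read/decode steps are shown):

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import apache_beam as beam
import argparse

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    # parse_known_args splits argv into the arguments declared above and
    # everything else (job_name, project, runner, staging/temp locations, ...),
    # which has to be handed to Beam through PipelineOptions.
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    # Reading from Pub/Sub is unbounded, so the job runs in streaming mode.
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)
    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
     | 'Decode' >> beam.Map(lambda x: x.decode('utf-8')))
    p.run().wait_until_finish()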

I would also like to point out the importance of using DataflowRunner: it lets you use the Cloud Dataflow managed service, which provides fully managed execution, autoscaling, and dynamic work rebalancing. However, it is also possible to use DirectRunner, which executes the pipeline on your own machine and is intended for validating pipelines.
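
As for a direct MySQL connector: as far as I know the Beam Python SDK does not bundle one, so writing through a custom DoFn, as you are already doing, is a reasonable approach. Note, however, that Beam calls a DoFn's process method for every element and the base implementation raises NotImplementedError; your insertfn only defines insertdata, which is consistent with the error you quoted. A rough, untested sketch of such a DoFn (the class name WriteToMySQL is mine; the connection settings and table come from your code, and setup/teardown assume a reasonably recent SDK):

import mysql.connector
import apache_beam as beam

class WriteToMySQL(beam.DoFn):
    def setup(self):
        # Open one connection per DoFn instance instead of per element.
        self.db_conn = mysql.connector.connect(
            host="localhost", user="abc", passwd="root", database="new")

    def process(self, element):
        # 'element' is the dict produced by the Split DoFn.
        cursor = self.db_conn.cursor()
        cursor.execute(
            "INSERT INTO emp(ename,eid,dept) VALUES (%s,%s,%s)",
            (element['ename'], element['eid'], element['dept']))
        self.db_conn.commit()
        cursor.close()

    def teardown(self):
        self.db_conn.close()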