How to use foreachPartition on a PySpark DataFrame?

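I am trying to use the foreachPartition() method in PySpark on an RDD that has 8 partitions. My custom function tries to generate a string output for a given string input. Here is the code: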

from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import pandas as pd
import datetime

def compute_sentiment_score(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.score)

def compute_sentiment_magnitude(text):
    client = language.LanguageServiceClient()
    document = types.Document(content=text,type=enums.Document.Type.PLAIN_TEXT, language='en')
    sentiment = client.analyze_sentiment(document=document).document_sentiment
    return str(sentiment.magnitude)

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path-to-file.json"

imdb_reviews = pd.read_csv('imdb_reviews.csv', header=None, names=['input1', 'input2'], encoding= "ISO-8859-1")

imdb_reviews.head()

    input1                                         input2
0   first think another Disney movie, might good, ...   1
1   Put aside Dr. House repeat missed, Desperate H...   0
2   big fan Stephen King's work, film made even gr...   1
3   watched horrid thing TV. Needless say one movi...   0
4   truly enjoyed film. acting terrific plot. Jeff...   1


spark_imdb_reviews = spark.createDataFrame(imdb_reviews) # create spark dataframe


spark_imdb_reviews.printSchema()
root
 |-- input1: string (nullable = true)
 |-- input2: long (nullable = true)

Here is how I tried to use the foreachPartition() method:

create_rdd = spark_imdb_reviews.select("input1").rdd # create RDD
print(create_rdd.getNumPartitions()) # print the partitions
print(create_rdd.take(1)) # display data
new_rdd = create_rdd.foreachPartition(compute_sentiment_score) # compute score

This produces the following output and an error:

8
[Row(input1="first think another Disney movie, might good, it's kids movie. watch it, can't help enjoy it. ages love movie. first saw movie 10 8 years later still love it! Danny Glover superb could play part better. Christopher Lloyd hilarious perfect part. Tony Danza believable Mel Clark. can't help, enjoy movie! give 10/10!")]

File "<ipython-input-106-e3fd65ce75cc>", line 3, in compute_sentiment_score
TypeError: <itertools.chain object at 0x11ab7f198> has type itertools.chain, but expected one of: bytes, unicode

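There are two similar functions: foreachPartition and mapPartitions.

Both functions expect another function as a parameter (here compute_sentiment_score). That function receives the content of a partition as an iterator. The text parameter in the question is therefore not a string but an iterator over the rows of the partition, which is why the client raises the TypeError about itertools.chain. The iterator has to be consumed inside compute_sentiment_score.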
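The failure in the question can be reproduced without Spark or the Google client. The sketch below is only an illustration: `shout` is a hypothetical stand-in for the per-text work, and an `itertools.chain` object stands in for the iterator that Spark hands to the function. None of these names come from the original post.

```python
from itertools import chain

def shout(text):
    """Hypothetical per-text function: expects a single string."""
    return text.upper()

def shout_partition(rows):
    """Partition-style function: expects an iterator and loops over it."""
    for text in rows:
        yield shout(text)

# Stand-in for one partition; Spark passes an iterator, not a single value.
partition = chain(["good movie"], ["bad movie"])

try:
    shout(partition)  # same mistake as in the question: iterator treated as text
except AttributeError as err:
    print("per-text function failed:", err)

# Looping over the iterator inside the function works as intended.
print(list(shout_partition(partition)))
```

The per-text function fails because it is handed the whole iterator, while the partition-style function works because it iterates and handles one element at a time.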

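The difference between foreachPartition and mapPartitions is that foreachPartition is a Spark action while mapPartitions is a transformation. This means that the code called by foreachPartition is executed immediately and the RDD remains unchanged (the call returns nothing), while mapPartitions can be used to create a new RDD. To store the computed sentiment scores, mapPartitions should be used.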
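The action/transformation distinction can be sketched as follows. This is a minimal illustration, not code from the original post: `log_partition`, `text_lengths`, and `compare` are hypothetical names, and `compare` assumes it is given a PySpark RDD of plain strings.

```python
def log_partition(rows):
    """Side effects only; foreachPartition ignores any return value."""
    count = sum(1 for _ in rows)
    print(f"partition with {count} rows")

def text_lengths(rows):
    """Yields one result per element; mapPartitions gathers these into a new RDD."""
    for text in rows:
        yield len(text)

def compare(rdd):
    """rdd is assumed to be a pyspark RDD of plain strings."""
    nothing = rdd.foreachPartition(log_partition)  # action: runs immediately, returns None
    lengths = rdd.mapPartitions(text_lengths)      # transformation: lazy, returns a new RDD
    return nothing, lengths.collect()              # collect() is the action that triggers it
```

With an active session, `compare(spark.sparkContext.parallelize(["ab", "cde"], 2))` should return `(None, [2, 3])`. This also shows that assigning the result of foreachPartition to a variable, as the question does with new_rdd, yields None rather than an RDD.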

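# Applied to the question: score every review with mapPartitions.
# Uses the imports (language, enums, types) and create_rdd from the question.
def compute_sentiment_score(itr_text):
    # set up the things that are expensive and should be prepared only once per partition
    client = language.LanguageServiceClient()

    # run the loop for each row of the partition; each row is a Row with the column input1
    for row in itr_text:
        document = types.Document(content=row.input1, type=enums.Document.Type.PLAIN_TEXT, language='en')
        sentiment = client.analyze_sentiment(document=document).document_sentiment
        yield (row.input1, sentiment.score)

# mapPartitions is lazy and returns a new RDD of (text, score) tuples
rdd_with_score = create_rdd.mapPartitions(compute_sentiment_score)
# foreach is an action; the output goes to the executors' stdout (the console in local mode)
rdd_with_score.foreach(print)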

In this example, client = language.LanguageServiceClient() is called once per partition. It might be necessary to reduce the number of partitions, for example with coalesce.
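One way to combine the once-per-partition setup with coalesce is sketched below. It is a hedged illustration under several assumptions: `make_partition_scorer` and `score_dataframe` are hypothetical helpers, the DataFrame is assumed to have the input1 column from the question, and the client factory and scoring function are passed in so the partition logic can be tried without Google Cloud credentials.

```python
def make_partition_scorer(make_client, score_text):
    """Build a function suitable for mapPartitions.

    make_client: called once per partition (the expensive setup).
    score_text:  called once per row as score_text(client, text).
    """
    def score_partition(rows):
        client = make_client()  # created once for the whole partition
        for row in rows:
            yield (row.input1, score_text(client, row.input1))
    return score_partition

def score_dataframe(df, scorer, num_partitions=2):
    """df is assumed to be a Spark DataFrame with a string column input1."""
    # coalesce merges partitions without a full shuffle, so fewer clients are created
    rdd = df.rdd.coalesce(num_partitions)
    # toDF turns the (text, score) tuples back into a DataFrame
    return rdd.mapPartitions(scorer).toDF(["input1", "score"])
```

For the question, make_client could be language.LanguageServiceClient and score_text a small function wrapping the analyze_sentiment call. Fewer partitions means fewer client instances but also less parallelism, so the value of num_partitions is a trade-off rather than a fixed recommendation.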