将 Pandas 数据帧转换为 Spark 数据帧错误
Converting Pandas dataframe into Spark dataframe error
我正在尝试将 Pandas DF 转换为 Spark One。
DF头:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
代码:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
我得到一个错误:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
我已经用你的数据试过了,它正在工作:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
您需要确保您的 pandas 数据框列适合 spark 正在推断的类型。如果您的 pandas 数据框列出如下内容:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
你遇到了这个错误尝试:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
现在,确保 .astype(str)
确实是您希望这些列成为的类型。基本上,当底层 Java 代码尝试从 python 中的对象推断类型时,它会使用一些观察结果并进行猜测,如果该猜测不适用于列中的所有数据(s ) 它正在尝试从 pandas 转换为 spark 它将失败。
可以通过强加一个模式来避免与类型相关的错误,如下所示:
注意:创建了一个文本文件(test.csv),其中包含原始数据(如上)和假设列已插入姓名 ("col1","col2",...,"col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
pandas 数据框的内容:
col1 col2 col3 col4 col5 col6 col7 col8 ...
0 10000001 1 0 1 12:35 OK 10002 1 ...
1 10000001 2 0 1 12:36 OK 10002 1 ...
2 10000002 1 0 4 12:19 PA 10003 1 ...
接下来,创建架构:
from pyspark.sql.types import *
mySchema = StructType([ StructField("col1", LongType(), True)\
,StructField("col2", IntegerType(), True)\
,StructField("col3", IntegerType(), True)\
,StructField("col4", IntegerType(), True)\
,StructField("col5", StringType(), True)\
,StructField("col6", StringType(), True)\
,StructField("col7", IntegerType(), True)\
,StructField("col8", IntegerType(), True)\
,StructField("col9", IntegerType(), True)\
,StructField("col10", IntegerType(), True)\
,StructField("col11", StringType(), True)\
,StructField("col12", StringType(), True)\
,StructField("col13", IntegerType(), True)\
,StructField("col14", IntegerType(), True)\
,StructField("col15", IntegerType(), True)\
,StructField("col16", IntegerType(), True)\
,StructField("col17", IntegerType(), True)\
,StructField("col18", IntegerType(), True)\
,StructField("col19", IntegerType(), True)\
,StructField("col20", IntegerType(), True)\
,StructField("col21", IntegerType(), True)\
,StructField("col22", IntegerType(), True)\
,StructField("col23", IntegerType(), True)\
,StructField("col24", IntegerType(), True)\
,StructField("col25", IntegerType(), True)])
注意:True
(表示允许为空)
创建 pyspark 数据框:
df = spark.createDataFrame(pdDF,schema=mySchema)
确认 pandas 数据框现在是 pyspark 数据框:
type(df)
输出:
pyspark.sql.dataframe.DataFrame
旁白:
要解决 Kate 在下面的评论 - 要强加通用(字符串)架构,您可以执行以下操作:
df=spark.createDataFrame(pdDF.astype(str))
我曾经收到过类似的错误消息,在我的例子中是因为我的 pandas 数据帧包含 NULL。我建议在转换为 spark 之前尝试在 pandas 中处理这个问题(这解决了我的问题)。
我制作了这个脚本,它适用于我的 10 个 pandas 数据帧
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return TimestampType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return DoubleType()
elif f == 'float32': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
你也可以在这个gist
中看到
有了这个你只需要调用 spark_df = pandas_to_spark(pandas_df)
在 spark 版本 >= 3 中,您可以在一行中将 pandas 数据帧转换为 pyspark 数据帧
use spark.createDataFrame(pandasDF)
dataset = pd.read_csv("data/AS/test_v2.csv")
sparkDf = spark.createDataFrame(dataset);
如果您对 spark 会话变量感到困惑,
spark session如下
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession \
.builder \
.getOrCreate()
我正在尝试将 Pandas DF 转换为 Spark One。 DF头:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
代码:
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)
我得到一个错误:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
我已经用你的数据试过了,它正在工作:
%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print sc
df = pd.read_csv("test.csv")
print type(df)
print df
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()
您需要确保您的 pandas 数据框列适合 spark 正在推断的类型。如果您的 pandas 数据框列出如下内容:
pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol 5062 non-null object
Col2 5062 non-null object
你遇到了这个错误尝试:
df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)
现在,确保 .astype(str)
确实是您希望这些列成为的类型。基本上,当底层 Java 代码尝试从 python 中的对象推断类型时,它会使用一些观察结果并进行猜测,如果该猜测不适用于列中的所有数据(s ) 它正在尝试从 pandas 转换为 spark 它将失败。
可以通过强加一个模式来避免与类型相关的错误,如下所示:
注意:创建了一个文本文件(test.csv),其中包含原始数据(如上)和假设列已插入姓名 ("col1","col2",...,"col25").
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
pandas 数据框的内容:
col1 col2 col3 col4 col5 col6 col7 col8 ...
0 10000001 1 0 1 12:35 OK 10002 1 ...
1 10000001 2 0 1 12:36 OK 10002 1 ...
2 10000002 1 0 4 12:19 PA 10003 1 ...
接下来,创建架构:
from pyspark.sql.types import *
mySchema = StructType([ StructField("col1", LongType(), True)\
,StructField("col2", IntegerType(), True)\
,StructField("col3", IntegerType(), True)\
,StructField("col4", IntegerType(), True)\
,StructField("col5", StringType(), True)\
,StructField("col6", StringType(), True)\
,StructField("col7", IntegerType(), True)\
,StructField("col8", IntegerType(), True)\
,StructField("col9", IntegerType(), True)\
,StructField("col10", IntegerType(), True)\
,StructField("col11", StringType(), True)\
,StructField("col12", StringType(), True)\
,StructField("col13", IntegerType(), True)\
,StructField("col14", IntegerType(), True)\
,StructField("col15", IntegerType(), True)\
,StructField("col16", IntegerType(), True)\
,StructField("col17", IntegerType(), True)\
,StructField("col18", IntegerType(), True)\
,StructField("col19", IntegerType(), True)\
,StructField("col20", IntegerType(), True)\
,StructField("col21", IntegerType(), True)\
,StructField("col22", IntegerType(), True)\
,StructField("col23", IntegerType(), True)\
,StructField("col24", IntegerType(), True)\
,StructField("col25", IntegerType(), True)])
注意:True
(表示允许为空)
创建 pyspark 数据框:
df = spark.createDataFrame(pdDF,schema=mySchema)
确认 pandas 数据框现在是 pyspark 数据框:
type(df)
输出:
pyspark.sql.dataframe.DataFrame
旁白:
要解决 Kate 在下面的评论 - 要强加通用(字符串)架构,您可以执行以下操作:
df=spark.createDataFrame(pdDF.astype(str))
我曾经收到过类似的错误消息,在我的例子中是因为我的 pandas 数据帧包含 NULL。我建议在转换为 spark 之前尝试在 pandas 中处理这个问题(这解决了我的问题)。
我制作了这个脚本,它适用于我的 10 个 pandas 数据帧
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return TimestampType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return DoubleType()
elif f == 'float32': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
你也可以在这个gist
中看到有了这个你只需要调用 spark_df = pandas_to_spark(pandas_df)
在 spark 版本 >= 3 中,您可以在一行中将 pandas 数据帧转换为 pyspark 数据帧
use spark.createDataFrame(pandasDF)
dataset = pd.read_csv("data/AS/test_v2.csv")
sparkDf = spark.createDataFrame(dataset);
如果您对 spark 会话变量感到困惑, spark session如下
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession \
.builder \
.getOrCreate()