Python: Data mapped incorrectly in Azure Databricks
I am trying to create a Delta Lake table for every file in '/dbfs/FileStore/tables/FirstRate30mins', but the data gets mapped to the wrong files. The earliest correct row for AAL should be:
AAL_date/time AAL_adjOpen AAL_adjHigh AAL_adjLow AAL_adjClose AAL_adjVolume
2013-12-09 08:30:00 22.8049 22.8049 21.7868 22.2016 141800
But when I run display(df_30mins_['AAL']), the prices in the output are wrong; there is data from 2005:
AAL_date/time AAL_adjOpen AAL_adjHigh AAL_adjLow AAL_adjClose AAL_adjVolume
2005-01-03 08:00:00 0.9939 0.9985 0.9863 0.9955 1711416
When I try to display other tickers, e.g. display(df_30mins_['A']), the data is the same and mapped incorrectly as well:
A_date/time A_adjOpen A_adjHigh A_adjLow A_adjClose A_adjVolume
2005-01-03 08:00:00 0.9939 0.9985 0.9863 0.9955 1711416
Here is the source code:
import os
import numpy as np
import pandas as pd
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json
#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)
df_30mins_ = {}
_delta ={}
for filename in os.listdir(path):
    # split the file name to get the ticker symbol
    name = filename.split('_')[0]
    # create the column header names
    temp = StructType([
        StructField(name + "_date/time", StringType(), True),
        StructField(name + "_adjOpen", FloatType(), True),
        StructField(name + "_adjHigh", FloatType(), True),
        StructField(name + "_adjLow", FloatType(), True),
        StructField(name + "_adjClose", FloatType(), True),
        StructField(name + "_adjVolume", IntegerType(), True)
    ])
    # read the csv into a dataframe
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/")
    # keep each dataframe under its ticker name
    df_30mins_[name] = temp_df
    # name each table
    table_name = name + '_30mins_delta'
    # write each dataframe out as a delta table
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)
display(df_30mins_['AAL'])
display(df_30mins_['A'])
display(df_30mins_['AAPL'])
#display(spark.sql('SELECT * FROM aal_30mins_delta'))
#display(spark.sql('SELECT * FROM a_30mins_delta'))
#display(spark.sql('SELECT * FROM aapl_30mins_delta'))
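For reference, the load call in the loop above is given the directory path rather than an individual file, so Spark's CSV reader pulls every CSV under /FileStore/tables/FirstRate30mins/ into each DataFrame, and every ticker ends up holding the same union of rows (which is why the 2005 data appears for AAL and A alike). A minimal sketch of the difference, reusing temp and filename from the loop above:

# Directory path: Spark reads EVERY csv under it, so the resulting
# DataFrame mixes all tickers' rows together.
all_files_df = (spark.read.format("csv")
                .option("header", "false")
                .schema(temp)
                .load("/FileStore/tables/FirstRate30mins/"))

# Single-file path: only the current ticker's csv is read.
one_file_df = (spark.read.format("csv")
               .option("header", "false")
               .schema(temp)
               .load("/FileStore/tables/FirstRate30mins/" + filename))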
Sorry, I missed + filename. Here is the corrected code:
import os
import numpy as np
import pandas as pd
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json
#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)
df_30mins_ = {}
_delta ={}
for filename in os.listdir(path):
    # split the file name to get the ticker symbol
    name = filename.split('_')[0]
    # create the column header names
    temp = StructType([
        StructField(name + "_date/time", StringType(), True),
        StructField(name + "_adjOpen", FloatType(), True),
        StructField(name + "_adjHigh", FloatType(), True),
        StructField(name + "_adjLow", FloatType(), True),
        StructField(name + "_adjClose", FloatType(), True),
        StructField(name + "_adjVolume", IntegerType(), True)
    ])
    # read only the current ticker's csv into a dataframe
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/" + filename)
    # keep each dataframe under its ticker name
    df_30mins_[name] = temp_df
    # name each table
    table_name = name + '_30mins_delta'
    # write each dataframe out as a delta table
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)
display(df_30mins_['AAL'])
display(df_30mins_['AAPL'])
display(df_30mins_['A'])
display(spark.sql('SELECT * FROM aal_30mins_delta'))
display(spark.sql('SELECT * FROM aapl_30mins_delta'))
display(spark.sql('SELECT * FROM a_30mins_delta'))
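As a quick sanity check (a sketch, assuming the date/time strings keep the 'YYYY-MM-DD HH:MM:SS' format so that they sort chronologically as plain text), you can display the earliest row of a DataFrame, or query the saved Delta table, and confirm the first AAL bar is now 2013-12-09 08:30:00 rather than a 2005 row:

aal = df_30mins_['AAL']
# earliest AAL bar from the dataframe
display(aal.orderBy(aal["AAL_date/time"].asc()).limit(1))

# same check against the saved delta table
display(spark.sql("SELECT * FROM aal_30mins_delta ORDER BY `AAL_date/time` ASC LIMIT 1"))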