Create new "timezone" column from TimezoneFinder() with longitude and latitude columns as input in PySpark

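I want to create a new column containing the timezone for each row's longitude and latitude. The longitude and latitude come from existing columns and are passed to get_timezone(), a function that wraps timezonefinder. I keep getting TypeError: an integer is required (got type Column)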

Thanks.

from timezonefinder import TimezoneFinder

def get_timezone(longitude, latitude):
    tzf = TimezoneFinder()
    return tzf.timezone_at(lng=longitude, lat=latitude)

location_table = location_table.withColumn("timezone", get_timezone(location_table["location_longitude"], location_table["location_latitude"]))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<command-253463262459944> in <module>
      8 
      9 # df = sqlContext.read.parquet(INPUT)
---> 10 location_table.withColumn("timezone", get_timezone(location_table["location_longitude"].cast(IntegerType()), location_table["location_latitude"].cast(IntegerType())))
     11 #   .write.parquet(OUTPUT)

<command-253463262459944> in get_timezone(longitude, latitude)
      3 def get_timezone(longitude, latitude):
      4     tzf = TimezoneFinder()
----> 5     return tzf.timezone_at(lng=longitude, lat=latitude)
      6 
      7 # udf_timezone = F.udf(get_timezone, StringType())

/databricks/python/lib/python3.7/site-packages/timezonefinder/timezonefinder.py in timezone_at(self, lng, lat)
    657         :return: the timezone name of the matched timezone polygon. possibly "Etc/GMT+-XX" in case of an ocean timezone.
    658         """
--> 659         lng, lat = rectify_coordinates(lng, lat)
    660 
    661         shortcut_id_x, shortcut_id_y = coord2shortcut(lng, lat)

TypeError: an integer is required (got type Column)

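You need to convert the function into a UDF first. As written, get_timezone() is called once on the driver with Column objects instead of each row's values, which is why timezone_at() raises the TypeError: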

import pyspark.sql.functions as F
from timezonefinder import TimezoneFinder

@F.udf('string')
def get_timezone(longitude, latitude):
    if longitude is None or latitude is None:
        return None
    tzf = TimezoneFinder()
    return tzf.timezone_at(lng=longitude, lat=latitude)

location_table = location_table.withColumn("timezone", get_timezone(location_table["location_longitude"], location_table["location_latitude"]))
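
One possible refinement, offered as a sketch rather than a tested recipe: the UDF above builds a new TimezoneFinder() for every row, and since constructing it loads the timezone data, that is likely to be slow on a large table. The variant below caches the instance so it is built on first use and then reused by that copy of the function. The names _get_finder, lookup_timezone and make_timezone_udf, and the finder_factory parameter, are hypothetical helpers introduced here for illustration; they are not part of timezonefinder or PySpark.

```python
# Sketch only: _get_finder, lookup_timezone and make_timezone_udf are
# hypothetical helper names, not part of timezonefinder or PySpark.

_finder = None  # cached TimezoneFinder, created lazily on first lookup


def _get_finder():
    """Return a cached TimezoneFinder, building it on first use."""
    global _finder
    if _finder is None:
        # Imported lazily so the import happens where the function runs.
        from timezonefinder import TimezoneFinder
        _finder = TimezoneFinder()
    return _finder


def lookup_timezone(longitude, latitude, finder_factory=_get_finder):
    """Plain-Python row function: receives numbers, not Column objects."""
    if longitude is None or latitude is None:
        return None
    finder = finder_factory()
    # Coordinates are fractional degrees, so pass floats (not integers).
    return finder.timezone_at(lng=float(longitude), lat=float(latitude))


def make_timezone_udf():
    """Wrap lookup_timezone as a string-returning Spark UDF."""
    import pyspark.sql.functions as F
    return F.udf(lookup_timezone, "string")


# Usage, assuming location_table is the DataFrame from the question:
# get_timezone = make_timezone_udf()
# location_table = location_table.withColumn(
#     "timezone",
#     get_timezone(location_table["location_longitude"],
#                  location_table["location_latitude"]))
```

Keeping the row logic in a plain function also makes it easy to check without a Spark session, by passing a stand-in object through finder_factory. Note that casting the columns to IntegerType(), as in the traceback, would not fix the error and would also truncate the coordinates.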