将命令行参数传递给 Presto Query

Passing command line argument to Presto Query

我是 Python 新手。我想把命令行参数传递给函数内部的 Presto 查询,然后将结果写入 CSV 文件。但是当我在终端运行脚本时,出现了以下错误:Traceback (most recent call last): File "function2.py", line 3, in <module> from pyhive import presto ModuleNotFoundError: No module named 'pyhive'

pyhive 依赖已经安装。我的代码如下:

from sys import argv
import argparse
from pyhive import presto
import prestodb
import csv
import sys


import pandas as pd

connection = presto.connect(host='xyz',port=8889,username='test')
cur = connection.cursor()
print('Connection Established')


def func1(object,start,end):
    """Run the ``map_date`` query on the module-level cursor ``cur``.

    The three parameters are overwritten right away with ``argv[1]``,
    ``argv[2]`` and ``argv[3]``, so whatever the caller passes is ignored.

    NOTE(review): the SQL text below is a plain (non-f) string, so the
    ``${object}``, ``${start}`` and ``${end}`` markers are not replaced by
    Python; they are part of the text handed to ``cur.execute``.
    """
    object = argv[1]  # rebinds the parameter (and shadows the builtin `object`)
    start = argv[2]  # not referenced again inside this function
    end = argv[3]  # not referenced again inside this function
    # `result` is assigned but never used or returned by this function.
    result = cur.execute("""
    with map_date as 
    (
     SELECT 
     object, 
     epoch,
     timestamp,
     date,
     map_agg(name, value) as map_values
    from hive.schema.test1
    where object = '${object}' 
    and (epoch >= '${start}' and epoch <= '${end}')
    and name in ('x','y')
    GROUP BY object,epoch,timestamp,date
    order by timestamp asc
    )
    SELECT
      epoch
    , timestamp
    , CASE WHEN element_at(map_values, 'x') IS NOT NULL THEN map_values['x'] ELSE NULL END AS x
    , CASE WHEN element_at(map_values, 'y') IS NOT NULL THEN map_values['y'] ELSE NULL END AS y
    , object
    , date AS date
    from map_date
    """)
rows = cur.fetchall()
print('Query Finished')     #Returns the list with one entry for each record
fp = open('/Users/xyz/Desktop/Python/function.csv', 'w')
print('File Created')
myFile = csv.writer(fp)
colnames = [desc[0] for desc in cur.description]     #store the headers in variable called 'colnames'
myFile.writerow(colnames)    #write the header to the file
myFile.writerows(rows)
fp.close()

func1(object,start,end)

cur.close()
connection.close()

如何将命令行参数传递给在函数内编写的 Presto 查询? 任何帮助深表感谢。提前致谢!

我只描述如何将命令行参数传递给函数和查询。


如果定义函数

def func1(object, start, end):
    # code

那么就必须把值作为参数传给函数,并且应该在函数外部读取 sys.argv

connection = presto.connect(host='xyz', port=8889, username='test')  # PEP8: spaces after commas
cur = connection.cursor()
print('Connection Established')

object_ = sys.argv[1]   # PEP8: there is class `object` so I add `_` to create different name
start = sys.argv[2]
end = sys.argv[3]

func1(object_, start, end)

cur.close()
connection.close()

您不必在函数外使用相同的名称

args1 = sys.argv[1]
args2 = sys.argv[2]
args3 = sys.argv[3]

func1(args1, args2, args3)

你甚至可以做到

func1(sys.argv[1], sys.argv[2], sys.argv[3])

因为当你运行这一行时,Python 会找到定义 def func1(object, start, end):,在 func1 内部创建名为 object、start、end 的局部变量,并把外部传入的值赋给这些局部变量

object=object_, start=start, end=end 

object=args1, start=args2, end=args3 

object=sys.argv[1], start=sys.argv[2], end=sys.argv[3]

最好也显式发送 cur 到函数

def func1(cur, object_, start, end):
    # code

func1(cur, sys.argv[1], sys.argv[2], sys.argv[3])

我不清楚你的 SQL 查询具体要做什么,但要注意:Python 用 {start}(没有 $)把值放进字符串(Bash 才用 ${start}),而且字符串必须加前缀 f 才是 f-string:f"""... {start} ...."""。如果不加 f,就必须使用普通的字符串格式化:"""... {start} ....""".format(start=start)


import sys
import csv
from pyhive import presto

# --- functions ----

def func1(cur, object_, start, end,
          output_path='/Users/xyz/Desktop/Python/function.csv'):
    """Query ``hive.schema.test1`` for one object and write the rows to CSV.

    Args:
        cur: Open DB-API cursor, e.g. ``presto.connect(...).cursor()``.
        object_: Value compared with the ``object`` column.
        start: Inclusive lower bound for ``epoch``.
        end: Inclusive upper bound for ``epoch``.
        output_path: CSV file to (over)write. Defaults to the location that
            was previously hard-coded.

    The header row is taken from ``cur.description``; every fetched row is
    written below it.
    """
    # Python's counterpart of Bash's `${start}` is `{start}` inside an
    # f-string (or `"...{start}...".format(start=start)`). That would build
    # the text too, but pasting command-line values straight into SQL breaks
    # on quotes and allows SQL injection. The values are therefore passed as
    # bound parameters: pyhive uses the `pyformat` style (`%(name)s`) and
    # quotes/escapes each value itself, so the placeholders carry no quotes.
    query = """
    WITH map_date AS
    (
      SELECT
        object,
        epoch,
        timestamp,
        date,
        map_agg(name, value) AS map_values
      FROM hive.schema.test1
      WHERE object = %(object)s
        AND (epoch >= %(start)s AND epoch <= %(end)s)
        AND name IN ('x','y')
      GROUP BY object,epoch,timestamp,date
      ORDER BY timestamp asc
    )
    SELECT
      epoch,
      timestamp,
      CASE WHEN element_at(map_values, 'x') IS NOT NULL THEN map_values['x'] ELSE NULL END AS x,
      CASE WHEN element_at(map_values, 'y') IS NOT NULL THEN map_values['y'] ELSE NULL END AS y,
      object,
      date AS date
    FROM map_date
    """
    params = {'object': object_, 'start': start, 'end': end}
    cur.execute(query, params)

    rows = cur.fetchall()  # list with one entry per record
    colnames = [desc[0] for desc in cur.description]  # column headers

    print('Query Finished')

    # `with` closes the file even if writing fails; `newline=''` is what the
    # csv module expects so that no blank lines appear on Windows.
    with open(output_path, 'w', newline='') as fp:
        writer = csv.writer(fp)
        writer.writerow(colnames)  # header first
        writer.writerows(rows)

    print('File Created')

# --- main ---

# Fail early with a usage message instead of an IndexError on sys.argv.
if len(sys.argv) < 4:
    sys.exit(f'usage: {sys.argv[0]} OBJECT START END')

# PEP8: `object` is a builtin class, so the local name gets a trailing `_`.
object_, start, end = sys.argv[1:4]

connection = presto.connect(host='xyz', port=8889, username='test')  # PEP8: spaces after commas
try:
    cur = connection.cursor()
    print('Connection Established')
    try:
        func1(cur, object_, start, end)
    finally:
        # Release the cursor even when the query or the CSV export fails.
        cur.close()
finally:
    connection.close()

如果您打算使用 argparse

parser = argparse.ArgumentParser()

parser.add_argument('-o', '--object', help='object to search')
parser.add_argument('-s', '--start',  help='epoch start')
parser.add_argument('-e', '--end',    help='epoch end')

args = parser.parse_args()

然后

func1(cur, args.object, args.start, args.end)

import argparse

# ... imports and functions ...

# --- main ---

parser = argparse.ArgumentParser()
parser.add_argument('-o', '--object', help='object to search')
parser.add_argument('-s', '--start',  help='epoch start')
parser.add_argument('-e', '--end',    help='epoch end')
#parser.add_argument('-D', '--debug', action='store_true', help='debug (display extra info)')
args = parser.parse_args()

#if args.debug:
#    print(args)

connection = presto.connect(host='xyz', port=8889, username='test')  # PEP8: spaces after commas
cur = connection.cursor()
print('Connection Established')

func1(cur, args.object, args.start, args.end)

cur.close()