如何把一个Select * Postgres/Redshift的查询结果放入字典中(column/value)

How to Put a Select * Postgres/Redshift Query results into a dictionary(column/value)

我正在尝试使用列及其值将 postgres/redshift 查询的结果收集到字典中。

所以如果我的 select * 来自 ___ 语句的结果是:

字段1 | 字段2 | 字段3

值1 | 值2 | 值3

如何将结果放入字典:{field1: value1, field2: value2, ……}

这是我的 Airflow 脚本:

#Import Modules

from datetime import datetime, timedelta
from airflow import DAG
from paramiko.config import SSH_PORT
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from sshtunnel import SSHTunnelForwarder, create_logger
from io import StringIO
import logging
from distutils.util import execute
from contextlib import closing
import paramiko
import MySQLdb as sql
from contextlib import closing
import psycopg2
import psycopg2.extensions
from psycopg2.extras import RealDictCursor
import psycopg2.extras
import operator
import itertools
from query_tools import fetch, execute




def get_etl():
    """Run ``select *`` against Postgres/Redshift and return the rows as
    column-name -> value dictionaries.

    Returns:
        list[dict]: one dict per result row, keyed by column name.
    """
    # NOTE(review): the hook keyword is spelled `postgres_conn_id`; the
    # original `postgre_conn_id` typo is not a recognized parameter, so the
    # intended connection id would not be picked up as written.
    pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
    connection = pg_hook.get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute("select * from schema.table")
            # cursor.description holds one entry per result column; per the
            # DB-API (PEP 249), element 0 of each entry is the column name.
            column_names = [col[0] for col in cursor.description]
            # Pair each row's values with the column names to get dicts.
            rows = [dict(zip(column_names, row)) for row in cursor.fetchall()]
    finally:
        # A plain SELECT needs no commit; just release the connection.
        connection.close()
    return rows


# Default task arguments applied to every task in the DAG.

default_args = dict(
    owner='m',
    depends_on_past=False,
    start_date=datetime(2019, 12, 15),
    email=['ma@aol.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=1),
)


# Build the DAG and register its single Python task.

with DAG(
    'try_me',
    description='This ist by Maliva ',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id='new_one', python_callable=get_etl)

运行此脚本时,它只输出记录的值(value1, value2, value3),而不带对应的列名。

关于获取与该值关联的列名有什么想法或建议吗?

您可以使用 cursor.description 配合内置的 zip 来实现:

# No itertools needed: the built-in zip() pairs column names with row values.
# NOTE(review): the hook keyword is `postgres_conn_id`; `postgre_conn_id`
# (as in the question) is a typo and not a recognized parameter.
pg_hook = PostgresHook(postgres_conn_id="postgres_default", schema='schema1')
connection = pg_hook.get_conn()
try:
    with connection.cursor() as cursor:
        cursor.execute("select * from schema.table")
        # cursor.description has one entry per result column; per the
        # DB-API (PEP 249), element 0 of each entry is the column name.
        column_names = [col[0] for col in cursor.description]
        # fetchall -> list of dicts keyed by column name.
        data = [dict(zip(column_names, row)) for row in cursor.fetchall()]
        print(data)
finally:
    # A plain SELECT needs no commit; just release the connection.
    connection.close()