Python 在字段访问 ctypes 时崩溃
Python crashes on field access ctypes
我正在使用 ctypes 模块调用 GetTcpTable2
。
我一直在慢慢地将 C++ 中的示例 here 转换为 Python;但在现场访问期间出现崩溃。
if __name__ == "__main__":
ptcp_table = POINTER(MIB_TCPTABLE2)()
ptcp_table = cast(create_string_buffer(sizeof(MIB_TCPTABLE2)),
POINTER(MIB_TCPTABLE2))
ip_addr = in_addr()
size = c_ulong(sizeof(MIB_TCPTABLE2))
retval = GetTcpTable2(ptcp_table, byref(size), TRUE)
if retval == ERROR_INSUFFICIENT_BUFFER:
ptcp_table = cast(create_string_buffer(size.value),
POINTER(MIB_TCPTABLE2))
if not ptcp_table:
#throw error
pass
retval = GetTcpTable2(ptcp_table, byref(size), TRUE)
if retval == NO_ERROR:
print("Entries %d" % ptcp_table[0].dwNumEntries)
for i in range(0, ptcp_table[0].dwNumEntries):
print(ptcp_table[0].table[i])
#ip_addr.S_un.S_addr = ptcp_table[0].table[i].dwLocalAddr
#ip_addr_string = inet_nota(ip_addr)
#print(ip_addr_string)
#print(string_at(ip_addr_string))
尝试访问除 table[i]
之外的 dwLocalAddr
时崩溃。
ptcp_table[0].table[i].dwLocalAddr
然而它在打印 ptcp_table[0].table[i]
时不会崩溃。
我试过打印和访问其他字段;但是 Python 只是崩溃。
这是我的结构定义:
class MIB_TCPROW2(Structure):
_fields_ = [
("dwState", c_ulong),
("dwLocalAddr", c_ulong),
("dwLocalPort", c_ulong),
("dwRemoteAddr", c_ulong),
("dwRemotePort", c_ulong),
("dwOwningPid", c_ulong),
("dwOffloadState", c_int)
]
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", POINTER(MIB_TCPROW2))
]
GetTcpTable2
的定义:
GetTcpTable2 = windll.iphlpapi.GetTcpTable2
GetTcpTable2.argtypes = [POINTER(MIB_TCPTABLE2), POINTER(c_ulong), c_char]
GetTcpTable2.restype = c_ulong
我有一个小小的预感,在 MIB_TCPTABLE2
结构的定义中;文档说 table
是 MIB_TCPROW2
大小的数组 ANY_SIZE
;进一步检查 ANY_SIZE
是 1 从检查 iphlpapi.h
文件。我知道 POINTER(MIB_TCPROW2)
的大小不等于 MIB_TCPROW2
的大小。
我研究了围绕结构内部可变长度字段的其他 ctypes 问题,并得出一个 answer 建议使用工厂方法生成 class 定义。
def MIB_TCPTABLE2_FACTORY(size):
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * size)
]
return MIB_TCPTABLE2
我可以使用这个知道从 GetTcpTable2
返回的 size
来创建一个新类型。然后我所要做的就是更改 GetTcpTable2
的 argtypes
以接受 void *
.
GetTcpTable2.argtypes = [c_void_p, POINTER(c_ulong), c_char]
我就是这样解决的。我首先通过传入以下参数获得了所需的大小:
ret = windll.iphlpapi.GetTcpTable2(None, byref(tcp_table_size), True)
注意None是第一个参数,传入ctypes windows函数时相当于NULL。然后我定义了 MIB_TCPTABLE2 class 并在其中传入了第一次调用 GetTcpTable2 返回的大小:
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * tcp_table_size.value),
]
接下来,我创建了一个结构实例,并再次调用 GetTcpTable2 传入新创建的结构:
tcp_table = MIB_TCPTABL2()
ret = windll.iphlpapi.GetTcpTable2(byref(tcp_table), byref(tcp_table_size), True)
示例代码如下:
from ctypes import *
import socket
import struct
NO_ERROR = 0
ERROR_INSUFFICIENT_BUFFER = 122
TcpConnectionOffloadStateInHost = 0
TcpConnectionOffloadStateOffloading = 1
TcpConnectionOffloadStateOffloaded = 2
TcpConnectionOffloadStateUploading = 3
TcpConnectionOffloadStateMax = 4
class MIB_TCPROW2(Structure):
_fields_ = [
("dwState", c_ulong),
("dwLocalAddr", c_ulong),
("dwLocalPort", c_ulong),
("dwRemoteAddr", c_ulong),
("dwRemotePort", c_ulong),
("dwOwningPid", c_ulong),
("dwOffloadState", c_ulong),
]
def main():
windll.iphlpapi.GetTcpTable2.argtypes = [c_void_p, POINTER(c_ulong), c_bool]
tcp_table_size = c_ulong()
ret = windll.iphlpapi.GetTcpTable2(None, byref(tcp_table_size), True)
if ret == ERROR_INSUFFICIENT_BUFFER:
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * tcp_table_size.value),
]
tcp_table = MIB_TCPTABLE2()
ret = windll.iphlpapi.GetTcpTable2(byref(tcp_table), byref(tcp_table_size), True)
if ret != NO_ERROR:
print("ERROR: GetTcpTable2() failed, error = " + str(ret))
else:
for i in range(tcp_table.dwNumEntries):
dest_ip = socket.inet_ntoa(struct.pack('<L', tcp_table.table[i].dwRemoteAddr))
print("PID: " + str(tcp_table.table[i].dwOwningPid) + ", DEST IP: " + dest_ip)
if __name__ == "__main__":
main()
我正在使用 ctypes 模块调用 GetTcpTable2
。
我一直在慢慢地将 C++ 中的示例 here 转换为 Python;但在现场访问期间出现崩溃。
if __name__ == "__main__":
ptcp_table = POINTER(MIB_TCPTABLE2)()
ptcp_table = cast(create_string_buffer(sizeof(MIB_TCPTABLE2)),
POINTER(MIB_TCPTABLE2))
ip_addr = in_addr()
size = c_ulong(sizeof(MIB_TCPTABLE2))
retval = GetTcpTable2(ptcp_table, byref(size), TRUE)
if retval == ERROR_INSUFFICIENT_BUFFER:
ptcp_table = cast(create_string_buffer(size.value),
POINTER(MIB_TCPTABLE2))
if not ptcp_table:
#throw error
pass
retval = GetTcpTable2(ptcp_table, byref(size), TRUE)
if retval == NO_ERROR:
print("Entries %d" % ptcp_table[0].dwNumEntries)
for i in range(0, ptcp_table[0].dwNumEntries):
print(ptcp_table[0].table[i])
#ip_addr.S_un.S_addr = ptcp_table[0].table[i].dwLocalAddr
#ip_addr_string = inet_nota(ip_addr)
#print(ip_addr_string)
#print(string_at(ip_addr_string))
尝试访问除 table[i]
之外的 dwLocalAddr
时崩溃。
ptcp_table[0].table[i].dwLocalAddr
然而它在打印 ptcp_table[0].table[i]
时不会崩溃。
我试过打印和访问其他字段;但是 Python 只是崩溃。
这是我的结构定义:
class MIB_TCPROW2(Structure):
_fields_ = [
("dwState", c_ulong),
("dwLocalAddr", c_ulong),
("dwLocalPort", c_ulong),
("dwRemoteAddr", c_ulong),
("dwRemotePort", c_ulong),
("dwOwningPid", c_ulong),
("dwOffloadState", c_int)
]
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", POINTER(MIB_TCPROW2))
]
GetTcpTable2
的定义:
GetTcpTable2 = windll.iphlpapi.GetTcpTable2
GetTcpTable2.argtypes = [POINTER(MIB_TCPTABLE2), POINTER(c_ulong), c_char]
GetTcpTable2.restype = c_ulong
我有一个小小的预感,在 MIB_TCPTABLE2
结构的定义中;文档说 table
是 MIB_TCPROW2
大小的数组 ANY_SIZE
;进一步检查 ANY_SIZE
是 1 从检查 iphlpapi.h
文件。我知道 POINTER(MIB_TCPROW2)
的大小不等于 MIB_TCPROW2
的大小。
我研究了围绕结构内部可变长度字段的其他 ctypes 问题,并得出一个 answer 建议使用工厂方法生成 class 定义。
def MIB_TCPTABLE2_FACTORY(size):
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * size)
]
return MIB_TCPTABLE2
我可以使用这个知道从 GetTcpTable2
返回的 size
来创建一个新类型。然后我所要做的就是更改 GetTcpTable2
的 argtypes
以接受 void *
.
GetTcpTable2.argtypes = [c_void_p, POINTER(c_ulong), c_char]
我就是这样解决的。我首先通过传入以下参数获得了所需的大小:
ret = windll.iphlpapi.GetTcpTable2(None, byref(tcp_table_size), True)
注意None是第一个参数,传入ctypes windows函数时相当于NULL。然后我定义了 MIB_TCPTABLE2 class 并在其中传入了第一次调用 GetTcpTable2 返回的大小:
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * tcp_table_size.value),
]
接下来,我创建了一个结构实例,并再次调用 GetTcpTable2 传入新创建的结构:
tcp_table = MIB_TCPTABL2()
ret = windll.iphlpapi.GetTcpTable2(byref(tcp_table), byref(tcp_table_size), True)
示例代码如下:
from ctypes import *
import socket
import struct
NO_ERROR = 0
ERROR_INSUFFICIENT_BUFFER = 122
TcpConnectionOffloadStateInHost = 0
TcpConnectionOffloadStateOffloading = 1
TcpConnectionOffloadStateOffloaded = 2
TcpConnectionOffloadStateUploading = 3
TcpConnectionOffloadStateMax = 4
class MIB_TCPROW2(Structure):
_fields_ = [
("dwState", c_ulong),
("dwLocalAddr", c_ulong),
("dwLocalPort", c_ulong),
("dwRemoteAddr", c_ulong),
("dwRemotePort", c_ulong),
("dwOwningPid", c_ulong),
("dwOffloadState", c_ulong),
]
def main():
windll.iphlpapi.GetTcpTable2.argtypes = [c_void_p, POINTER(c_ulong), c_bool]
tcp_table_size = c_ulong()
ret = windll.iphlpapi.GetTcpTable2(None, byref(tcp_table_size), True)
if ret == ERROR_INSUFFICIENT_BUFFER:
class MIB_TCPTABLE2(Structure):
_fields_ = [
("dwNumEntries", c_ulong),
("table", MIB_TCPROW2 * tcp_table_size.value),
]
tcp_table = MIB_TCPTABLE2()
ret = windll.iphlpapi.GetTcpTable2(byref(tcp_table), byref(tcp_table_size), True)
if ret != NO_ERROR:
print("ERROR: GetTcpTable2() failed, error = " + str(ret))
else:
for i in range(tcp_table.dwNumEntries):
dest_ip = socket.inet_ntoa(struct.pack('<L', tcp_table.table[i].dwRemoteAddr))
print("PID: " + str(tcp_table.table[i].dwOwningPid) + ", DEST IP: " + dest_ip)
if __name__ == "__main__":
main()