Python3 来自字符串的 Scapy 自定义负载
Python3 Scapy custom payload from string
简介
我正在使用 python3 和 scapy 修改并重新发送捕获的数据包。
问题
当我在 Raw 部分发送修改后的负载时,消息的 \r 和 \n 部分未被解释为特殊字符。相反,它们作为字符串包含在下面的 cmd 和 wireshark 屏幕截图中。
当前有效载荷
预期有效载荷
这是在网络上捕获的正确数据包格式。
wireshark中显示的数据包如下:
SIP/2.0 486 Busy Here
Via: SIP/2.0/UDP 192.168.1.5:5060;branch=z9hG4bK226016822;received=192.168.1.5;rport=5060
From: sip:301@192.168.1.2;tag=2032604445
To: sip:300@192.168.1.2;tag=as1b0290be
Call-ID: 338695025
CSeq: 21 INVITE
Server: Asterisk PBX 16.10.0
Allow: INVITE, ACK, CANCEL, OPTIONS, BYE, REFER, SUBSCRIBE, NOTIFY, INFO, PUBLISH, MESSAGE
Supported: replaces, timer
X-Asterisk-HangupCause: Call Rejected
X-Asterisk-HangupCauseCode: 21
Content-Length: 0
密码
这是目前的代码:
from scapy.all import sniff, Ether, IP, UDP, sendp, ICMP, rdpcap,Raw
import scapy.fields
import re
import codecs
import argparse
def traffic_parser(packet):
BUSY_1 = 'SIP/2.0 486 Busy Here'
BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\\r\\nX-Asterisk-HangupCauseCode: 21'
payload = packet[3].command()
print(bytes(payload))
header=re.findall("Ringing", payload)
if header:
eth_attributes={}
eth_attributes['dst']=packet[0].dst
eth_attributes['src']=packet[0].src
eth_attributes['type']=packet[0].type
eth = Ether_layer(eth_attributes)
udp_attributes={}
udp_attributes['sport']=packet[2].sport
udp_attributes['dport']=packet[2].dport
udp_attributes['len']=444
udp = UDP_layer(udp_attributes)
# Implement packet modification
payload = payload.replace("SIP/2.0 180 Ringing", BUSY_1, 1)
payload = re.sub("Contact\:.*>", BUSY_2, payload,1)
payload = payload.replace("Raw(load=b\'", '', 1)
payload = re.sub("\'\)$", '', payload, 1)
for incr in range(1,5):
ip_attributes={}
ip_attributes['version']=packet[1].version
ip_attributes['tos']=packet[1].tos
ip_attributes['len']=464
ip_attributes['id']=0 #Zero is handled by scapy on send by default
ip_attributes['flags']=packet[1].flags
ip_attributes['frag']=packet[1].frag
ip_attributes['ttl']=packet[1].ttl
ip_attributes['proto']=packet[1].proto
ip_attributes['src']=packet[1].src
ip_attributes['dst']=packet[1].dst
ip = IP_layer(ip_attributes)
sendp(eth/ip/udp/Raw(load=payload))
print(payload)
print(Raw(load=payload))
print("\n")
def Ether_layer(attributes):
layer2=Ether()
layer2.dst=attributes['dst']
layer2.src=attributes['src']
layer2.type=attributes['type']
return layer2
def IP_layer(attributes):
layer3=IP()
layer3.version=attributes['version']
layer3.tos=attributes['tos']
layer3.len=attributes['len']
layer3.id=attributes['id']
layer3.flags=attributes['flags']
layer3.frag=attributes['frag']
layer3.ttl=attributes['ttl']
layer3.proto=attributes['proto']
layer3.src=attributes['src']
layer3.dst=attributes['dst']
return layer3
def UDP_layer(attributes):
layer4=UDP()
layer4.sport=attributes['sport']
layer4.dport=attributes['dport']
layer4.len=attributes['len']
return layer4
parser = argparse.ArgumentParser(description="rtp replay script. Arguments: -i <interface> -f <sniff filter> -o <sniff outputfile> Interface defaults to 'eth0' and filter defaults to 'udp and port 5060'")
parser.add_argument('-i', "--interface", default="eth0", help="interface to use for sniffing")
parser.add_argument('-f', '--filter', default="udp and port 5060", help="filter to be used in scapy")
parser.add_argument('-o', "--outfile", help="output file (optional)")
parser.add_argument('-t', "--testfile", help="parse test file (optional)")
args=parser.parse_args()
if __name__ == '__main__':
if args.testfile:
packets = rdpcap(args.testfile)
for packet in packets:
traffic_parser(packet)
else:
sniff(iface=args.interface, prn=traffic_parser, filter="udp and port 5060", store=0)
Q
如何以所需的形式附加负载?
编辑:
此pcap文件可用于测试。 pcap_file
指示性脚本执行:
sudo python byespam.py -t
\r
和 \n
被解释为文字字符串,因为您已经转义了它们
这是您的字符串:
>>> BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\\r\\nX-Asterisk-HangupCauseCode: 21'
>>> print(BUSY_2)
X-Asterisk-HangupCause: Call Rejected\r\nX-Asterisk-HangupCauseCode: 21
这是相同的字符串,但没有转义:
>>> BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\r\nX-Asterisk-HangupCauseCode: 21'
>>> print(BUSY_2)
X-Asterisk-HangupCause: Call Rejected
X-Asterisk-HangupCauseCode: 21
为了能够将有效负载用作字符串,您需要将其转换为一个字符串,并确保您具有正确的编码。
在你的情况下你应该:
- 您应该只从 scapy.packet.Packet.
中获取 Raw 的 load 部分
- 将 load 部分编码为 UTF-8,以便 Python 正确解释特殊字符。
以下是您的案例的一些工作代码:
payload = packet[UDP].payload.load *
payload = payload.decode("utf-8")
然后您可以打印有效负载,它会被“正确”解释。
* 我使用了 UDP,因为在你的脚本和 pcapng 文件中你只有 UDP 数据包。如果有 TCP 数据包,你应该使用 payload = packet[TCP].payload.load
简介
我正在使用 python3 和 scapy 修改并重新发送捕获的数据包。
问题
当我在 Raw 部分发送修改后的负载时,消息的 \r 和 \n 部分未被解释为特殊字符。相反,它们作为字符串包含在下面的 cmd 和 wireshark 屏幕截图中。
当前有效载荷
预期有效载荷
这是在网络上捕获的正确数据包格式。
wireshark中显示的数据包如下:
SIP/2.0 486 Busy Here Via: SIP/2.0/UDP 192.168.1.5:5060;branch=z9hG4bK226016822;received=192.168.1.5;rport=5060 From: sip:301@192.168.1.2;tag=2032604445 To: sip:300@192.168.1.2;tag=as1b0290be Call-ID: 338695025 CSeq: 21 INVITE Server: Asterisk PBX 16.10.0 Allow: INVITE, ACK, CANCEL, OPTIONS, BYE, REFER, SUBSCRIBE, NOTIFY, INFO, PUBLISH, MESSAGE Supported: replaces, timer X-Asterisk-HangupCause: Call Rejected X-Asterisk-HangupCauseCode: 21 Content-Length: 0
密码
这是目前的代码:
from scapy.all import sniff, Ether, IP, UDP, sendp, ICMP, rdpcap,Raw
import scapy.fields
import re
import codecs
import argparse
def traffic_parser(packet):
BUSY_1 = 'SIP/2.0 486 Busy Here'
BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\\r\\nX-Asterisk-HangupCauseCode: 21'
payload = packet[3].command()
print(bytes(payload))
header=re.findall("Ringing", payload)
if header:
eth_attributes={}
eth_attributes['dst']=packet[0].dst
eth_attributes['src']=packet[0].src
eth_attributes['type']=packet[0].type
eth = Ether_layer(eth_attributes)
udp_attributes={}
udp_attributes['sport']=packet[2].sport
udp_attributes['dport']=packet[2].dport
udp_attributes['len']=444
udp = UDP_layer(udp_attributes)
# Implement packet modification
payload = payload.replace("SIP/2.0 180 Ringing", BUSY_1, 1)
payload = re.sub("Contact\:.*>", BUSY_2, payload,1)
payload = payload.replace("Raw(load=b\'", '', 1)
payload = re.sub("\'\)$", '', payload, 1)
for incr in range(1,5):
ip_attributes={}
ip_attributes['version']=packet[1].version
ip_attributes['tos']=packet[1].tos
ip_attributes['len']=464
ip_attributes['id']=0 #Zero is handled by scapy on send by default
ip_attributes['flags']=packet[1].flags
ip_attributes['frag']=packet[1].frag
ip_attributes['ttl']=packet[1].ttl
ip_attributes['proto']=packet[1].proto
ip_attributes['src']=packet[1].src
ip_attributes['dst']=packet[1].dst
ip = IP_layer(ip_attributes)
sendp(eth/ip/udp/Raw(load=payload))
print(payload)
print(Raw(load=payload))
print("\n")
def Ether_layer(attributes):
layer2=Ether()
layer2.dst=attributes['dst']
layer2.src=attributes['src']
layer2.type=attributes['type']
return layer2
def IP_layer(attributes):
layer3=IP()
layer3.version=attributes['version']
layer3.tos=attributes['tos']
layer3.len=attributes['len']
layer3.id=attributes['id']
layer3.flags=attributes['flags']
layer3.frag=attributes['frag']
layer3.ttl=attributes['ttl']
layer3.proto=attributes['proto']
layer3.src=attributes['src']
layer3.dst=attributes['dst']
return layer3
def UDP_layer(attributes):
layer4=UDP()
layer4.sport=attributes['sport']
layer4.dport=attributes['dport']
layer4.len=attributes['len']
return layer4
parser = argparse.ArgumentParser(description="rtp replay script. Arguments: -i <interface> -f <sniff filter> -o <sniff outputfile> Interface defaults to 'eth0' and filter defaults to 'udp and port 5060'")
parser.add_argument('-i', "--interface", default="eth0", help="interface to use for sniffing")
parser.add_argument('-f', '--filter', default="udp and port 5060", help="filter to be used in scapy")
parser.add_argument('-o', "--outfile", help="output file (optional)")
parser.add_argument('-t', "--testfile", help="parse test file (optional)")
args=parser.parse_args()
if __name__ == '__main__':
if args.testfile:
packets = rdpcap(args.testfile)
for packet in packets:
traffic_parser(packet)
else:
sniff(iface=args.interface, prn=traffic_parser, filter="udp and port 5060", store=0)
Q
如何以所需的形式附加负载?
编辑:
此pcap文件可用于测试。 pcap_file
指示性脚本执行:
sudo python byespam.py -t
\r
和 \n
被解释为文字字符串,因为您已经转义了它们
这是您的字符串:
>>> BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\\r\\nX-Asterisk-HangupCauseCode: 21'
>>> print(BUSY_2)
X-Asterisk-HangupCause: Call Rejected\r\nX-Asterisk-HangupCauseCode: 21
这是相同的字符串,但没有转义:
>>> BUSY_2 = 'X-Asterisk-HangupCause: Call Rejected\r\nX-Asterisk-HangupCauseCode: 21'
>>> print(BUSY_2)
X-Asterisk-HangupCause: Call Rejected
X-Asterisk-HangupCauseCode: 21
为了能够将有效负载用作字符串,您需要将其转换为一个字符串,并确保您具有正确的编码。
在你的情况下你应该:
- 您应该只从 scapy.packet.Packet. 中获取 Raw 的 load 部分
- 将 load 部分编码为 UTF-8,以便 Python 正确解释特殊字符。
以下是您的案例的一些工作代码:
payload = packet[UDP].payload.load *
payload = payload.decode("utf-8")
然后您可以打印有效负载,它会被“正确”解释。
* 我使用了 UDP,因为在你的脚本和 pcapng 文件中你只有 UDP 数据包。如果有 TCP 数据包,你应该使用 payload = packet[TCP].payload.load