Code Review
/
csit.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
FIX: IPsec TNL test should load plugins
[csit.git]
/
resources
/
traffic_scripts
/
send_tcp_udp.py
diff --git
a/resources/traffic_scripts/send_tcp_udp.py
b/resources/traffic_scripts/send_tcp_udp.py
index
77f9182
..
4cba732
100755
(executable)
--- a/
resources/traffic_scripts/send_tcp_udp.py
+++ b/
resources/traffic_scripts/send_tcp_udp.py
@@
-19,9
+19,9
@@
from one interface to the other.
import sys
import ipaddress
import sys
import ipaddress
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
from scapy.all import Ether
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
@@
-62,9
+62,8
@@
def valid_ipv6(ip):
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
- args = TrafficScriptArg(
- ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
+ args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
+ 'source_port', 'destination_port'])
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
@@
-77,7
+76,6
@@
def main():
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
@@
-90,7
+88,7
@@
def main():
elif protocol.upper() == 'UDP':
protocol = UDP
else:
elif protocol.upper() == 'UDP':
protocol = UDP
else:
- raise ValueError("Invalid
type of protocol
!")
+ raise ValueError("Invalid
protocol type
!")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
@@
-100,19
+98,27
@@
def main():
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("TCP/UDP Rx timeout")
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('TCP/UDP Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if
'TCP'
in ether:
+ if
TCP
in ether:
print ("TCP packet received.")
print ("TCP packet received.")
- elif
'UDP'
in ether:
+ elif
UDP
in ether:
print ("UDP packet received.")
else:
print ("UDP packet received.")
else:
- raise RuntimeError("Not an TCP or UDP packet received {0}"
-
.
format(ether.__repr__()))
+ raise RuntimeError("Not an TCP or UDP packet received {0}"
.
+ format(ether.__repr__()))
sys.exit(0)
sys.exit(0)