return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- packet_num=3, dst_ip=None, is_ip6=False):
+ dst_ip=None, is_ip6=False):
"""
Verify captured packets on outside network
:param capture: Captured packets
:param nat_ip: Translated IP address (Default use global NAT address)
:param same_port: Sorce port number is not translated (Default False)
- :param packet_num: Expected number of packets (Default 3)
:param dst_ip: Destination IP address (Default do not verify)
:param is_ip6: If L3 protocol is IPv6 (Default False)
"""
ICMP46 = ICMP
if nat_ip is None:
nat_ip = self.nat_addr
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
if not is_ip6:
raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
- packet_num=3, dst_ip=None):
+ dst_ip=None):
"""
Verify captured packets on outside network
:param capture: Captured packets
:param nat_ip: Translated IP address
:param same_port: Sorce port number is not translated (Default False)
- :param packet_num: Expected number of packets (Default 3)
:param dst_ip: Destination IP address (Default do not verify)
"""
- return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
- dst_ip, True)
+ return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
+ True)
- def verify_capture_in(self, capture, in_if, packet_num=3):
+ def verify_capture_in(self, capture, in_if):
"""
Verify captured packets on inside network
:param capture: Captured packets
:param in_if: Inside interface
- :param packet_num: Expected number of packets (Default 3)
"""
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
self.assert_packet_checksums_valid(packet)
"(inside network):", packet))
raise
- def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
+ def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
"""
Verify captured IPv6 packets on inside network
:param capture: Captured packets
:param src_ip: Source IP
:param dst_ip: Destination IP address
- :param packet_num: Expected number of packets (Default 3)
"""
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
self.assertEqual(packet[IPv6].src, src_ip)
raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
- packet_num=3, icmp_type=11):
+ icmp_type=11):
"""
Verify captured packets with ICMP errors on outside network
:param capture: Captured packets
:param src_ip: Translated IP address or IP address of VPP
(Default use global NAT address)
- :param packet_num: Expected number of packets (Default 3)
:param icmp_type: Type of error ICMP packet
we are expecting (Default 11)
"""
if src_ip is None:
src_ip = self.nat_addr
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
self.assertEqual(packet[IP].src, src_ip)
"(outside network):", packet))
raise
- def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
- icmp_type=11):
+ def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
"""
Verify captured packets with ICMP errors on inside network
:param capture: Captured packets
:param in_if: Inside interface
- :param packet_num: Expected number of packets (Default 3)
:param icmp_type: Type of error ICMP packet
we are expecting (Default 11)
"""
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
self.assertEqual(packet[IP].dst, in_if.remote_ip4)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(len(pkts))
- self.assertEqual(1, len(capture))
packet = capture[0]
try:
self.assertEqual(packet[IP].src, self.pg1.local_ip4)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg0.get_capture(1)
- self.verify_capture_in(capture, self.pg0, packet_num=1)
+ self.verify_capture_in(capture, self.pg0)
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# in2out
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(1)
- self.verify_capture_out(capture, same_port=True, packet_num=1)
+ self.verify_capture_out(capture, same_port=True)
self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
return pkts
- def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
+ def verify_capture_out(self, capture, nat_ip=None):
"""
Verify captured packets on outside network
:param capture: Captured packets
:param nat_ip: Translated IP address (Default use global NAT address)
:param same_port: Sorce port number is not translated (Default False)
- :param packet_num: Expected number of packets (Default 3)
"""
if nat_ip is None:
nat_ip = self.nat_addr
- self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
self.assertEqual(packet[IP].src, nat_ip)
# Wait for Neighbor Solicitation
capture = self.pg5.get_capture(len(pkts))
- self.assertEqual(1, len(capture))
packet = capture[0]
try:
self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
# Wait for ping reply
capture = self.pg5.get_capture(len(pkts))
- self.assertEqual(1, len(capture))
packet = capture[0]
try:
self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(1)
- self.assertEqual(1, len(capture))
capture = capture[0]
self.assertEqual(capture[IPv6].src, aftr_ip6)
self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.get_capture(1)
- self.assertEqual(1, len(capture))
capture = capture[0]
self.assertEqual(capture[IPv6].src, b4_ip6)
self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)