X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ip4.py;h=7f6e92fa37fc7619bc4b31ad6f13b58f89dec616;hb=696e88da9799056036f329676213f3c0c0a1db9c;hp=f67c3b9ce32b82e9e4627873d05d86bc6ec8979a;hpb=16a14cdb160160573e2d1ed69a52998cc30ce34f;p=vpp.git diff --git a/test/test_ip4.py b/test/test_ip4.py index f67c3b9ce32..7f6e92fa37f 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -5,10 +5,11 @@ import unittest from framework import VppTestCase, VppTestRunner from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint +from vpp_ip_route import VppIpRoute, VppRoutePath from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q -from scapy.layers.inet import IP, UDP +from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes from util import ppp @@ -108,8 +109,7 @@ class TestIPv4(VppTestCase): pkts = [] for i in range(0, 257): dst_if = self.flows[src_if][i % 2] - info = self.create_packet_info( - src_if.sw_if_index, dst_if.sw_if_index) + info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) / @@ -149,8 +149,9 @@ class TestIPv4(VppTestCase): payload_info = self.payload_to_info(str(packet[Raw])) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (dst_if.name, payload_info.src, packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" % + (dst_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) @@ -210,7 +211,7 @@ class TestIPv4FibCrud(VppTestCase): - add new 1k, - del 1.5k - ..note:: Python API is to slow to add many routes, needs C code replacement. + ..note:: Python API is too slow to add many routes, needs replacement. """ def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, count): @@ -222,8 +223,9 @@ class TestIPv4FibCrud(VppTestCase): :return list: added ips with 32 prefix """ added_ips = [] - dest_addr = int( - socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16) + dest_addr = int(socket.inet_pton(socket.AF_INET, + start_dest_addr).encode('hex'), + 16) dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): @@ -237,8 +239,9 @@ class TestIPv4FibCrud(VppTestCase): def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count): removed_ips = [] - dest_addr = int( - socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16) + dest_addr = int(socket.inet_pton(socket.AF_INET, + start_dest_addr).encode('hex'), + 16) dest_addr_len = 32 n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr) for _ in range(count): @@ -254,15 +257,13 @@ class TestIPv4FibCrud(VppTestCase): for _ in range(count): dst_addr = random.choice(dst_ips) - info = self.create_packet_info( - src_if.sw_if_index, dst_if.sw_if_index) + info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IP(src=src_if.remote_ip4, dst=dst_addr) / UDP(sport=1234, dport=1234) / Raw(payload)) info.data = p.copy() - size = random.choice(self.pg_if_packet_sizes) self.extend_packet(p, random.choice(self.pg_if_packet_sizes)) pkts.append(p) @@ -270,7 +271,8 @@ class TestIPv4FibCrud(VppTestCase): def _find_ip_match(self, find_in, pkt): for p in find_in: - if self.payload_to_info(str(p[Raw])) == self.payload_to_info(str(pkt[Raw])): + if self.payload_to_info(str(p[Raw])) == \ + self.payload_to_info(str(pkt[Raw])): if p[IP].src != pkt[IP].src: break if p[IP].dst != pkt[IP].dst: @@ -357,7 +359,7 @@ class TestIPv4FibCrud(VppTestCase): def setUp(self): super(TestIPv4FibCrud, self).setUp() - self.packet_infos = {} + self.reset_packet_infos() def test_1_add_routes(self): """ Add 1k routes @@ -381,10 +383,9 @@ class TestIPv4FibCrud(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - pkts = self.pg0.get_capture() + pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2)) self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2) - def test_2_del_routes(self): """ Delete 100 routes @@ -411,7 +412,7 @@ class TestIPv4FibCrud(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - pkts = self.pg0.get_capture() + pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2)) self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2) def test_3_add_new_routes(self): @@ -446,7 +447,7 @@ class TestIPv4FibCrud(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - pkts = self.pg0.get_capture() + pkts = self.pg0.get_capture(len(self.stream_1) + len(self.stream_2)) self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2) def test_4_del_routes(self): @@ -465,5 +466,85 @@ class TestIPv4FibCrud(VppTestCase): self.verify_not_in_route_dump(fib_dump, self.deleted_routes) +class TestIPNull(VppTestCase): + """ IPv4 routes via NULL """ + + def setUp(self): + super(TestIPNull, self).setUp() + + # create 2 pg interfaces + self.create_pg_interfaces(range(1)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def tearDown(self): + super(TestIPNull, self).tearDown() + for i in self.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + + def test_ip_null(self): + """ IP NULL route """ + + # + # A route via IP NULL that will reply with ICMP unreachables + # + ip_unreach = VppIpRoute(self, "10.0.0.1", 32, [], is_unreach=1) + ip_unreach.add_vpp_config() + + p_unreach = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst="10.0.0.1") / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + self.pg0.add_stream(p_unreach) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + rx = self.pg0.get_capture(1) + rx = rx[0] + icmp = rx[ICMP] + + self.assertEqual(icmptypes[icmp.type], "dest-unreach") + self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-unreachable") + self.assertEqual(icmp.src, self.pg0.remote_ip4) + self.assertEqual(icmp.dst, "10.0.0.1") + + # + # ICMP replies are rate limited. so sit and spin. + # + self.sleep(1) + + # + # A route via IP NULL that will reply with ICMP prohibited + # + ip_prohibit = VppIpRoute(self, "10.0.0.2", 32, [], is_prohibit=1) + ip_prohibit.add_vpp_config() + + p_prohibit = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst="10.0.0.2") / + UDP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + self.pg0.add_stream(p_prohibit) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + rx = self.pg0.get_capture(1) + + rx = rx[0] + icmp = rx[ICMP] + + self.assertEqual(icmptypes[icmp.type], "dest-unreach") + self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-prohibited") + self.assertEqual(icmp.src, self.pg0.remote_ip4) + self.assertEqual(icmp.dst, "10.0.0.2") + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)