X-Git-Url: https://gerrit.fd.io/r/gitweb?p=vpp.git;a=blobdiff_plain;f=test%2Ftest_ip4.py;h=5bd50ce3ea4ce873b1920a37c9638741381fe009;hp=55d16735a01f7e27a4e8ae6daeb7f7958f85dea9;hb=d91c1dbdb31f80db7d967f2f57c43d0a81d65297;hpb=268e64e312257b0ab36e0d5b9124cc3f2a1841a7 diff --git a/test/test_ip4.py b/test/test_ip4.py index 55d16735a01..5bd50ce3ea4 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -11,7 +11,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \ from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q, ARP -from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes +from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes from util import ppp from scapy.contrib.mpls import MPLS @@ -1009,5 +1009,115 @@ class TestIPVlan0(VppTestCase): self.send_and_expect(self.pg0, pkts, self.pg1) +class TestIPPunt(VppTestCase): + """ IPv4 Punt Police/Redirect """ + + def setUp(self): + super(TestIPPunt, self).setUp() + + self.create_pg_interfaces(range(2)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + def tearDown(self): + super(TestIPPunt, self).tearDown() + for i in self.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + + def send_and_expect(self, input, pkts, output): + self.vapi.cli("clear trace") + input.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + rx = output.get_capture(len(pkts)) + return rx + + def send_and_assert_no_replies(self, intf, pkts, remark): + self.vapi.cli("clear trace") + intf.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + for i in self.pg_interfaces: + i.get_capture(0) + i.assert_nothing_captured(remark=remark) + + def test_ip_punt(self): + """ IP punt police and redirect """ + + p = (Ether(src=self.pg0.remote_mac, + dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / + TCP(sport=1234, dport=1234) / + Raw('\xa5' * 100)) + + pkts = p * 1025 + + # + # Configure a punt redirect via pg1. + # + nh_addr = socket.inet_pton(socket.AF_INET, + self.pg1.remote_ip4) + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg1.sw_if_index, + nh_addr) + + self.send_and_expect(self.pg0, pkts, self.pg1) + + # + # add a policer + # + policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0, + rate_type=1) + self.vapi.ip_punt_police(policer.policer_index) + + self.vapi.cli("clear trace") + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # + # the number of packet recieved should be greater than 0, + # but not equal to the number sent, since some were policed + # + rx = self.pg1._get_capture(1) + self.assertTrue(len(rx) > 0) + self.assertTrue(len(rx) < len(pkts)) + + # + # remove the poilcer. back to full rx + # + self.vapi.ip_punt_police(policer.policer_index, is_add=0) + self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0, + rate_type=1, is_add=0) + self.send_and_expect(self.pg0, pkts, self.pg1) + + # + # remove the redirect. expect full drop. + # + self.vapi.ip_punt_redirect(self.pg0.sw_if_index, + self.pg1.sw_if_index, + nh_addr, + is_add=0) + self.send_and_assert_no_replies(self.pg0, pkts, + "IP no punt config") + + # + # Add a redirect that is not input port selective + # + self.vapi.ip_punt_redirect(0xffffffff, + self.pg1.sw_if_index, + nh_addr) + self.send_and_expect(self.pg0, pkts, self.pg1) + + self.vapi.ip_punt_redirect(0xffffffff, + self.pg1.sw_if_index, + nh_addr, + is_add=0) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)