X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_ipsec_spd_fp_input.py;h=d70623ef2ea9f3b4e9cd585ff5a27827ea92d5e1;hb=816b5d613c3a977811af0494e2d215aac6684515;hp=199fbdf7c5db688f0b306bf8cf32978896725acd;hpb=993b6bee63d4f455db0a6021c9659aad4545acf2;p=vpp.git diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py index 199fbdf7c5d..d70623ef2ea 100644 --- a/test/test_ipsec_spd_fp_input.py +++ b/test/test_ipsec_spd_fp_input.py @@ -7,7 +7,6 @@ from framework import VppTestRunner from template_ipsec import IPSecIPv4Fwd from template_ipsec import IPSecIPv6Fwd from test_ipsec_esp import TemplateIpsecEsp -import pdb def debug_signal_handler(signal, frame): @@ -35,30 +34,6 @@ class SpdFastPathInbound(IPSecIPv4Fwd): cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"]) cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - @classmethod - def create_enc_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): - packets = [] - params = self.params[socket.AF_INET] - for i in range(pkt_count): - # create packet info stored in the test case instance - info = self.create_packet_info(src_if, dst_if) - # convert the info into packet payload - payload = self.info_to_payload(info) - # create the packet itself - p = Ether( - src=self.tra_if.remote_mac, dst=self.tra_if.local_mac - ) / params.scapy_tra_sa.encrypt( - IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) - ) - # store a copy of the packet in the packet info - info.data = p.copy() - # append the packet to the list - packets.append(p) - # return the created packet list - return packets - class SpdFastPathInboundProtect(TemplateIpsecEsp): @classmethod @@ -98,6 +73,29 @@ class SpdFastPathIPv6Inbound(IPSecIPv6Fwd): cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) +class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp): + @classmethod + def setUpConstants(cls): + super(SpdFastPathIPv6InboundProtect, cls).setUpConstants() + cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"]) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) + + @classmethod + def setUpClass(cls): + super(SpdFastPathIPv6InboundProtect, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(SpdFastPathIPv6InboundProtect, cls).tearDownClass() + + def setUp(self): + super(SpdFastPathIPv6InboundProtect, self).setUp() + + def tearDown(self): + self.unconfig_network() + super(SpdFastPathIPv6InboundProtect, self).tearDown() + + class IPSec4SpdTestCaseBypass(SpdFastPathInbound): """ IPSec/IPv4 inbound: Policy mode test case with fast path \ (add bypass)""" @@ -120,7 +118,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound): # create input rules # bypass rule should take precedence over discard rule, - # even though it's lower priority + # even though it's lower priority, because for input policies + # matching PROTECT policies precedes matching BYPASS policies + # which preceeds matching for DISCARD policies. + # Any hit stops the process. policy_0 = self.spd_add_rem_policy( # inbound, priority 10 1, self.pg1, @@ -130,10 +131,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound): priority=10, policy_type="bypass", ip_range=True, - local_ip_start=self.pg0.remote_ip4, - local_ip_stop=self.pg0.remote_ip4, - remote_ip_start=self.pg1.remote_ip4, - remote_ip_stop=self.pg1.remote_ip4, + local_ip_start=self.pg1.remote_ip4, + local_ip_stop=self.pg1.remote_ip4, + remote_ip_start=self.pg0.remote_ip4, + remote_ip_stop=self.pg0.remote_ip4, ) policy_1 = self.spd_add_rem_policy( # inbound, priority 15 1, @@ -144,10 +145,10 @@ class IPSec4SpdTestCaseBypass(SpdFastPathInbound): priority=15, policy_type="discard", ip_range=True, - local_ip_start=self.pg0.remote_ip4, - local_ip_stop=self.pg0.remote_ip4, - remote_ip_start=self.pg1.remote_ip4, - remote_ip_stop=self.pg1.remote_ip4, + local_ip_start=self.pg1.remote_ip4, + local_ip_stop=self.pg1.remote_ip4, + remote_ip_start=self.pg0.remote_ip4, + remote_ip_stop=self.pg0.remote_ip4, ) # create output rule so we can capture forwarded packets @@ -212,18 +213,13 @@ class IPSec4SpdTestCaseDiscard(SpdFastPathInbound): is_out=0, priority=10, policy_type="discard", - ip_range=True, - local_ip_start=self.pg0.remote_ip4, - local_ip_stop=self.pg0.remote_ip4, - remote_ip_start=self.pg1.remote_ip4, - remote_ip_stop=self.pg1.remote_ip4, ) # create output rule so we can capture forwarded packets policy_1 = self.spd_add_rem_policy( # outbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=1, priority=10, @@ -264,16 +260,9 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect): super(IPSec4SpdTestCaseProtect, self).tearDown() def test_ipsec_spd_inbound_protect(self): - # In this test case, packets in IPv4 FWD path are configured + # In this test case, encrypted packets in IPv4 + # PROTECT path are configured # to go through IPSec inbound SPD policy lookup. - # - # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. - # - High priority rule action is set to DISCARD. - # - Low priority rule action is set to BYPASS. - # - # Since BYPASS rules take precedence over DISCARD - # (the order being PROTECT, BYPASS, DISCARD) we expect the - # BYPASS rule to match and traffic to be correctly forwarded. pkt_count = 5 payload_size = 64 @@ -282,8 +271,8 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect): p, p.scapy_tra_sa, self.tra_if, - src=self.tra_if.local_ip4, - dst=self.tra_if.remote_ip4, + src=self.tra_if.remote_ip4, + dst=self.tra_if.local_ip4, count=pkt_count, payload_size=payload_size, ) @@ -304,8 +293,8 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect): pkt_count, "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts), ) - self.assertEqual(p.tra_sa_out.get_lost(), 0) - self.assertEqual(p.tra_sa_in.get_lost(), 0) + self.assertEqual(p.tra_sa_out.get_err("lost"), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound): @@ -340,10 +329,10 @@ class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound): priority=10, policy_type="bypass", ip_range=True, - local_ip_start=s_ip_s0, - local_ip_stop=s_ip_e0, - remote_ip_start=d_ip_s0, - remote_ip_stop=d_ip_e0, + local_ip_start=d_ip_s0, + local_ip_stop=d_ip_e0, + remote_ip_start=s_ip_s0, + remote_ip_stop=s_ip_e0, ) policy_1 = self.spd_add_rem_policy( # outbound, priority 5 1, @@ -492,8 +481,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound): self.spd_create_and_intf_add(1, [self.pg0, self.pg1]) policy_0 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -501,8 +490,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound): ) policy_1 = self.spd_add_rem_policy( # inbound, priority 5 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=5, @@ -547,8 +536,8 @@ class IPSec4SpdTestCaseRemove(SpdFastPathInbound): # now remove the bypass rule self.spd_add_rem_policy( # outbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -593,8 +582,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound): self.spd_create_and_intf_add(1, [self.pg0, self.pg1]) policy_0 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -602,8 +591,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound): ) policy_1 = self.spd_add_rem_policy( # inbound, priority 5 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=5, @@ -647,8 +636,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound): # remove the bypass rule, leaving only the discard rule self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -673,8 +662,8 @@ class IPSec4SpdTestCaseReadd(SpdFastPathInbound): # now readd the bypass rule policy_0 = self.spd_add_rem_policy( # outbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -726,8 +715,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): # add rules on all interfaces policy_01 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -735,8 +724,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): ) policy_02 = self.spd_add_rem_policy( # inbound, priority 5 1, - self.pg0, self.pg1, + self.pg0, socket.IPPROTO_UDP, is_out=0, priority=5, @@ -745,8 +734,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): policy_11 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg1, self.pg2, + self.pg1, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -754,8 +743,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): ) policy_12 = self.spd_add_rem_policy( # inbound, priority 5 1, - self.pg1, self.pg2, + self.pg1, socket.IPPROTO_UDP, is_out=0, priority=5, @@ -764,8 +753,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): policy_21 = self.spd_add_rem_policy( # inbound, priority 5 1, - self.pg2, self.pg0, + self.pg2, socket.IPPROTO_UDP, is_out=0, priority=5, @@ -773,8 +762,8 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): ) policy_22 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg2, self.pg0, + self.pg2, socket.IPPROTO_UDP, is_out=0, priority=10, @@ -840,5 +829,56 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): self.verify_policy_match(0, policy_22) +class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect): + """ IPSec/IPv6 inbound: Policy mode test case with fast path \ + (add protect)""" + + @classmethod + def setUpClass(cls): + super(IPSec6SpdTestCaseProtect, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(IPSec6SpdTestCaseProtect, cls).tearDownClass() + + def setUp(self): + super(IPSec6SpdTestCaseProtect, self).setUp() + + def tearDown(self): + super(IPSec6SpdTestCaseProtect, self).tearDown() + + def test_ipsec6_spd_inbound_protect(self): + pkt_count = 5 + payload_size = 64 + p = self.params[socket.AF_INET6] + send_pkts = self.gen_encrypt_pkts6( + p, + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=pkt_count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) + + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + pkts = p.tra_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, + pkt_count, + "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts), + ) + pkts = p.tra_sa_out.get_stats()["packets"] + self.assertEqual( + pkts, + pkt_count, + "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts), + ) + self.assertEqual(p.tra_sa_out.get_err("lost"), 0) + self.assertEqual(p.tra_sa_in.get_err("lost"), 0) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)