6 from framework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
8 from template_ipsec import IPSecIPv6Fwd
11 class SpdFastPathOutbound(IPSecIPv4Fwd):
12 # Override setUpConstants to enable outbound fast path in config
14 def setUpConstants(cls):
15 super(SpdFastPathOutbound, cls).setUpConstants()
16 cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"])
17 cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
20 class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
21 # Override setUpConstants to enable outbound fast path in config
23 def setUpConstants(cls):
24 super(SpdFastPathIPv6Outbound, cls).setUpConstants()
25 cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
26 cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
29 class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
30 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
33 def test_ipsec_spd_outbound_add(self):
34 # In this test case, packets in IPv4 FWD path are configured
35 # to go through IPSec outbound SPD policy lookup.
36 # 2 SPD rules (1 HIGH and 1 LOW) are added.
37 # High priority rule action is set to BYPASS.
38 # Low priority rule action is set to DISCARD.
39 # Traffic sent on pg0 interface should match high priority
40 # rule and should be sent out on pg1 interface.
41 self.create_interfaces(2)
47 self.spd_create_and_intf_add(1, [self.pg1])
48 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
56 local_port_start=s_port_s,
57 local_port_stop=s_port_e,
58 remote_port_start=d_port_s,
59 remote_port_stop=d_port_e,
61 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
68 policy_type="discard",
69 local_port_start=s_port_s,
70 local_port_stop=s_port_e,
71 remote_port_start=d_port_s,
72 remote_port_stop=d_port_e,
75 # create the packet stream
76 packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
77 # add the stream to the source interface + enable capture
78 self.pg0.add_stream(packets)
79 self.pg0.enable_capture()
80 self.pg1.enable_capture()
81 # start the packet generator
84 capture = self.pg1.get_capture()
85 for packet in capture:
87 self.logger.debug(ppp("SPD - Got packet:", packet))
89 self.logger.error(ppp("Unexpected or invalid packet:", packet))
91 self.logger.debug("SPD: Num packets: %s", len(capture.res))
93 # assert nothing captured on pg0
94 self.pg0.assert_nothing_captured()
95 # verify captured packets
96 self.verify_capture(self.pg0, self.pg1, capture)
97 # verify all policies matched the expected number of times
98 self.verify_policy_match(pkt_count, policy_0)
99 self.verify_policy_match(0, policy_1)
102 class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
103 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
104 (add all ips port range rule)"""
106 def test_ipsec_spd_outbound_add(self):
107 # In this test case, packets in IPv4 FWD path are configured
108 # to go through IPSec outbound SPD policy lookup.
109 # 2 SPD rules (1 HIGH and 1 LOW) are added.
110 # High priority rule action is set to BYPASS.
111 # Low priority rule action is set to DISCARD.
112 # Traffic sent on pg0 interface should match high priority
113 # rule and should be sent out on pg1 interface.
114 self.create_interfaces(2)
120 self.spd_create_and_intf_add(1, [self.pg1])
121 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
128 policy_type="bypass",
130 local_port_start=s_port_s,
131 local_port_stop=s_port_e,
132 remote_port_start=d_port_s,
133 remote_port_stop=d_port_e,
135 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
142 policy_type="discard",
144 local_port_start=s_port_s,
145 local_port_stop=s_port_e,
146 remote_port_start=d_port_s,
147 remote_port_stop=d_port_e,
150 # create the packet stream
151 packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
152 # add the stream to the source interface + enable capture
153 self.pg0.add_stream(packets)
154 self.pg0.enable_capture()
155 self.pg1.enable_capture()
156 # start the packet generator
159 capture = self.pg1.get_capture()
160 for packet in capture:
162 self.logger.debug(ppp("SPD - Got packet:", packet))
164 self.logger.error(ppp("Unexpected or invalid packet:", packet))
166 self.logger.debug("SPD: Num packets: %s", len(capture.res))
168 # assert nothing captured on pg0
169 self.pg0.assert_nothing_captured()
170 # verify captured packets
171 self.verify_capture(self.pg0, self.pg1, capture)
172 # verify all policies matched the expected number of times
173 self.verify_policy_match(pkt_count, policy_0)
174 self.verify_policy_match(0, policy_1)
177 class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
178 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
179 (add ips range with any port rule)"""
181 def test_ipsec_spd_outbound_add(self):
182 # In this test case, packets in IPv4 FWD path are configured
183 # to go through IPSec outbound SPD policy lookup.
184 # 2 SPD rules (1 HIGH and 1 LOW) are added.
185 # High priority rule action is set to BYPASS.
186 # Low priority rule action is set to DISCARD.
187 # Traffic sent on pg0 interface should match high priority
188 # rule and should be sent out on pg1 interface.
189 self.create_interfaces(2)
191 s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4)
192 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
193 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
194 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
195 self.spd_create_and_intf_add(1, [self.pg1])
196 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
203 policy_type="bypass",
205 local_ip_start=s_ip_s,
206 local_ip_stop=s_ip_e,
207 remote_ip_start=d_ip_s,
208 remote_ip_stop=d_ip_e,
210 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
217 policy_type="discard",
219 local_ip_start=s_ip_s,
220 local_ip_stop=s_ip_e,
221 remote_ip_start=d_ip_s,
222 remote_ip_stop=d_ip_e,
225 # create the packet stream
226 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
227 # add the stream to the source interface + enable capture
228 self.pg0.add_stream(packets)
229 self.pg0.enable_capture()
230 self.pg1.enable_capture()
231 # start the packet generator
234 capture = self.pg1.get_capture()
235 for packet in capture:
237 self.logger.debug(ppp("SPD - Got packet:", packet))
239 self.logger.error(ppp("Unexpected or invalid packet:", packet))
241 self.logger.debug("SPD: Num packets: %s", len(capture.res))
243 # assert nothing captured on pg0
244 self.pg0.assert_nothing_captured()
245 # verify captured packets
246 self.verify_capture(self.pg0, self.pg1, capture)
247 # verify all policies matched the expected number of times
248 self.verify_policy_match(pkt_count, policy_0)
249 self.verify_policy_match(0, policy_1)
252 class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
253 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
254 (add all ips range rule)"""
256 def test_ipsec_spd_outbound_add(self):
257 # In this test case, packets in IPv4 FWD path are configured
258 # to go through IPSec outbound SPD policy lookup.
259 # 2 SPD rules (1 HIGH and 1 LOW) are added.
260 # High priority rule action is set to BYPASS.
261 # Low priority rule action is set to DISCARD.
262 # Traffic sent on pg0 interface should match high priority
263 # rule and should be sent out on pg1 interface.
264 # in this test we define ranges of ports and ip addresses.
265 self.create_interfaces(2)
268 s_port_e = 1000 + 1023
270 d_port_e = 5000 + 1023
272 s_ip_s = ipaddress.ip_address(
273 int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24
275 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
276 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
277 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
278 self.spd_create_and_intf_add(1, [self.pg1])
279 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
286 policy_type="bypass",
288 local_ip_start=s_ip_s,
289 local_ip_stop=s_ip_e,
290 remote_ip_start=d_ip_s,
291 remote_ip_stop=d_ip_e,
292 local_port_start=s_port_s,
293 local_port_stop=s_port_e,
294 remote_port_start=d_port_s,
295 remote_port_stop=d_port_e,
297 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
304 policy_type="discard",
306 local_ip_start=s_ip_s,
307 local_ip_stop=s_ip_e,
308 remote_ip_start=d_ip_s,
309 remote_ip_stop=d_ip_e,
310 local_port_start=s_port_s,
311 local_port_stop=s_port_e,
312 remote_port_start=d_port_s,
313 remote_port_stop=d_port_e,
316 # create the packet stream
317 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
318 # add the stream to the source interface + enable capture
319 self.pg0.add_stream(packets)
320 self.pg0.enable_capture()
321 self.pg1.enable_capture()
322 # start the packet generator
325 capture = self.pg1.get_capture()
326 for packet in capture:
328 self.logger.debug(ppp("SPD - Got packet:", packet))
330 self.logger.error(ppp("Unexpected or invalid packet:", packet))
332 self.logger.debug("SPD: Num packets: %s", len(capture.res))
334 # assert nothing captured on pg0
335 self.pg0.assert_nothing_captured()
336 # verify captured packets
337 self.verify_capture(self.pg0, self.pg1, capture)
338 # verify all policies matched the expected number of times
339 self.verify_policy_match(pkt_count, policy_0)
340 self.verify_policy_match(0, policy_1)
343 class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
344 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
345 (add all ips ports rule)"""
347 def test_ipsec_spd_outbound_add(self):
348 # In this test case, packets in IPv4 FWD path are configured
349 # to go through IPSec outbound SPD policy lookup.
350 # 2 SPD rules (1 HIGH and 1 LOW) are added.
351 # Low priority rule action is set to BYPASS all ips.
352 # High priority rule action is set to DISCARD all ips.
353 # Traffic is first sent on pg0 with only the LOW priority rule in place
354 # and is expected to be forwarded out of pg1. The HIGH priority rule
355 # is then added and the same traffic is sent on pg0 again; this time
356 # the traffic is expected to be dropped.
357 self.create_interfaces(2)
359 self.spd_create_and_intf_add(1, [self.pg1])
360 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
367 policy_type="bypass",
371 # create the packet stream
372 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
373 # add the stream to the source interface + enable capture
374 self.pg0.add_stream(packets)
375 self.pg0.enable_capture()
376 self.pg1.enable_capture()
377 # start the packet generator
380 capture = self.pg1.get_capture()
381 for packet in capture:
383 self.logger.debug(ppp("SPD - Got packet:", packet))
385 self.logger.error(ppp("Unexpected or invalid packet:", packet))
387 self.logger.debug("SPD: Num packets: %s", len(capture.res))
389 # assert nothing captured on pg0
390 self.pg0.assert_nothing_captured()
391 # verify captured packets
392 self.verify_capture(self.pg0, self.pg1, capture)
393 # verify all policies matched the expected number of times
394 self.verify_policy_match(pkt_count, policy_0)
396 policy_1 = self.spd_add_rem_policy( # outbound, priority 20
403 policy_type="discard",
407 # create the packet stream
408 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
409 # add the stream to the source interface + enable capture
410 self.pg0.add_stream(packets)
411 self.pg0.enable_capture()
412 self.pg1.enable_capture()
413 # start the packet generator
415 # assert nothing captured on pg0 and pg1
416 self.pg0.assert_nothing_captured()
417 self.pg1.assert_nothing_captured()
420 class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
421 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
424 def test_ipsec_spd_outbound_remove(self):
425 # In this test case, packets in IPv4 FWD path are configured
426 # to go through IPSec outbound SPD policy lookup.
427 # 2 SPD rules (1 HIGH and 1 LOW) are added.
428 # High priority rule action is set to BYPASS.
429 # Low priority rule action is set to DISCARD.
430 # High priority rule is then removed.
431 # Traffic sent on pg0 interface should match low priority
432 # rule and should be discarded after SPD lookup.
433 self.create_interfaces(2)
435 self.spd_create_and_intf_add(1, [self.pg1])
436 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
443 policy_type="bypass",
445 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
452 policy_type="discard",
455 # create the packet stream
456 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
457 # add the stream to the source interface + enable capture
458 self.pg0.add_stream(packets)
459 self.pg0.enable_capture()
460 self.pg1.enable_capture()
461 # start the packet generator
464 capture = self.pg1.get_capture()
465 for packet in capture:
467 self.logger.debug(ppp("SPD - Got packet:", packet))
469 self.logger.error(ppp("Unexpected or invalid packet:", packet))
472 # assert nothing captured on pg0
473 self.pg0.assert_nothing_captured()
474 # verify capture on pg1
475 self.logger.debug("SPD: Num packets: %s", len(capture.res))
476 self.verify_capture(self.pg0, self.pg1, capture)
477 # verify all policies matched the expected number of times
478 self.verify_policy_match(pkt_count, policy_0)
479 self.verify_policy_match(0, policy_1)
480 # now remove the bypass rule
481 self.spd_add_rem_policy( # outbound, priority 10
488 policy_type="bypass",
492 # resend the same packets
493 self.pg0.add_stream(packets)
494 self.pg0.enable_capture() # flush the old captures
495 self.pg1.enable_capture()
497 # assert nothing captured on pg0
498 self.pg0.assert_nothing_captured()
499 # all packets will be dropped by SPD rule
500 self.pg1.assert_nothing_captured()
501 # verify all policies matched the expected number of times
502 self.verify_policy_match(pkt_count, policy_0)
503 self.verify_policy_match(pkt_count, policy_1)
506 class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
507 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
508 (add, remove, re-add)"""
510 def test_ipsec_spd_outbound_readd(self):
511 # In this test case, packets in IPv4 FWD path are configured
512 # to go through IPSec outbound SPD policy lookup.
513 # 2 SPD rules (1 HIGH and 1 LOW) are added.
514 # High priority rule action is set to BYPASS.
515 # Low priority rule action is set to DISCARD.
516 # Traffic sent on pg0 interface should match high priority
517 # rule and should be sent out on pg1 interface.
518 # High priority rule is then removed.
519 # Traffic sent on pg0 interface should match low priority
520 # rule and should be discarded after SPD lookup.
521 # Readd high priority rule.
522 # Traffic sent on pg0 interface should match high priority
523 # rule and should be sent out on pg1 interface.
524 self.create_interfaces(2)
526 self.spd_create_and_intf_add(1, [self.pg1])
527 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
534 policy_type="bypass",
536 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
543 policy_type="discard",
546 # create the packet stream
547 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
548 # add the stream to the source interface + enable capture
549 self.pg0.add_stream(packets)
550 self.pg0.enable_capture()
551 self.pg1.enable_capture()
552 # start the packet generator
555 capture = self.pg1.get_capture()
556 for packet in capture:
558 self.logger.debug(ppp("SPD - Got packet:", packet))
560 self.logger.error(ppp("Unexpected or invalid packet:", packet))
562 self.logger.debug("SPD: Num packets: %s", len(capture.res))
564 # assert nothing captured on pg0
565 self.pg0.assert_nothing_captured()
566 # verify capture on pg1
567 self.verify_capture(self.pg0, self.pg1, capture)
568 # verify all policies matched the expected number of times
569 self.verify_policy_match(pkt_count, policy_0)
570 self.verify_policy_match(0, policy_1)
571 # remove the bypass rule, leaving only the discard rule
572 self.spd_add_rem_policy( # outbound, priority 10
579 policy_type="bypass",
583 # resend the same packets
584 self.pg0.add_stream(packets)
585 self.pg0.enable_capture() # flush the old captures
586 self.pg1.enable_capture()
589 # assert nothing captured on pg0
590 self.pg0.assert_nothing_captured()
591 # all packets will be dropped by SPD rule
592 self.pg1.assert_nothing_captured()
593 # verify all policies matched the expected number of times
594 self.verify_policy_match(pkt_count, policy_0)
595 self.verify_policy_match(pkt_count, policy_1)
597 # now readd the bypass rule
598 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
605 policy_type="bypass",
608 # resend the same packets
609 self.pg0.add_stream(packets)
610 self.pg0.enable_capture() # flush the old captures
611 self.pg1.enable_capture()
615 capture = self.pg1.get_capture(pkt_count)
616 for packet in capture:
618 self.logger.debug(ppp("SPD - Got packet:", packet))
620 self.logger.error(ppp("Unexpected or invalid packet:", packet))
622 self.logger.debug("SPD: Num packets: %s", len(capture.res))
624 # assert nothing captured on pg0
625 self.pg0.assert_nothing_captured()
626 # verify captured packets
627 self.verify_capture(self.pg0, self.pg1, capture)
628 # verify all policies matched the expected number of times
629 self.verify_policy_match(pkt_count, policy_0)
630 self.verify_policy_match(pkt_count, policy_1)
633 class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
634 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
635 (multiple interfaces, multiple rules)"""
637 def test_ipsec_spd_outbound_multiple(self):
638 # In this test case, packets in IPv4 FWD path are configured to go
639 # through IPSec outbound SPD policy lookup.
640 # Multiple rules on multiple interfaces are tested at the same time.
641 # 3x interfaces are configured, binding the same SPD to each.
642 # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
643 # On pg0 & pg1, the BYPASS rule is HIGH priority
644 # On pg2, the DISCARD rule is HIGH priority
645 # Traffic sent from pg0 & pg1 should be captured on pg1 & pg2; traffic sent from pg2 towards pg0 should be dropped.
646 self.create_interfaces(3)
648 # bind SPD to all interfaces
649 self.spd_create_and_intf_add(1, self.pg_interfaces)
650 # add rules on all interfaces
651 policy_01 = self.spd_add_rem_policy( # outbound, priority 10
658 policy_type="bypass",
660 policy_02 = self.spd_add_rem_policy( # outbound, priority 5
667 policy_type="discard",
670 policy_11 = self.spd_add_rem_policy( # outbound, priority 10
677 policy_type="bypass",
679 policy_12 = self.spd_add_rem_policy( # outbound, priority 5
686 policy_type="discard",
689 policy_21 = self.spd_add_rem_policy( # outbound, priority 5
696 policy_type="bypass",
698 policy_22 = self.spd_add_rem_policy( # outbound, priority 10
705 policy_type="discard",
708 # interfaces bound to an SPD, will by default drop inbound
709 # traffic with no matching policies. add catch-all inbound
710 # bypass rule to SPD:
711 self.spd_add_rem_policy( # inbound, all interfaces
718 policy_type="bypass",
722 # create the packet streams
723 packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
724 packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
725 packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
726 # add the streams to the source interfaces
727 self.pg0.add_stream(packets0)
728 self.pg1.add_stream(packets1)
729 self.pg2.add_stream(packets2)
730 # enable capture on all interfaces
731 for pg in self.pg_interfaces:
733 # start the packet generator
738 for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
739 if_caps.append(pg.get_capture())
740 for packet in if_caps[-1]:
742 self.logger.debug(ppp("SPD - Got packet:", packet))
744 self.logger.error(ppp("Unexpected or invalid packet:", packet))
746 self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
747 self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
749 # verify captures that matched BYPASS rule
750 self.verify_capture(self.pg0, self.pg1, if_caps[0])
751 self.verify_capture(self.pg1, self.pg2, if_caps[1])
752 # verify that traffic to pg0 matched DISCARD rule and was dropped
753 self.pg0.assert_nothing_captured()
754 # verify all packets that were expected to match rules, matched
756 self.verify_policy_match(pkt_count, policy_01)
757 self.verify_policy_match(0, policy_02)
759 self.verify_policy_match(pkt_count, policy_11)
760 self.verify_policy_match(0, policy_12)
762 self.verify_policy_match(0, policy_21)
763 self.verify_policy_match(pkt_count, policy_22)
766 class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
767 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
770 def test_ipsec_spd_outbound_add(self):
771 # In this test case, packets in IPv6 FWD path are configured
772 # to go through IPSec outbound SPD policy lookup.
773 # 2 SPD rules (1 HIGH and 1 LOW) are added.
774 # High priority rule action is set to BYPASS.
775 # Low priority rule action is set to DISCARD.
776 # Traffic sent on pg0 interface should match high priority
777 # rule and should be sent out on pg1 interface.
778 self.create_interfaces(2)
784 self.spd_create_and_intf_add(1, [self.pg1])
785 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
792 policy_type="bypass",
793 local_port_start=s_port_s,
794 local_port_stop=s_port_e,
795 remote_port_start=d_port_s,
796 remote_port_stop=d_port_e,
798 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
805 policy_type="discard",
806 local_port_start=s_port_s,
807 local_port_stop=s_port_e,
808 remote_port_start=d_port_s,
809 remote_port_stop=d_port_e,
812 # create the packet stream
813 packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
814 # add the stream to the source interface + enable capture
815 self.pg0.add_stream(packets)
816 self.pg0.enable_capture()
817 self.pg1.enable_capture()
818 # start the packet generator
821 capture = self.pg1.get_capture()
822 for packet in capture:
824 self.logger.debug(ppp("SPD - Got packet:", packet))
826 self.logger.error(ppp("Unexpected or invalid packet:", packet))
828 self.logger.debug("SPD: Num packets: %s", len(capture.res))
830 # assert nothing captured on pg0
831 self.pg0.assert_nothing_captured()
832 # verify captured packets
833 self.verify_capture(self.pg0, self.pg1, capture)
834 # verify all policies matched the expected number of times
835 self.verify_policy_match(pkt_count, policy_0)
836 self.verify_policy_match(0, policy_1)
839 class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
840 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
841 (add all ips ports rule)"""
843 def test_ipsec_spd_outbound_add(self):
844 # In this test case, packets in IPv6 FWD path are configured
845 # to go through IPSec outbound SPD policy lookup.
846 # 2 SPD rules (1 HIGH and 1 LOW) are added.
847 # Low priority rule action is set to BYPASS all ips.
848 # High priority rule action is set to DISCARD all ips.
849 # Traffic is first sent on pg0 with only the LOW priority rule in place
850 # and is expected to be forwarded out of pg1. The HIGH priority rule
851 # is then added and the same traffic is sent on pg0 again; this time
852 # the traffic is expected to be dropped.
853 self.create_interfaces(2)
855 self.spd_create_and_intf_add(1, [self.pg1])
856 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
863 policy_type="bypass",
867 # create the packet stream
868 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
869 # add the stream to the source interface + enable capture
870 self.pg0.add_stream(packets)
871 self.pg0.enable_capture()
872 self.pg1.enable_capture()
873 # start the packet generator
876 capture = self.pg1.get_capture()
877 for packet in capture:
879 self.logger.debug(ppp("SPD - Got packet:", packet))
881 self.logger.error(ppp("Unexpected or invalid packet:", packet))
883 self.logger.debug("SPD: Num packets: %s", len(capture.res))
885 # assert nothing captured on pg0
886 self.pg0.assert_nothing_captured()
887 # verify captured packets
888 self.verify_capture(self.pg0, self.pg1, capture)
889 # verify all policies matched the expected number of times
890 self.verify_policy_match(pkt_count, policy_0)
892 policy_1 = self.spd_add_rem_policy( # outbound, priority 20
899 policy_type="discard",
903 # create the packet stream
904 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
905 # add the stream to the source interface + enable capture
906 self.pg0.add_stream(packets)
907 self.pg0.enable_capture()
908 self.pg1.enable_capture()
909 # start the packet generator
911 # assert nothing captured on pg0 and pg1
912 self.pg0.assert_nothing_captured()
913 self.pg1.assert_nothing_captured()
916 class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
917 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
918 (add all ips port range rule)"""
920 def test_ipsec_spd_outbound_add(self):
921 # In this test case, packets in IPv6 FWD path are configured
922 # to go through IPSec outbound SPD policy lookup.
923 # 2 SPD rules (1 HIGH and 1 LOW) are added.
924 # High priority rule action is set to BYPASS.
925 # Low priority rule action is set to DISCARD.
926 # Traffic sent on pg0 interface should match high priority
927 # rule and should be sent out on pg1 interface.
928 self.create_interfaces(2)
934 self.spd_create_and_intf_add(1, [self.pg1])
935 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
942 policy_type="bypass",
944 local_port_start=s_port_s,
945 local_port_stop=s_port_e,
946 remote_port_start=d_port_s,
947 remote_port_stop=d_port_e,
949 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
956 policy_type="discard",
958 local_port_start=s_port_s,
959 local_port_stop=s_port_e,
960 remote_port_start=d_port_s,
961 remote_port_stop=d_port_e,
964 # create the packet stream
965 packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
966 # add the stream to the source interface + enable capture
967 self.pg0.add_stream(packets)
968 self.pg0.enable_capture()
969 self.pg1.enable_capture()
970 # start the packet generator
973 capture = self.pg1.get_capture()
974 for packet in capture:
976 self.logger.debug(ppp("SPD - Got packet:", packet))
978 self.logger.error(ppp("Unexpected or invalid packet:", packet))
980 self.logger.debug("SPD: Num packets: %s", len(capture.res))
982 # assert nothing captured on pg0
983 self.pg0.assert_nothing_captured()
984 # verify captured packets
985 self.verify_capture(self.pg0, self.pg1, capture)
986 # verify all policies matched the expected number of times
987 self.verify_policy_match(pkt_count, policy_0)
988 self.verify_policy_match(0, policy_1)
991 class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
992 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
993 (add ips range with any port rule)"""
995 def test_ipsec_spd_outbound_add(self):
996 # In this test case, packets in IPv6 FWD path are configured
997 # to go through IPSec outbound SPD policy lookup.
998 # 2 SPD rules (1 HIGH and 1 LOW) are added.
999 # High priority rule action is set to BYPASS.
1000 # Low priority rule action is set to DISCARD.
1001 # Traffic sent on pg0 interface should match high priority
1002 # rule and should be sent out on pg1 interface.
1003 self.create_interfaces(2)
1005 s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
1006 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
1007 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1008 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
1009 self.spd_create_and_intf_add(1, [self.pg1])
1010 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1017 policy_type="bypass",
1019 local_ip_start=s_ip_s,
1020 local_ip_stop=s_ip_e,
1021 remote_ip_start=d_ip_s,
1022 remote_ip_stop=d_ip_e,
1024 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1031 policy_type="discard",
1033 local_ip_start=s_ip_s,
1034 local_ip_stop=s_ip_e,
1035 remote_ip_start=d_ip_s,
1036 remote_ip_stop=d_ip_e,
1039 # create the packet stream
1040 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1041 # add the stream to the source interface + enable capture
1042 self.pg0.add_stream(packets)
1043 self.pg0.enable_capture()
1044 self.pg1.enable_capture()
1045 # start the packet generator
1048 capture = self.pg1.get_capture()
1049 for packet in capture:
1051 self.logger.debug(ppp("SPD - Got packet:", packet))
1053 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1055 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1057 # assert nothing captured on pg0
1058 self.pg0.assert_nothing_captured()
1059 # verify captured packets
1060 self.verify_capture(self.pg0, self.pg1, capture)
1061 # verify all policies matched the expected number of times
1062 self.verify_policy_match(pkt_count, policy_0)
1063 self.verify_policy_match(0, policy_1)
1066 class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
1067 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1068 (add all ips range rule)"""
1070 def test_ipsec_spd_outbound_add(self):
1071 # In this test case, packets in IPv6 FWD path are configured
1072 # to go through IPSec outbound SPD policy lookup.
1073 # 2 SPD rules (1 HIGH and 1 LOW) are added.
1074 # High priority rule action is set to BYPASS.
1075 # Low priority rule action is set to DISCARD.
1076 # Traffic sent on pg0 interface should match high priority
1077 # rule and should be sent out on pg1 interface.
1078 # in this test we define ranges of ports and ip addresses.
1079 self.create_interfaces(2)
1082 s_port_e = 1000 + 1023
1084 d_port_e = 5000 + 1023
1086 s_ip_s = ipaddress.ip_address(
1087 int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
1089 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
1090 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1091 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
1092 self.spd_create_and_intf_add(1, [self.pg1])
1093 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1100 policy_type="bypass",
1102 local_ip_start=s_ip_s,
1103 local_ip_stop=s_ip_e,
1104 remote_ip_start=d_ip_s,
1105 remote_ip_stop=d_ip_e,
1106 local_port_start=s_port_s,
1107 local_port_stop=s_port_e,
1108 remote_port_start=d_port_s,
1109 remote_port_stop=d_port_e,
1111 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1118 policy_type="discard",
1120 local_ip_start=s_ip_s,
1121 local_ip_stop=s_ip_e,
1122 remote_ip_start=d_ip_s,
1123 remote_ip_stop=d_ip_e,
1124 local_port_start=s_port_s,
1125 local_port_stop=s_port_e,
1126 remote_port_start=d_port_s,
1127 remote_port_stop=d_port_e,
1130 # create the packet stream
1131 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1132 # add the stream to the source interface + enable capture
1133 self.pg0.add_stream(packets)
1134 self.pg0.enable_capture()
1135 self.pg1.enable_capture()
1136 # start the packet generator
1139 capture = self.pg1.get_capture()
1140 for packet in capture:
1142 self.logger.debug(ppp("SPD - Got packet:", packet))
1144 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1146 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1148 # assert nothing captured on pg0
1149 self.pg0.assert_nothing_captured()
1150 # verify captured packets
1151 self.verify_capture(self.pg0, self.pg1, capture)
1152 # verify all policies matched the expected number of times
1153 self.verify_policy_match(pkt_count, policy_0)
1154 self.verify_policy_match(0, policy_1)
1157 class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
1158 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1159 (add, remove, re-add)"""
1161 def test_ipsec_spd_outbound_readd(self):
1162 # In this test case, packets in IPv6 FWD path are configured
1163 # to go through IPSec outbound SPD policy lookup.
1164 # 2 SPD rules (1 HIGH and 1 LOW) are added.
1165 # High priority rule action is set to BYPASS.
1166 # Low priority rule action is set to DISCARD.
1167 # Traffic sent on pg0 interface should match high priority
1168 # rule and should be sent out on pg1 interface.
1169 # High priority rule is then removed.
1170 # Traffic sent on pg0 interface should match low priority
1171 # rule and should be discarded after SPD lookup.
1172 # Readd high priority rule.
1173 # Traffic sent on pg0 interface should match high priority
1174 # rule and should be sent out on pg1 interface.
1175 self.create_interfaces(2)
1177 self.spd_create_and_intf_add(1, [self.pg1])
1178 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1185 policy_type="bypass",
1187 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1194 policy_type="discard",
1197 # create the packet stream
1198 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1199 # add the stream to the source interface + enable capture
1200 self.pg0.add_stream(packets)
1201 self.pg0.enable_capture()
1202 self.pg1.enable_capture()
1203 # start the packet generator
1206 capture = self.pg1.get_capture()
1207 for packet in capture:
1209 self.logger.debug(ppp("SPD - Got packet:", packet))
1211 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1213 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1215 # assert nothing captured on pg0
1216 self.pg0.assert_nothing_captured()
1217 # verify capture on pg1
1218 self.verify_capture(self.pg0, self.pg1, capture)
1219 # verify all policies matched the expected number of times
1220 self.verify_policy_match(pkt_count, policy_0)
1221 self.verify_policy_match(0, policy_1)
1222 # remove the bypass rule, leaving only the discard rule
1223 self.spd_add_rem_policy( # outbound, priority 10
1230 policy_type="bypass",
1234 # resend the same packets
1235 self.pg0.add_stream(packets)
1236 self.pg0.enable_capture() # flush the old captures
1237 self.pg1.enable_capture()
1240 # assert nothing captured on pg0
1241 self.pg0.assert_nothing_captured()
1242 # all packets will be dropped by SPD rule
1243 self.pg1.assert_nothing_captured()
1244 # verify all policies matched the expected number of times
1245 self.verify_policy_match(pkt_count, policy_0)
1246 self.verify_policy_match(pkt_count, policy_1)
1248 # now readd the bypass rule
1249 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1256 policy_type="bypass",
1259 # resend the same packets
1260 self.pg0.add_stream(packets)
1261 self.pg0.enable_capture() # flush the old captures
1262 self.pg1.enable_capture()
1266 capture = self.pg1.get_capture(pkt_count)
1267 for packet in capture:
1269 self.logger.debug(ppp("SPD - Got packet:", packet))
1271 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1273 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1275 # assert nothing captured on pg0
1276 self.pg0.assert_nothing_captured()
1277 # verify captured packets
1278 self.verify_capture(self.pg0, self.pg1, capture)
1279 # verify all policies matched the expected number of times
1280 self.verify_policy_match(pkt_count, policy_0)
1281 self.verify_policy_match(pkt_count, policy_1)
1284 class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
1285 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1286 (multiple interfaces, multiple rules)"""
1288 def test_ipsec_spd_outbound_multiple(self):
1289 # In this test case, packets in IPv6 FWD path are configured to go
1290 # through IPSec outbound SPD policy lookup.
1291 # Multiple rules on multiple interfaces are tested at the same time.
1292 # 3x interfaces are configured, binding the same SPD to each.
1293 # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
1294 # On pg0 & pg1, the BYPASS rule is HIGH priority
1295 # On pg2, the DISCARD rule is HIGH priority
1296 # Traffic sent from pg0 & pg1 should be forwarded; from pg2, dropped.
1297 self.create_interfaces(3)
1299 # bind SPD to all interfaces
1300 self.spd_create_and_intf_add(1, self.pg_interfaces)
1301 # add rules on all interfaces
1302 policy_01 = self.spd_add_rem_policy( # outbound, priority 10
1309 policy_type="bypass",
1311 policy_02 = self.spd_add_rem_policy( # outbound, priority 5
1318 policy_type="discard",
1321 policy_11 = self.spd_add_rem_policy( # outbound, priority 10
1328 policy_type="bypass",
1330 policy_12 = self.spd_add_rem_policy( # outbound, priority 5
1337 policy_type="discard",
1340 policy_21 = self.spd_add_rem_policy( # outbound, priority 5
1347 policy_type="bypass",
1349 policy_22 = self.spd_add_rem_policy( # outbound, priority 10
1356 policy_type="discard",
1359 # interfaces bound to an SPD will by default drop inbound
1360 # traffic with no matching policies. add catch-all inbound
1361 # bypass rule to SPD:
1362 self.spd_add_rem_policy( # inbound, all interfaces
1369 policy_type="bypass",
1373 # create the packet streams
1374 packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
1375 packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
1376 packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
1377 # add the streams to the source interfaces
1378 self.pg0.add_stream(packets0)
1379 self.pg1.add_stream(packets1)
1380 self.pg2.add_stream(packets2)
1381 # enable capture on all interfaces
1382 for pg in self.pg_interfaces:
1384 # start the packet generator
1389 for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
1390 if_caps.append(pg.get_capture())
1391 for packet in if_caps[-1]:
1393 self.logger.debug(ppp("SPD - Got packet:", packet))
1395 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1397 self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
1398 self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
1400 # verify captures that matched BYPASS rule
1401 self.verify_capture(self.pg0, self.pg1, if_caps[0])
1402 self.verify_capture(self.pg1, self.pg2, if_caps[1])
1403 # verify that traffic to pg0 matched DISCARD rule and was dropped
1404 self.pg0.assert_nothing_captured()
1405 # verify all packets that were expected to match rules, matched
1407 self.verify_policy_match(pkt_count, policy_01)
1408 self.verify_policy_match(0, policy_02)
1410 self.verify_policy_match(pkt_count, policy_11)
1411 self.verify_policy_match(0, policy_12)
1413 self.verify_policy_match(0, policy_21)
1414 self.verify_policy_match(pkt_count, policy_22)
1417 if __name__ == "__main__":  # executed as a script: run this module's tests
1418 unittest.main(testRunner=VppTestRunner)  # use the VPP runner instead of unittest's default