6 from framework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
10 class SpdFastPathOutbound(IPSecIPv4Fwd):
11 # Override setUpConstants to enable outbound fast path in config
13 def setUpConstants(cls):
14 super(SpdFastPathOutbound, cls).setUpConstants()
15 cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"])
16 cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
19 class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
20 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
23 def test_ipsec_spd_outbound_add(self):
24 # In this test case, packets in IPv4 FWD path are configured
25 # to go through IPSec outbound SPD policy lookup.
26 # 2 SPD rules (1 HIGH and 1 LOW) are added.
27 # High priority rule action is set to BYPASS.
28 # Low priority rule action is set to DISCARD.
29 # Traffic sent on pg0 interface should match high priority
30 # rule and should be sent out on pg1 interface.
31 self.create_interfaces(2)
37 self.spd_create_and_intf_add(1, [self.pg1])
38 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
46 local_port_start=s_port_s,
47 local_port_stop=s_port_e,
48 remote_port_start=d_port_s,
49 remote_port_stop=d_port_e,
51 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
58 policy_type="discard",
59 local_port_start=s_port_s,
60 local_port_stop=s_port_e,
61 remote_port_start=d_port_s,
62 remote_port_stop=d_port_e,
65 # create the packet stream
66 packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
67 # add the stream to the source interface + enable capture
68 self.pg0.add_stream(packets)
69 self.pg0.enable_capture()
70 self.pg1.enable_capture()
71 # start the packet generator
74 capture = self.pg1.get_capture()
75 for packet in capture:
77 self.logger.debug(ppp("SPD - Got packet:", packet))
79 self.logger.error(ppp("Unexpected or invalid packet:", packet))
81 self.logger.debug("SPD: Num packets: %s", len(capture.res))
83 # assert nothing captured on pg0
84 self.pg0.assert_nothing_captured()
85 # verify captured packets
86 self.verify_capture(self.pg0, self.pg1, capture)
87 # verify all policies matched the expected number of times
88 self.verify_policy_match(pkt_count, policy_0)
89 self.verify_policy_match(0, policy_1)
92 class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
93 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
94 (add all ips port range rule)"""
96 def test_ipsec_spd_outbound_add(self):
97 # In this test case, packets in IPv4 FWD path are configured
98 # to go through IPSec outbound SPD policy lookup.
99 # 2 SPD rules (1 HIGH and 1 LOW) are added.
100 # High priority rule action is set to BYPASS.
101 # Low priority rule action is set to DISCARD.
102 # Traffic sent on pg0 interface should match high priority
103 # rule and should be sent out on pg1 interface.
104 self.create_interfaces(2)
110 self.spd_create_and_intf_add(1, [self.pg1])
111 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
118 policy_type="bypass",
120 local_port_start=s_port_s,
121 local_port_stop=s_port_e,
122 remote_port_start=d_port_s,
123 remote_port_stop=d_port_e,
125 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
132 policy_type="discard",
134 local_port_start=s_port_s,
135 local_port_stop=s_port_e,
136 remote_port_start=d_port_s,
137 remote_port_stop=d_port_e,
140 # create the packet stream
141 packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
142 # add the stream to the source interface + enable capture
143 self.pg0.add_stream(packets)
144 self.pg0.enable_capture()
145 self.pg1.enable_capture()
146 # start the packet generator
149 capture = self.pg1.get_capture()
150 for packet in capture:
152 self.logger.debug(ppp("SPD - Got packet:", packet))
154 self.logger.error(ppp("Unexpected or invalid packet:", packet))
156 self.logger.debug("SPD: Num packets: %s", len(capture.res))
158 # assert nothing captured on pg0
159 self.pg0.assert_nothing_captured()
160 # verify captured packets
161 self.verify_capture(self.pg0, self.pg1, capture)
162 # verify all policies matched the expected number of times
163 self.verify_policy_match(pkt_count, policy_0)
164 self.verify_policy_match(0, policy_1)
167 class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
168 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
169 (add ips range with any port rule)"""
171 def test_ipsec_spd_outbound_add(self):
172 # In this test case, packets in IPv4 FWD path are configured
173 # to go through IPSec outbound SPD policy lookup.
174 # 2 SPD rules (1 HIGH and 1 LOW) are added.
175 # High priority rule action is set to BYPASS.
176 # Low priority rule action is set to DISCARD.
177 # Traffic sent on pg0 interface should match high priority
178 # rule and should be sent out on pg1 interface.
179 self.create_interfaces(2)
181 s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4)
182 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
183 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
184 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
185 self.spd_create_and_intf_add(1, [self.pg1])
186 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
193 policy_type="bypass",
195 local_ip_start=s_ip_s,
196 local_ip_stop=s_ip_e,
197 remote_ip_start=d_ip_s,
198 remote_ip_stop=d_ip_e,
200 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
207 policy_type="discard",
209 local_ip_start=s_ip_s,
210 local_ip_stop=s_ip_e,
211 remote_ip_start=d_ip_s,
212 remote_ip_stop=d_ip_e,
215 # create the packet stream
216 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
217 # add the stream to the source interface + enable capture
218 self.pg0.add_stream(packets)
219 self.pg0.enable_capture()
220 self.pg1.enable_capture()
221 # start the packet generator
224 capture = self.pg1.get_capture()
225 for packet in capture:
227 self.logger.debug(ppp("SPD - Got packet:", packet))
229 self.logger.error(ppp("Unexpected or invalid packet:", packet))
231 self.logger.debug("SPD: Num packets: %s", len(capture.res))
233 # assert nothing captured on pg0
234 self.pg0.assert_nothing_captured()
235 # verify captured packets
236 self.verify_capture(self.pg0, self.pg1, capture)
237 # verify all policies matched the expected number of times
238 self.verify_policy_match(pkt_count, policy_0)
239 self.verify_policy_match(0, policy_1)
242 class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
243 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
244 (add all ips range rule)"""
246 def test_ipsec_spd_outbound_add(self):
247 # In this test case, packets in IPv4 FWD path are configured
248 # to go through IPSec outbound SPD policy lookup.
249 # 2 SPD rules (1 HIGH and 1 LOW) are added.
250 # High priority rule action is set to BYPASS.
251 # Low priority rule action is set to DISCARD.
252 # Traffic sent on pg0 interface should match high priority
253 # rule and should be sent out on pg1 interface.
254 # In this test we define ranges of ports and IP addresses.
255 self.create_interfaces(2)
258 s_port_e = 1000 + 1023
260 d_port_e = 5000 + 1023
262 s_ip_s = ipaddress.ip_address(
263 int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24
265 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
266 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
267 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
268 self.spd_create_and_intf_add(1, [self.pg1])
269 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
276 policy_type="bypass",
278 local_ip_start=s_ip_s,
279 local_ip_stop=s_ip_e,
280 remote_ip_start=d_ip_s,
281 remote_ip_stop=d_ip_e,
282 local_port_start=s_port_s,
283 local_port_stop=s_port_e,
284 remote_port_start=d_port_s,
285 remote_port_stop=d_port_e,
287 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
294 policy_type="discard",
296 local_ip_start=s_ip_s,
297 local_ip_stop=s_ip_e,
298 remote_ip_start=d_ip_s,
299 remote_ip_stop=d_ip_e,
300 local_port_start=s_port_s,
301 local_port_stop=s_port_e,
302 remote_port_start=d_port_s,
303 remote_port_stop=d_port_e,
306 # create the packet stream
307 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
308 # add the stream to the source interface + enable capture
309 self.pg0.add_stream(packets)
310 self.pg0.enable_capture()
311 self.pg1.enable_capture()
312 # start the packet generator
315 capture = self.pg1.get_capture()
316 for packet in capture:
318 self.logger.debug(ppp("SPD - Got packet:", packet))
320 self.logger.error(ppp("Unexpected or invalid packet:", packet))
322 self.logger.debug("SPD: Num packets: %s", len(capture.res))
324 # assert nothing captured on pg0
325 self.pg0.assert_nothing_captured()
326 # verify captured packets
327 self.verify_capture(self.pg0, self.pg1, capture)
328 # verify all policies matched the expected number of times
329 self.verify_policy_match(pkt_count, policy_0)
330 self.verify_policy_match(0, policy_1)
333 class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
334 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
335 (add all ips ports rule)"""
337 def test_ipsec_spd_outbound_add(self):
338 # In this test case, packets in IPv4 FWD path are configured
339 # to go through IPSec outbound SPD policy lookup.
340 # 2 SPD rules (1 HIGH and 1 LOW) are added.
341 # Low priority rule action is set to BYPASS all ips.
342 # High priority rule action is set to DISCARD all ips.
343 # Traffic is sent on pg0 interface once the LOW priority rule is
344 # added; the packets are expected to be sent out on pg1. Then the
345 # HIGH priority rule is added and the same traffic is sent on pg0
346 # again; this time the traffic is expected to be dropped.
347 self.create_interfaces(2)
349 self.spd_create_and_intf_add(1, [self.pg1])
350 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
357 policy_type="bypass",
361 # create the packet stream
362 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
363 # add the stream to the source interface + enable capture
364 self.pg0.add_stream(packets)
365 self.pg0.enable_capture()
366 self.pg1.enable_capture()
367 # start the packet generator
370 capture = self.pg1.get_capture()
371 for packet in capture:
373 self.logger.debug(ppp("SPD - Got packet:", packet))
375 self.logger.error(ppp("Unexpected or invalid packet:", packet))
377 self.logger.debug("SPD: Num packets: %s", len(capture.res))
379 # assert nothing captured on pg0
380 self.pg0.assert_nothing_captured()
381 # verify captured packets
382 self.verify_capture(self.pg0, self.pg1, capture)
383 # verify all policies matched the expected number of times
384 self.verify_policy_match(pkt_count, policy_0)
386 policy_1 = self.spd_add_rem_policy( # outbound, priority 20
393 policy_type="discard",
397 # create the packet stream
398 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
399 # add the stream to the source interface + enable capture
400 self.pg0.add_stream(packets)
401 self.pg0.enable_capture()
402 self.pg1.enable_capture()
403 # start the packet generator
405 # assert nothing captured on pg0 and pg1
406 self.pg0.assert_nothing_captured()
407 self.pg1.assert_nothing_captured()
410 class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
411 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
414 def test_ipsec_spd_outbound_remove(self):
415 # In this test case, packets in IPv4 FWD path are configured
416 # to go through IPSec outbound SPD policy lookup.
417 # 2 SPD rules (1 HIGH and 1 LOW) are added.
418 # High priority rule action is set to BYPASS.
419 # Low priority rule action is set to DISCARD.
420 # High priority rule is then removed.
421 # Traffic sent on pg0 interface should match low priority
422 # rule and should be discarded after SPD lookup.
423 self.create_interfaces(2)
425 self.spd_create_and_intf_add(1, [self.pg1])
426 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
433 policy_type="bypass",
435 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
442 policy_type="discard",
445 # create the packet stream
446 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
447 # add the stream to the source interface + enable capture
448 self.pg0.add_stream(packets)
449 self.pg0.enable_capture()
450 self.pg1.enable_capture()
451 # start the packet generator
454 capture = self.pg1.get_capture()
455 for packet in capture:
457 self.logger.debug(ppp("SPD - Got packet:", packet))
459 self.logger.error(ppp("Unexpected or invalid packet:", packet))
462 # assert nothing captured on pg0
463 self.pg0.assert_nothing_captured()
464 # verify capture on pg1
465 self.logger.debug("SPD: Num packets: %s", len(capture.res))
466 self.verify_capture(self.pg0, self.pg1, capture)
467 # verify all policies matched the expected number of times
468 self.verify_policy_match(pkt_count, policy_0)
469 self.verify_policy_match(0, policy_1)
470 # policy counters were verified above after traffic matched the
471 # BYPASS rule in SPD; no separate cache check is performed here
472 # now remove the bypass rule
473 self.spd_add_rem_policy( # outbound, priority 10
480 policy_type="bypass",
484 # resend the same packets
485 self.pg0.add_stream(packets)
486 self.pg0.enable_capture() # flush the old captures
487 self.pg1.enable_capture()
489 # assert nothing captured on pg0
490 self.pg0.assert_nothing_captured()
491 # all packets will be dropped by SPD rule
492 self.pg1.assert_nothing_captured()
493 # verify all policies matched the expected number of times
494 self.verify_policy_match(pkt_count, policy_0)
495 self.verify_policy_match(pkt_count, policy_1)
498 class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
499 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
500 (add, remove, re-add)"""
502 def test_ipsec_spd_outbound_readd(self):
503 # In this test case, packets in IPv4 FWD path are configured
504 # to go through IPSec outbound SPD policy lookup.
505 # 2 SPD rules (1 HIGH and 1 LOW) are added.
506 # High priority rule action is set to BYPASS.
507 # Low priority rule action is set to DISCARD.
508 # Traffic sent on pg0 interface should match high priority
509 # rule and should be sent out on pg1 interface.
510 # High priority rule is then removed.
511 # Traffic sent on pg0 interface should match low priority
512 # rule and should be discarded after SPD lookup.
513 # Readd high priority rule.
514 # Traffic sent on pg0 interface should match high priority
515 # rule and should be sent out on pg1 interface.
516 self.create_interfaces(2)
518 self.spd_create_and_intf_add(1, [self.pg1])
519 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
526 policy_type="bypass",
528 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
535 policy_type="discard",
538 # create the packet stream
539 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
540 # add the stream to the source interface + enable capture
541 self.pg0.add_stream(packets)
542 self.pg0.enable_capture()
543 self.pg1.enable_capture()
544 # start the packet generator
547 capture = self.pg1.get_capture()
548 for packet in capture:
550 self.logger.debug(ppp("SPD - Got packet:", packet))
552 self.logger.error(ppp("Unexpected or invalid packet:", packet))
554 self.logger.debug("SPD: Num packets: %s", len(capture.res))
556 # assert nothing captured on pg0
557 self.pg0.assert_nothing_captured()
558 # verify capture on pg1
559 self.verify_capture(self.pg0, self.pg1, capture)
560 # verify all policies matched the expected number of times
561 self.verify_policy_match(pkt_count, policy_0)
562 self.verify_policy_match(0, policy_1)
563 # check policy in SPD has been cached after traffic
564 # matched BYPASS rule in SPD
566 # now remove the bypass rule, leaving only the discard rule
567 self.spd_add_rem_policy( # outbound, priority 10
574 policy_type="bypass",
578 # resend the same packets
579 self.pg0.add_stream(packets)
580 self.pg0.enable_capture() # flush the old captures
581 self.pg1.enable_capture()
584 # assert nothing captured on pg0
585 self.pg0.assert_nothing_captured()
586 # all packets will be dropped by SPD rule
587 self.pg1.assert_nothing_captured()
588 # verify all policies matched the expected number of times
589 self.verify_policy_match(pkt_count, policy_0)
590 self.verify_policy_match(pkt_count, policy_1)
592 # now readd the bypass rule
593 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
600 policy_type="bypass",
603 # resend the same packets
604 self.pg0.add_stream(packets)
605 self.pg0.enable_capture() # flush the old captures
606 self.pg1.enable_capture()
610 capture = self.pg1.get_capture(pkt_count)
611 for packet in capture:
613 self.logger.debug(ppp("SPD - Got packet:", packet))
615 self.logger.error(ppp("Unexpected or invalid packet:", packet))
617 self.logger.debug("SPD: Num packets: %s", len(capture.res))
619 # assert nothing captured on pg0
620 self.pg0.assert_nothing_captured()
621 # verify captured packets
622 self.verify_capture(self.pg0, self.pg1, capture)
623 # verify all policies matched the expected number of times
624 self.verify_policy_match(pkt_count, policy_0)
625 self.verify_policy_match(pkt_count, policy_1)
628 class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
629 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
630 (multiple interfaces, multiple rules)"""
632 def test_ipsec_spd_outbound_multiple(self):
633 # In this test case, packets in IPv4 FWD path are configured to go
634 # through IPSec outbound SPD policy lookup.
635 # Multiple rules on multiple interfaces are tested at the same time.
636 # 3x interfaces are configured, binding the same SPD to each.
637 # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
638 # On pg0 & pg1, the BYPASS rule is HIGH priority
639 # On pg2, the DISCARD rule is HIGH priority
640 # Traffic from pg0 & pg1 is forwarded (to pg1 & pg2); traffic from pg2 is dropped.
641 self.create_interfaces(3)
643 # bind SPD to all interfaces
644 self.spd_create_and_intf_add(1, self.pg_interfaces)
645 # add rules on all interfaces
646 policy_01 = self.spd_add_rem_policy( # outbound, priority 10
653 policy_type="bypass",
655 policy_02 = self.spd_add_rem_policy( # outbound, priority 5
662 policy_type="discard",
665 policy_11 = self.spd_add_rem_policy( # outbound, priority 10
672 policy_type="bypass",
674 policy_12 = self.spd_add_rem_policy( # outbound, priority 5
681 policy_type="discard",
684 policy_21 = self.spd_add_rem_policy( # outbound, priority 5
691 policy_type="bypass",
693 policy_22 = self.spd_add_rem_policy( # outbound, priority 10
700 policy_type="discard",
703 # Interfaces bound to an SPD will, by default, drop inbound
704 # traffic with no matching policies. Add a catch-all inbound
705 # bypass rule to the SPD:
706 self.spd_add_rem_policy( # inbound, all interfaces
713 policy_type="bypass",
717 # create the packet streams
718 packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
719 packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
720 packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
721 # add the streams to the source interfaces
722 self.pg0.add_stream(packets0)
723 self.pg1.add_stream(packets1)
724 self.pg2.add_stream(packets2)
725 # enable capture on all interfaces
726 for pg in self.pg_interfaces:
728 # start the packet generator
733 for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
734 if_caps.append(pg.get_capture())
735 for packet in if_caps[-1]:
737 self.logger.debug(ppp("SPD - Got packet:", packet))
739 self.logger.error(ppp("Unexpected or invalid packet:", packet))
741 self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
742 self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
744 # verify captures that matched BYPASS rule
745 self.verify_capture(self.pg0, self.pg1, if_caps[0])
746 self.verify_capture(self.pg1, self.pg2, if_caps[1])
747 # verify that traffic to pg0 matched DISCARD rule and was dropped
748 self.pg0.assert_nothing_captured()
749 # verify all packets that were expected to match rules, matched
751 self.verify_policy_match(pkt_count, policy_01)
752 self.verify_policy_match(0, policy_02)
754 self.verify_policy_match(pkt_count, policy_11)
755 self.verify_policy_match(0, policy_12)
757 self.verify_policy_match(0, policy_21)
758 self.verify_policy_match(pkt_count, policy_22)
761 if __name__ == "__main__":
762 unittest.main(testRunner=VppTestRunner)