ipsec: add per-SA error counters
[vpp.git] / test / test_ipsec_spd_fp_input.py
1 import socket
2 import unittest
3 import ipaddress
4
5 from util import ppp
6 from framework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
8 from template_ipsec import IPSecIPv6Fwd
9 from test_ipsec_esp import TemplateIpsecEsp
10
11
12 def debug_signal_handler(signal, frame):
13     import pdb
14
15     pdb.set_trace()
16
17
18 import signal
19
20 signal.signal(signal.SIGINT, debug_signal_handler)
21
22
23 class SpdFastPathInbound(IPSecIPv4Fwd):
24     # In test cases derived from this class, packets in IPv4 FWD path
25     # are configured to go through IPSec inbound SPD policy lookup.
26     # Note that order in which the rules are applied is
27     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
28     # precedence over DISCARD.
29     #
30     # Override setUpConstants to enable inbound fast path in config
31     @classmethod
32     def setUpConstants(cls):
33         super(SpdFastPathInbound, cls).setUpConstants()
34         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
35         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
36
37
38 class SpdFastPathInboundProtect(TemplateIpsecEsp):
39     @classmethod
40     def setUpConstants(cls):
41         super(SpdFastPathInboundProtect, cls).setUpConstants()
42         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
43         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
44
45     @classmethod
46     def setUpClass(cls):
47         super(SpdFastPathInboundProtect, cls).setUpClass()
48
49     @classmethod
50     def tearDownClass(cls):
51         super(SpdFastPathInboundProtect, cls).tearDownClass()
52
53     def setUp(self):
54         super(SpdFastPathInboundProtect, self).setUp()
55
56     def tearDown(self):
57         self.unconfig_network()
58         super(SpdFastPathInboundProtect, self).tearDown()
59
60
61 class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
62     # In test cases derived from this class, packets in IPvr6 FWD path
63     # are configured to go through IPSec inbound SPD policy lookup.
64     # Note that order in which the rules are applied is
65     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
66     # precedence over DISCARDi.
67
68     # Override setUpConstants to enable inbound fast path in config
69     @classmethod
70     def setUpConstants(cls):
71         super(SpdFastPathIPv6Inbound, cls).setUpConstants()
72         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
73         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
74
75
76 class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
77     @classmethod
78     def setUpConstants(cls):
79         super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
80         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
81         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
82
83     @classmethod
84     def setUpClass(cls):
85         super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
86
87     @classmethod
88     def tearDownClass(cls):
89         super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
90
91     def setUp(self):
92         super(SpdFastPathIPv6InboundProtect, self).setUp()
93
94     def tearDown(self):
95         self.unconfig_network()
96         super(SpdFastPathIPv6InboundProtect, self).tearDown()
97
98
99 class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
100     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
101         (add bypass)"""
102
103     def test_ipsec_spd_inbound_bypass(self):
104         # In this test case, packets in IPv4 FWD path are configured
105         # to go through IPSec inbound SPD policy lookup.
106         #
107         # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
108         # - High priority rule action is set to DISCARD.
109         # - Low priority rule action is set to BYPASS.
110         #
111         # Since BYPASS rules take precedence over DISCARD
112         # (the order being PROTECT, BYPASS, DISCARD) we expect the
113         # BYPASS rule to match and traffic to be correctly forwarded.
114         self.create_interfaces(2)
115         pkt_count = 5
116
117         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
118
119         # create input rules
120         # bypass rule should take precedence over discard rule,
121         # even though it's lower priority, because for input policies
122         # matching PROTECT policies precedes matching BYPASS policies
123         # which preceeds matching for DISCARD policies.
124         # Any hit stops the process.
125         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
126             1,
127             self.pg1,
128             self.pg0,
129             socket.IPPROTO_UDP,
130             is_out=0,
131             priority=10,
132             policy_type="bypass",
133             ip_range=True,
134             local_ip_start=self.pg1.remote_ip4,
135             local_ip_stop=self.pg1.remote_ip4,
136             remote_ip_start=self.pg0.remote_ip4,
137             remote_ip_stop=self.pg0.remote_ip4,
138         )
139         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
140             1,
141             self.pg1,
142             self.pg0,
143             socket.IPPROTO_UDP,
144             is_out=0,
145             priority=15,
146             policy_type="discard",
147             ip_range=True,
148             local_ip_start=self.pg1.remote_ip4,
149             local_ip_stop=self.pg1.remote_ip4,
150             remote_ip_start=self.pg0.remote_ip4,
151             remote_ip_stop=self.pg0.remote_ip4,
152         )
153
154         # create output rule so we can capture forwarded packets
155         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
156             1,
157             self.pg0,
158             self.pg1,
159             socket.IPPROTO_UDP,
160             is_out=1,
161             priority=10,
162             policy_type="bypass",
163         )
164
165         # create the packet stream
166         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
167         # add the stream to the source interface
168         self.pg0.add_stream(packets)
169         self.pg1.enable_capture()
170         self.pg_start()
171
172         # check capture on pg1
173         capture = self.pg1.get_capture()
174         for packet in capture:
175             try:
176                 self.logger.debug(ppp("SPD Add - Got packet:", packet))
177             except Exception:
178                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
179                 raise
180         self.logger.debug("SPD: Num packets: %s", len(capture.res))
181
182         # verify captured packets
183         self.verify_capture(self.pg0, self.pg1, capture)
184         # verify all policies matched the expected number of times
185         self.verify_policy_match(pkt_count, policy_0)
186         self.verify_policy_match(0, policy_1)
187         self.verify_policy_match(pkt_count, policy_2)
188
189
190 class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
191     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
192             (add discard)"""
193
194     def test_ipsec_spd_inbound_discard(self):
195         # In this test case, packets in IPv4 FWD path are configured
196         # to go through IPSec inbound SPD policy lookup.
197         #
198         #  Rule action is set to DISCARD.
199
200         self.create_interfaces(2)
201         pkt_count = 5
202
203         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
204
205         # create input rules
206         # bypass rule should take precedence over discard rule,
207         # even though it's lower priority
208         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
209             1,
210             self.pg1,
211             self.pg0,
212             socket.IPPROTO_UDP,
213             is_out=0,
214             priority=10,
215             policy_type="discard",
216         )
217
218         # create output rule so we can capture forwarded packets
219         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
220             1,
221             self.pg1,
222             self.pg0,
223             socket.IPPROTO_UDP,
224             is_out=1,
225             priority=10,
226             policy_type="bypass",
227         )
228
229         # create the packet stream
230         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
231         # add the stream to the source interface
232         self.pg0.add_stream(packets)
233         self.pg1.enable_capture()
234         self.pg_start()
235
236         # check capture on pg1
237         capture = self.pg1.assert_nothing_captured()
238
239         # verify all policies matched the expected number of times
240         self.verify_policy_match(pkt_count, policy_0)
241         self.verify_policy_match(0, policy_1)
242
243
244 class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
245     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
246     (add protect)"""
247
248     @classmethod
249     def setUpClass(cls):
250         super(IPSec4SpdTestCaseProtect, cls).setUpClass()
251
252     @classmethod
253     def tearDownClass(cls):
254         super(IPSec4SpdTestCaseProtect, cls).tearDownClass()
255
256     def setUp(self):
257         super(IPSec4SpdTestCaseProtect, self).setUp()
258
259     def tearDown(self):
260         super(IPSec4SpdTestCaseProtect, self).tearDown()
261
262     def test_ipsec_spd_inbound_protect(self):
263         # In this test case, encrypted packets in IPv4
264         # PROTECT path are configured
265         # to go through IPSec inbound SPD policy lookup.
266
267         pkt_count = 5
268         payload_size = 64
269         p = self.params[socket.AF_INET]
270         send_pkts = self.gen_encrypt_pkts(
271             p,
272             p.scapy_tra_sa,
273             self.tra_if,
274             src=self.tra_if.remote_ip4,
275             dst=self.tra_if.local_ip4,
276             count=pkt_count,
277             payload_size=payload_size,
278         )
279         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
280
281         self.logger.info(self.vapi.ppcli("show error"))
282         self.logger.info(self.vapi.ppcli("show ipsec all"))
283
284         pkts = p.tra_sa_in.get_stats()["packets"]
285         self.assertEqual(
286             pkts,
287             pkt_count,
288             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
289         )
290         pkts = p.tra_sa_out.get_stats()["packets"]
291         self.assertEqual(
292             pkts,
293             pkt_count,
294             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
295         )
296         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
297         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
298
299
300 class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound):
301     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
302         (add  ips  range with any port rule)"""
303
304     def test_ipsec_spd_inbound_add(self):
305         # In this test case, packets in IPv4 FWD path are configured
306         # to go through IPSec inbound SPD policy lookup.
307         # 2 SPD bypass rules (1 for inbound and 1 for outbound) are added.
308         # Traffic sent on pg0 interface should match fast path priority
309         # rule and should be sent out on pg1 interface.
310         self.create_interfaces(2)
311         pkt_count = 5
312         s_ip_s1 = ipaddress.ip_address(self.pg0.remote_ip4)
313         s_ip_e1 = ipaddress.ip_address(int(s_ip_s1) + 5)
314         d_ip_s1 = ipaddress.ip_address(self.pg1.remote_ip4)
315         d_ip_e1 = ipaddress.ip_address(int(d_ip_s1) + 0)
316
317         s_ip_s0 = ipaddress.ip_address(self.pg0.remote_ip4)
318         s_ip_e0 = ipaddress.ip_address(int(s_ip_s0) + 6)
319         d_ip_s0 = ipaddress.ip_address(self.pg1.remote_ip4)
320         d_ip_e0 = ipaddress.ip_address(int(d_ip_s0) + 0)
321         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
322
323         policy_0 = self.spd_add_rem_policy(  # inbound fast path, priority 10
324             1,
325             self.pg0,
326             self.pg1,
327             socket.IPPROTO_UDP,
328             is_out=0,
329             priority=10,
330             policy_type="bypass",
331             ip_range=True,
332             local_ip_start=d_ip_s0,
333             local_ip_stop=d_ip_e0,
334             remote_ip_start=s_ip_s0,
335             remote_ip_stop=s_ip_e0,
336         )
337         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
338             1,
339             self.pg0,
340             self.pg1,
341             socket.IPPROTO_UDP,
342             is_out=1,
343             priority=5,
344             policy_type="bypass",
345             ip_range=True,
346             local_ip_start=s_ip_s1,
347             local_ip_stop=s_ip_e1,
348             remote_ip_start=d_ip_s1,
349             remote_ip_stop=d_ip_e1,
350         )
351
352         # create the packet stream
353         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
354         # add the stream to the source interface + enable capture
355         self.pg0.add_stream(packets)
356         self.pg0.enable_capture()
357         self.pg1.enable_capture()
358         # start the packet generator
359         self.pg_start()
360         # get capture
361         capture = self.pg1.get_capture()
362         for packet in capture:
363             try:
364                 self.logger.debug(ppp("SPD - Got packet:", packet))
365             except Exception:
366                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
367                 raise
368         self.logger.debug("SPD: Num packets: %s", len(capture.res))
369
370         # assert nothing captured on pg0
371         self.pg0.assert_nothing_captured()
372         # verify captured packets
373         self.verify_capture(self.pg0, self.pg1, capture)
374         # verify all policies matched the expected number of times
375         self.verify_policy_match(pkt_count, policy_0)
376         self.verify_policy_match(pkt_count, policy_1)
377
378
379 class IPSec4SpdTestCaseAddAll(SpdFastPathInbound):
380     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
381         (add all ips ports rule)"""
382
383     def test_ipsec_spd_inbound_add(self):
384         # In this test case, packets in IPv4 FWD path are configured
385         # to go through IPSec inbound SPD policy lookup.
386         # 2 SPD rules (1 HIGH and 1 LOW) are added.
387         # Low priority rule action is set to BYPASS all ips.
388         # High priority rule action is set to DISCARD all ips.
389         # Traffic not sent on pg0 interface when HIGH discard priority rule is added.
390         # Then LOW priority
391         # rule is added and send the same traffic to pg0, this time expect
392         # the traffic is bypassed as bypass takes priority over discard.
393         self.create_interfaces(2)
394         pkt_count = 5
395         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
396
397         policy_0 = self.spd_add_rem_policy(  # inbound, priority 20
398             1,
399             self.pg0,
400             self.pg1,
401             socket.IPPROTO_UDP,
402             is_out=0,
403             priority=20,
404             policy_type="discard",
405             all_ips=True,
406         )
407
408         policy_1 = self.spd_add_rem_policy(  # inbound, priority 20
409             1,
410             self.pg0,
411             self.pg1,
412             socket.IPPROTO_UDP,
413             is_out=True,
414             priority=5,
415             policy_type="bypass",
416             all_ips=True,
417         )
418
419         # create the packet stream
420         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
421         # add the stream to the source interface + enable capture
422         self.pg0.add_stream(packets)
423         self.pg0.enable_capture()
424         self.pg1.enable_capture()
425         # start the packet generator
426         self.pg_start()
427         # assert nothing captured on pg0 and pg1
428         self.pg0.assert_nothing_captured()
429         self.pg1.assert_nothing_captured()
430
431         policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
432             1,
433             self.pg0,
434             self.pg1,
435             socket.IPPROTO_UDP,
436             is_out=0,
437             priority=10,
438             policy_type="bypass",
439             all_ips=True,
440         )
441
442         # create the packet stream
443         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
444         # add the stream to the source interface + enable capture
445         self.pg0.add_stream(packets)
446         self.pg0.enable_capture()
447         self.pg1.enable_capture()
448         # start the packet generator
449         self.pg_start()
450         # get capture
451         capture = self.pg1.get_capture(expected_count=pkt_count)
452         for packet in capture:
453             try:
454                 self.logger.debug(ppp("SPD - Got packet:", packet))
455             except Exception:
456                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
457                 raise
458         self.logger.debug("SPD: Num packets: %s", len(capture.res))
459
460         # assert nothing captured on pg0
461         self.pg0.assert_nothing_captured()
462         # verify all policies matched the expected number of times
463         self.verify_policy_match(pkt_count, policy_2)
464
465
466 class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
467     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
468         (remove rule)"""
469
470     def test_ipsec_spd_inbound_remove(self):
471         # In this test case, packets in IPv4 FWD path are configured
472         # to go through IPSec inbound SPD policy lookup.
473         # 2 SPD rules (1 HIGH and 1 LOW) are added.
474         # High priority rule action is set to BYPASS.
475         # Low priority rule action is set to DISCARD.
476         # High priority rule is then removed.
477         # Traffic sent on pg0 interface should match low priority
478         # rule and should be discarded after SPD lookup.
479         self.create_interfaces(2)
480         pkt_count = 5
481         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
482         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
483             1,
484             self.pg1,
485             self.pg0,
486             socket.IPPROTO_UDP,
487             is_out=0,
488             priority=10,
489             policy_type="bypass",
490         )
491         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
492             1,
493             self.pg1,
494             self.pg0,
495             socket.IPPROTO_UDP,
496             is_out=0,
497             priority=5,
498             policy_type="discard",
499         )
500
501         policy_out = self.spd_add_rem_policy(  # outbound, priority 10
502             1,
503             self.pg0,
504             self.pg1,
505             socket.IPPROTO_UDP,
506             is_out=1,
507             priority=10,
508             policy_type="bypass",
509         )
510
511         # create the packet stream
512         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
513         # add the stream to the source interface + enable capture
514         self.pg0.add_stream(packets)
515         self.pg0.enable_capture()
516         self.pg1.enable_capture()
517         # start the packet generator
518         self.pg_start()
519         # get capture
520         capture = self.pg1.get_capture()
521         for packet in capture:
522             try:
523                 self.logger.debug(ppp("SPD - Got packet:", packet))
524             except Exception:
525                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
526                 raise
527
528         # assert nothing captured on pg0
529         self.pg0.assert_nothing_captured()
530         # verify capture on pg1
531         self.logger.debug("SPD: Num packets: %s", len(capture.res))
532         self.verify_capture(self.pg0, self.pg1, capture)
533         # verify all policies matched the expected number of times
534         self.verify_policy_match(pkt_count, policy_0)
535         self.verify_policy_match(0, policy_1)
536         # now remove the bypass rule
537         self.spd_add_rem_policy(  # outbound, priority 10
538             1,
539             self.pg1,
540             self.pg0,
541             socket.IPPROTO_UDP,
542             is_out=0,
543             priority=10,
544             policy_type="bypass",
545             remove=True,
546         )
547
548         # resend the same packets
549         self.pg0.add_stream(packets)
550         self.pg0.enable_capture()  # flush the old captures
551         self.pg1.enable_capture()
552         self.pg_start()
553         # assert nothing captured on pg0
554         self.pg0.assert_nothing_captured()
555         # all packets will be dropped by SPD rule
556         self.pg1.assert_nothing_captured()
557         # verify all policies matched the expected number of times
558         self.verify_policy_match(pkt_count, policy_0)
559         self.verify_policy_match(pkt_count, policy_1)
560
561
562 class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
563     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
564         (add, remove, re-add)"""
565
566     def test_ipsec_spd_inbound_readd(self):
567         # In this test case, packets in IPv4 FWD path are configured
568         # to go through IPSec outbound SPD policy lookup.
569         # 2 SPD rules (1 HIGH and 1 LOW) are added.
570         # High priority rule action is set to BYPASS.
571         # Low priority rule action is set to DISCARD.
572         # Traffic sent on pg0 interface should match high priority
573         # rule and should be sent out on pg1 interface.
574         # High priority rule is then removed.
575         # Traffic sent on pg0 interface should match low priority
576         # rule and should be discarded after SPD lookup.
577         # Readd high priority rule.
578         # Traffic sent on pg0 interface should match high priority
579         # rule and should be sent out on pg1 interface.
580         self.create_interfaces(2)
581         pkt_count = 5
582         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
583         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
584             1,
585             self.pg1,
586             self.pg0,
587             socket.IPPROTO_UDP,
588             is_out=0,
589             priority=10,
590             policy_type="bypass",
591         )
592         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
593             1,
594             self.pg1,
595             self.pg0,
596             socket.IPPROTO_UDP,
597             is_out=0,
598             priority=5,
599             policy_type="discard",
600         )
601         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
602             1,
603             self.pg0,
604             self.pg1,
605             socket.IPPROTO_UDP,
606             is_out=1,
607             priority=10,
608             policy_type="bypass",
609         )
610
611         # create the packet stream
612         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
613         # add the stream to the source interface + enable capture
614         self.pg0.add_stream(packets)
615         self.pg0.enable_capture()
616         self.pg1.enable_capture()
617         # start the packet generator
618         self.pg_start()
619         # get capture
620         capture = self.pg1.get_capture()
621         for packet in capture:
622             try:
623                 self.logger.debug(ppp("SPD - Got packet:", packet))
624             except Exception:
625                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
626                 raise
627         self.logger.debug("SPD: Num packets: %s", len(capture.res))
628
629         # assert nothing captured on pg0
630         self.pg0.assert_nothing_captured()
631         # verify capture on pg1
632         self.verify_capture(self.pg0, self.pg1, capture)
633         # verify all policies matched the expected number of times
634         self.verify_policy_match(pkt_count, policy_0)
635         self.verify_policy_match(0, policy_1)
636         # remove the bypass rule, leaving only the discard rule
637         self.spd_add_rem_policy(  # inbound, priority 10
638             1,
639             self.pg1,
640             self.pg0,
641             socket.IPPROTO_UDP,
642             is_out=0,
643             priority=10,
644             policy_type="bypass",
645             remove=True,
646         )
647
648         # resend the same packets
649         self.pg0.add_stream(packets)
650         self.pg0.enable_capture()  # flush the old captures
651         self.pg1.enable_capture()
652         self.pg_start()
653
654         # assert nothing captured on pg0
655         self.pg0.assert_nothing_captured()
656         # all packets will be dropped by SPD rule
657         self.pg1.assert_nothing_captured()
658         # verify all policies matched the expected number of times
659         self.verify_policy_match(pkt_count, policy_0)
660         self.verify_policy_match(pkt_count, policy_1)
661
662         # now readd the bypass rule
663         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
664             1,
665             self.pg1,
666             self.pg0,
667             socket.IPPROTO_UDP,
668             is_out=0,
669             priority=10,
670             policy_type="bypass",
671         )
672
673         # resend the same packets
674         self.pg0.add_stream(packets)
675         self.pg0.enable_capture()  # flush the old captures
676         self.pg1.enable_capture()
677         self.pg_start()
678
679         # get capture
680         capture = self.pg1.get_capture(pkt_count)
681         for packet in capture:
682             try:
683                 self.logger.debug(ppp("SPD - Got packet:", packet))
684             except Exception:
685                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
686                 raise
687         self.logger.debug("SPD: Num packets: %s", len(capture.res))
688
689         # assert nothing captured on pg0
690         self.pg0.assert_nothing_captured()
691         # verify captured packets
692         self.verify_capture(self.pg0, self.pg1, capture)
693         # verify all policies matched the expected number of times
694         self.verify_policy_match(pkt_count, policy_0)
695         self.verify_policy_match(pkt_count, policy_1)
696
697
698 class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
699     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
700         (multiple interfaces, multiple rules)"""
701
702     def test_ipsec_spd_inbound_multiple(self):
703         # In this test case, packets in IPv4 FWD path are configured to go
704         # through IPSec outbound SPD policy lookup.
705         # Multiples rules on multiple interfaces are tested at the same time.
706         # 3x interfaces are configured, binding the same SPD to each.
707         # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
708         # On pg0 & pg1, the BYPASS rule is HIGH priority
709         # On pg2, the DISCARD rule is HIGH priority
710         # Traffic should be received on pg0 & pg1 and dropped on pg2.
711         self.create_interfaces(3)
712         pkt_count = 5
713         # bind SPD to all interfaces
714         self.spd_create_and_intf_add(1, self.pg_interfaces)
715         # add rules on all interfaces
716         policy_01 = self.spd_add_rem_policy(  # inbound, priority 10
717             1,
718             self.pg1,
719             self.pg0,
720             socket.IPPROTO_UDP,
721             is_out=0,
722             priority=10,
723             policy_type="bypass",
724         )
725         policy_02 = self.spd_add_rem_policy(  # inbound, priority 5
726             1,
727             self.pg1,
728             self.pg0,
729             socket.IPPROTO_UDP,
730             is_out=0,
731             priority=5,
732             policy_type="discard",
733         )
734
735         policy_11 = self.spd_add_rem_policy(  # inbound, priority 10
736             1,
737             self.pg2,
738             self.pg1,
739             socket.IPPROTO_UDP,
740             is_out=0,
741             priority=10,
742             policy_type="bypass",
743         )
744         policy_12 = self.spd_add_rem_policy(  # inbound, priority 5
745             1,
746             self.pg2,
747             self.pg1,
748             socket.IPPROTO_UDP,
749             is_out=0,
750             priority=5,
751             policy_type="discard",
752         )
753
754         policy_21 = self.spd_add_rem_policy(  # inbound, priority 5
755             1,
756             self.pg0,
757             self.pg2,
758             socket.IPPROTO_UDP,
759             is_out=0,
760             priority=5,
761             policy_type="bypass",
762         )
763         policy_22 = self.spd_add_rem_policy(  # inbound, priority 10
764             1,
765             self.pg0,
766             self.pg2,
767             socket.IPPROTO_UDP,
768             is_out=0,
769             priority=10,
770             policy_type="discard",
771         )
772
773         # interfaces bound to an SPD, will by default drop outbound
774         # traffic with no matching policies. add catch-all outbound
775         # bypass rule to SPD:
776         self.spd_add_rem_policy(  # outbound, all interfaces
777             1,
778             None,
779             None,
780             socket.IPPROTO_UDP,
781             is_out=1,
782             priority=10,
783             policy_type="bypass",
784             all_ips=True,
785         )
786
787         # create the packet streams
788         packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
789         packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
790         packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
791         # add the streams to the source interfaces
792         self.pg0.add_stream(packets0)
793         self.pg1.add_stream(packets1)
794         self.pg2.add_stream(packets2)
795         # enable capture on all interfaces
796         for pg in self.pg_interfaces:
797             pg.enable_capture()
798         # start the packet generator
799         self.pg_start()
800
801         # get captures
802         if_caps = []
803         for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
804             if_caps.append(pg.get_capture())
805             for packet in if_caps[-1]:
806                 try:
807                     self.logger.debug(ppp("SPD - Got packet:", packet))
808                 except Exception:
809                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
810                     raise
811         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
812         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
813
814         # verify captures that matched BYPASS rule
815         self.verify_capture(self.pg0, self.pg1, if_caps[0])
816         self.verify_capture(self.pg1, self.pg2, if_caps[1])
817         # verify that traffic to pg0 matched BYPASS rule
818         # although DISCARD rule had higher prioriy and was not dropped
819         self.verify_policy_match(pkt_count, policy_21)
820
821         # verify all packets that were expected to match rules, matched
822         # pg0 -> pg1
823         self.verify_policy_match(pkt_count, policy_01)
824         self.verify_policy_match(0, policy_02)
825         # pg1 -> pg2
826         self.verify_policy_match(pkt_count, policy_11)
827         self.verify_policy_match(0, policy_12)
828         # pg2 -> pg0
829         self.verify_policy_match(0, policy_22)
830
831
832 class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
833     """ IPSec/IPv6 inbound: Policy mode test case with fast path \
834     (add protect)"""
835
836     @classmethod
837     def setUpClass(cls):
838         super(IPSec6SpdTestCaseProtect, cls).setUpClass()
839
840     @classmethod
841     def tearDownClass(cls):
842         super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
843
844     def setUp(self):
845         super(IPSec6SpdTestCaseProtect, self).setUp()
846
847     def tearDown(self):
848         super(IPSec6SpdTestCaseProtect, self).tearDown()
849
850     def test_ipsec6_spd_inbound_protect(self):
851         pkt_count = 5
852         payload_size = 64
853         p = self.params[socket.AF_INET6]
854         send_pkts = self.gen_encrypt_pkts6(
855             p,
856             p.scapy_tra_sa,
857             self.tra_if,
858             src=self.tra_if.remote_ip6,
859             dst=self.tra_if.local_ip6,
860             count=pkt_count,
861             payload_size=payload_size,
862         )
863         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
864
865         self.logger.info(self.vapi.ppcli("show error"))
866         self.logger.info(self.vapi.ppcli("show ipsec all"))
867         pkts = p.tra_sa_in.get_stats()["packets"]
868         self.assertEqual(
869             pkts,
870             pkt_count,
871             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
872         )
873         pkts = p.tra_sa_out.get_stats()["packets"]
874         self.assertEqual(
875             pkts,
876             pkt_count,
877             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
878         )
879         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
880         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
881
882
883 if __name__ == "__main__":
884     unittest.main(testRunner=VppTestRunner)