tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_ipsec_spd_fp_input.py
1 import socket
2 import unittest
3 import ipaddress
4
5 from util import ppp
6 from asfframework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
8 from template_ipsec import IPSecIPv6Fwd
9 from test_ipsec_esp import TemplateIpsecEsp
10 from template_ipsec import SpdFastPathTemplate
11
12
13 def debug_signal_handler(signal, frame):
14     import pdb
15
16     pdb.set_trace()
17
18
19 import signal
20
21 signal.signal(signal.SIGINT, debug_signal_handler)
22
23
24 class SpdFastPathInbound(SpdFastPathTemplate):
25     # In test cases derived from this class, packets in IPv4 FWD path
26     # are configured to go through IPSec inbound SPD policy lookup.
27     # Note that order in which the rules are applied is
28     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
29     # precedence over DISCARD.
30     #
31     # Override setUpConstants to enable inbound fast path in config
32     @classmethod
33     def setUpConstants(cls):
34         super(SpdFastPathInbound, cls).setUpConstants()
35         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
36         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
37
38
39 class SpdFastPathInboundProtect(TemplateIpsecEsp):
40     @classmethod
41     def setUpConstants(cls):
42         super(SpdFastPathInboundProtect, cls).setUpConstants()
43         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
44         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
45
46     @classmethod
47     def setUpClass(cls):
48         super(SpdFastPathInboundProtect, cls).setUpClass()
49
50     @classmethod
51     def tearDownClass(cls):
52         super(SpdFastPathInboundProtect, cls).tearDownClass()
53
54     def setUp(self):
55         super(SpdFastPathInboundProtect, self).setUp()
56
57     def tearDown(self):
58         self.unconfig_network()
59         super(SpdFastPathInboundProtect, self).tearDown()
60
61
62 class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
63     # In test cases derived from this class, packets in IPvr6 FWD path
64     # are configured to go through IPSec inbound SPD policy lookup.
65     # Note that order in which the rules are applied is
66     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
67     # precedence over DISCARDi.
68
69     # Override setUpConstants to enable inbound fast path in config
70     @classmethod
71     def setUpConstants(cls):
72         super(SpdFastPathIPv6Inbound, cls).setUpConstants()
73         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
74         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
75
76
77 class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
78     @classmethod
79     def setUpConstants(cls):
80         super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
81         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
82         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
83
84     @classmethod
85     def setUpClass(cls):
86         super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
87
88     @classmethod
89     def tearDownClass(cls):
90         super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
91
92     def setUp(self):
93         super(SpdFastPathIPv6InboundProtect, self).setUp()
94
95     def tearDown(self):
96         self.unconfig_network()
97         super(SpdFastPathIPv6InboundProtect, self).tearDown()
98
99
100 class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
101     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
102         (add bypass)"""
103
104     def test_ipsec_spd_inbound_bypass(self):
105         # In this test case, packets in IPv4 FWD path are configured
106         # to go through IPSec inbound SPD policy lookup.
107         #
108         # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
109         # - High priority rule action is set to DISCARD.
110         # - Low priority rule action is set to BYPASS.
111         #
112         # Since BYPASS rules take precedence over DISCARD
113         # (the order being PROTECT, BYPASS, DISCARD) we expect the
114         # BYPASS rule to match and traffic to be correctly forwarded.
115         self.create_interfaces(2)
116         pkt_count = 5
117
118         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
119
120         # create input rules
121         # bypass rule should take precedence over discard rule,
122         # even though it's lower priority, because for input policies
123         # matching PROTECT policies precedes matching BYPASS policies
124         # which preceeds matching for DISCARD policies.
125         # Any hit stops the process.
126         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
127             1,
128             self.pg1,
129             self.pg0,
130             socket.IPPROTO_UDP,
131             is_out=0,
132             priority=10,
133             policy_type="bypass",
134             ip_range=True,
135             local_ip_start=self.pg1.remote_ip4,
136             local_ip_stop=self.pg1.remote_ip4,
137             remote_ip_start=self.pg0.remote_ip4,
138             remote_ip_stop=self.pg0.remote_ip4,
139         )
140         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
141             1,
142             self.pg1,
143             self.pg0,
144             socket.IPPROTO_UDP,
145             is_out=0,
146             priority=15,
147             policy_type="discard",
148             ip_range=True,
149             local_ip_start=self.pg1.remote_ip4,
150             local_ip_stop=self.pg1.remote_ip4,
151             remote_ip_start=self.pg0.remote_ip4,
152             remote_ip_stop=self.pg0.remote_ip4,
153         )
154
155         # create output rule so we can capture forwarded packets
156         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
157             1,
158             self.pg0,
159             self.pg1,
160             socket.IPPROTO_UDP,
161             is_out=1,
162             priority=10,
163             policy_type="bypass",
164         )
165
166         # create the packet stream
167         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
168         # add the stream to the source interface
169         self.pg0.add_stream(packets)
170         self.pg1.enable_capture()
171         self.pg_start()
172
173         # check capture on pg1
174         capture = self.pg1.get_capture()
175         for packet in capture:
176             try:
177                 self.logger.debug(ppp("SPD Add - Got packet:", packet))
178             except Exception:
179                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
180                 raise
181         self.logger.debug("SPD: Num packets: %s", len(capture.res))
182
183         # verify captured packets
184         self.verify_capture(self.pg0, self.pg1, capture)
185         # verify all policies matched the expected number of times
186         self.verify_policy_match(pkt_count, policy_0)
187         self.verify_policy_match(0, policy_1)
188         self.verify_policy_match(pkt_count, policy_2)
189
190
191 class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
192     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
193             (add discard)"""
194
195     def test_ipsec_spd_inbound_discard(self):
196         # In this test case, packets in IPv4 FWD path are configured
197         # to go through IPSec inbound SPD policy lookup.
198         #
199         #  Rule action is set to DISCARD.
200
201         self.create_interfaces(2)
202         pkt_count = 5
203
204         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
205
206         # create input rules
207         # bypass rule should take precedence over discard rule,
208         # even though it's lower priority
209         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
210             1,
211             self.pg1,
212             self.pg0,
213             socket.IPPROTO_UDP,
214             is_out=0,
215             priority=10,
216             policy_type="discard",
217         )
218
219         # create output rule so we can capture forwarded packets
220         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
221             1,
222             self.pg1,
223             self.pg0,
224             socket.IPPROTO_UDP,
225             is_out=1,
226             priority=10,
227             policy_type="bypass",
228         )
229
230         # create the packet stream
231         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
232         # add the stream to the source interface
233         self.pg0.add_stream(packets)
234         self.pg1.enable_capture()
235         self.pg_start()
236
237         # check capture on pg1
238         capture = self.pg1.assert_nothing_captured()
239
240         # verify all policies matched the expected number of times
241         self.verify_policy_match(pkt_count, policy_0)
242         self.verify_policy_match(0, policy_1)
243
244
245 class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
246     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
247     (add protect)"""
248
249     @classmethod
250     def setUpClass(cls):
251         super(IPSec4SpdTestCaseProtect, cls).setUpClass()
252
253     @classmethod
254     def tearDownClass(cls):
255         super(IPSec4SpdTestCaseProtect, cls).tearDownClass()
256
257     def setUp(self):
258         super(IPSec4SpdTestCaseProtect, self).setUp()
259
260     def tearDown(self):
261         super(IPSec4SpdTestCaseProtect, self).tearDown()
262
263     def test_ipsec_spd_inbound_protect(self):
264         # In this test case, encrypted packets in IPv4
265         # PROTECT path are configured
266         # to go through IPSec inbound SPD policy lookup.
267
268         pkt_count = 5
269         payload_size = 64
270         p = self.params[socket.AF_INET]
271         send_pkts = self.gen_encrypt_pkts(
272             p,
273             p.scapy_tra_sa,
274             self.tra_if,
275             src=self.tra_if.remote_ip4,
276             dst=self.tra_if.local_ip4,
277             count=pkt_count,
278             payload_size=payload_size,
279         )
280         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
281
282         self.logger.info(self.vapi.ppcli("show error"))
283         self.logger.info(self.vapi.ppcli("show ipsec all"))
284
285         pkts = p.tra_sa_in.get_stats()["packets"]
286         self.assertEqual(
287             pkts,
288             pkt_count,
289             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
290         )
291         pkts = p.tra_sa_out.get_stats()["packets"]
292         self.assertEqual(
293             pkts,
294             pkt_count,
295             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
296         )
297         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
298         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
299
300
301 class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound):
302     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
303         (add  ips  range with any port rule)"""
304
305     def test_ipsec_spd_inbound_add(self):
306         # In this test case, packets in IPv4 FWD path are configured
307         # to go through IPSec inbound SPD policy lookup.
308         # 2 SPD bypass rules (1 for inbound and 1 for outbound) are added.
309         # Traffic sent on pg0 interface should match fast path priority
310         # rule and should be sent out on pg1 interface.
311         self.create_interfaces(2)
312         pkt_count = 5
313         s_ip_s1 = ipaddress.ip_address(self.pg0.remote_ip4)
314         s_ip_e1 = ipaddress.ip_address(int(s_ip_s1) + 5)
315         d_ip_s1 = ipaddress.ip_address(self.pg1.remote_ip4)
316         d_ip_e1 = ipaddress.ip_address(int(d_ip_s1) + 0)
317
318         s_ip_s0 = ipaddress.ip_address(self.pg0.remote_ip4)
319         s_ip_e0 = ipaddress.ip_address(int(s_ip_s0) + 6)
320         d_ip_s0 = ipaddress.ip_address(self.pg1.remote_ip4)
321         d_ip_e0 = ipaddress.ip_address(int(d_ip_s0) + 0)
322         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
323
324         policy_0 = self.spd_add_rem_policy(  # inbound fast path, priority 10
325             1,
326             self.pg0,
327             self.pg1,
328             socket.IPPROTO_UDP,
329             is_out=0,
330             priority=10,
331             policy_type="bypass",
332             ip_range=True,
333             local_ip_start=d_ip_s0,
334             local_ip_stop=d_ip_e0,
335             remote_ip_start=s_ip_s0,
336             remote_ip_stop=s_ip_e0,
337         )
338         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
339             1,
340             self.pg0,
341             self.pg1,
342             socket.IPPROTO_UDP,
343             is_out=1,
344             priority=5,
345             policy_type="bypass",
346             ip_range=True,
347             local_ip_start=s_ip_s1,
348             local_ip_stop=s_ip_e1,
349             remote_ip_start=d_ip_s1,
350             remote_ip_stop=d_ip_e1,
351         )
352
353         # create the packet stream
354         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
355         # add the stream to the source interface + enable capture
356         self.pg0.add_stream(packets)
357         self.pg0.enable_capture()
358         self.pg1.enable_capture()
359         # start the packet generator
360         self.pg_start()
361         # get capture
362         capture = self.pg1.get_capture()
363         for packet in capture:
364             try:
365                 self.logger.debug(ppp("SPD - Got packet:", packet))
366             except Exception:
367                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
368                 raise
369         self.logger.debug("SPD: Num packets: %s", len(capture.res))
370
371         # assert nothing captured on pg0
372         self.pg0.assert_nothing_captured()
373         # verify captured packets
374         self.verify_capture(self.pg0, self.pg1, capture)
375         # verify all policies matched the expected number of times
376         self.verify_policy_match(pkt_count, policy_0)
377         self.verify_policy_match(pkt_count, policy_1)
378
379
380 class IPSec4SpdTestCaseAddAll(SpdFastPathInbound):
381     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
382         (add all ips ports rule)"""
383
384     def test_ipsec_spd_inbound_add(self):
385         # In this test case, packets in IPv4 FWD path are configured
386         # to go through IPSec inbound SPD policy lookup.
387         # 2 SPD rules (1 HIGH and 1 LOW) are added.
388         # Low priority rule action is set to BYPASS all ips.
389         # High priority rule action is set to DISCARD all ips.
390         # Traffic not sent on pg0 interface when HIGH discard priority rule is added.
391         # Then LOW priority
392         # rule is added and send the same traffic to pg0, this time expect
393         # the traffic is bypassed as bypass takes priority over discard.
394         self.create_interfaces(2)
395         pkt_count = 5
396         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
397
398         policy_0 = self.spd_add_rem_policy(  # inbound, priority 20
399             1,
400             self.pg0,
401             self.pg1,
402             socket.IPPROTO_UDP,
403             is_out=0,
404             priority=20,
405             policy_type="discard",
406             all_ips=True,
407         )
408
409         policy_1 = self.spd_add_rem_policy(  # inbound, priority 20
410             1,
411             self.pg0,
412             self.pg1,
413             socket.IPPROTO_UDP,
414             is_out=True,
415             priority=5,
416             policy_type="bypass",
417             all_ips=True,
418         )
419
420         # create the packet stream
421         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
422         # add the stream to the source interface + enable capture
423         self.pg0.add_stream(packets)
424         self.pg0.enable_capture()
425         self.pg1.enable_capture()
426         # start the packet generator
427         self.pg_start()
428         # assert nothing captured on pg0 and pg1
429         self.pg0.assert_nothing_captured()
430         self.pg1.assert_nothing_captured()
431
432         policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
433             1,
434             self.pg0,
435             self.pg1,
436             socket.IPPROTO_UDP,
437             is_out=0,
438             priority=10,
439             policy_type="bypass",
440             all_ips=True,
441         )
442
443         # create the packet stream
444         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
445         # add the stream to the source interface + enable capture
446         self.pg0.add_stream(packets)
447         self.pg0.enable_capture()
448         self.pg1.enable_capture()
449         # start the packet generator
450         self.pg_start()
451         # get capture
452         capture = self.pg1.get_capture(expected_count=pkt_count)
453         for packet in capture:
454             try:
455                 self.logger.debug(ppp("SPD - Got packet:", packet))
456             except Exception:
457                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
458                 raise
459         self.logger.debug("SPD: Num packets: %s", len(capture.res))
460
461         # assert nothing captured on pg0
462         self.pg0.assert_nothing_captured()
463         # verify all policies matched the expected number of times
464         self.verify_policy_match(pkt_count, policy_2)
465
466
467 class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
468     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
469         (remove rule)"""
470
471     def test_ipsec_spd_inbound_remove(self):
472         # In this test case, packets in IPv4 FWD path are configured
473         # to go through IPSec inbound SPD policy lookup.
474         # 2 SPD rules (1 HIGH and 1 LOW) are added.
475         # High priority rule action is set to BYPASS.
476         # Low priority rule action is set to DISCARD.
477         # High priority rule is then removed.
478         # Traffic sent on pg0 interface should match low priority
479         # rule and should be discarded after SPD lookup.
480         self.create_interfaces(2)
481         pkt_count = 5
482         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
483         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
484             1,
485             self.pg1,
486             self.pg0,
487             socket.IPPROTO_UDP,
488             is_out=0,
489             priority=10,
490             policy_type="bypass",
491         )
492         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
493             1,
494             self.pg1,
495             self.pg0,
496             socket.IPPROTO_UDP,
497             is_out=0,
498             priority=5,
499             policy_type="discard",
500         )
501
502         policy_out = self.spd_add_rem_policy(  # outbound, priority 10
503             1,
504             self.pg0,
505             self.pg1,
506             socket.IPPROTO_UDP,
507             is_out=1,
508             priority=10,
509             policy_type="bypass",
510         )
511
512         # create the packet stream
513         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
514         # add the stream to the source interface + enable capture
515         self.pg0.add_stream(packets)
516         self.pg0.enable_capture()
517         self.pg1.enable_capture()
518         # start the packet generator
519         self.pg_start()
520         # get capture
521         capture = self.pg1.get_capture()
522         for packet in capture:
523             try:
524                 self.logger.debug(ppp("SPD - Got packet:", packet))
525             except Exception:
526                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
527                 raise
528
529         # assert nothing captured on pg0
530         self.pg0.assert_nothing_captured()
531         # verify capture on pg1
532         self.logger.debug("SPD: Num packets: %s", len(capture.res))
533         self.verify_capture(self.pg0, self.pg1, capture)
534         # verify all policies matched the expected number of times
535         self.verify_policy_match(pkt_count, policy_0)
536         self.verify_policy_match(0, policy_1)
537         # now remove the bypass rule
538         self.spd_add_rem_policy(  # outbound, priority 10
539             1,
540             self.pg1,
541             self.pg0,
542             socket.IPPROTO_UDP,
543             is_out=0,
544             priority=10,
545             policy_type="bypass",
546             remove=True,
547         )
548
549         # resend the same packets
550         self.pg0.add_stream(packets)
551         self.pg0.enable_capture()  # flush the old captures
552         self.pg1.enable_capture()
553         self.pg_start()
554         # assert nothing captured on pg0
555         self.pg0.assert_nothing_captured()
556         # all packets will be dropped by SPD rule
557         self.pg1.assert_nothing_captured()
558         # verify all policies matched the expected number of times
559         self.verify_policy_match(pkt_count, policy_0)
560         self.verify_policy_match(pkt_count, policy_1)
561
562
563 class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
564     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
565         (add, remove, re-add)"""
566
567     def test_ipsec_spd_inbound_readd(self):
568         # In this test case, packets in IPv4 FWD path are configured
569         # to go through IPSec outbound SPD policy lookup.
570         # 2 SPD rules (1 HIGH and 1 LOW) are added.
571         # High priority rule action is set to BYPASS.
572         # Low priority rule action is set to DISCARD.
573         # Traffic sent on pg0 interface should match high priority
574         # rule and should be sent out on pg1 interface.
575         # High priority rule is then removed.
576         # Traffic sent on pg0 interface should match low priority
577         # rule and should be discarded after SPD lookup.
578         # Readd high priority rule.
579         # Traffic sent on pg0 interface should match high priority
580         # rule and should be sent out on pg1 interface.
581         self.create_interfaces(2)
582         pkt_count = 5
583         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
584         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
585             1,
586             self.pg1,
587             self.pg0,
588             socket.IPPROTO_UDP,
589             is_out=0,
590             priority=10,
591             policy_type="bypass",
592         )
593         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
594             1,
595             self.pg1,
596             self.pg0,
597             socket.IPPROTO_UDP,
598             is_out=0,
599             priority=5,
600             policy_type="discard",
601         )
602         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
603             1,
604             self.pg0,
605             self.pg1,
606             socket.IPPROTO_UDP,
607             is_out=1,
608             priority=10,
609             policy_type="bypass",
610         )
611
612         # create the packet stream
613         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
614         # add the stream to the source interface + enable capture
615         self.pg0.add_stream(packets)
616         self.pg0.enable_capture()
617         self.pg1.enable_capture()
618         # start the packet generator
619         self.pg_start()
620         # get capture
621         capture = self.pg1.get_capture()
622         for packet in capture:
623             try:
624                 self.logger.debug(ppp("SPD - Got packet:", packet))
625             except Exception:
626                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
627                 raise
628         self.logger.debug("SPD: Num packets: %s", len(capture.res))
629
630         # assert nothing captured on pg0
631         self.pg0.assert_nothing_captured()
632         # verify capture on pg1
633         self.verify_capture(self.pg0, self.pg1, capture)
634         # verify all policies matched the expected number of times
635         self.verify_policy_match(pkt_count, policy_0)
636         self.verify_policy_match(0, policy_1)
637         # remove the bypass rule, leaving only the discard rule
638         self.spd_add_rem_policy(  # inbound, priority 10
639             1,
640             self.pg1,
641             self.pg0,
642             socket.IPPROTO_UDP,
643             is_out=0,
644             priority=10,
645             policy_type="bypass",
646             remove=True,
647         )
648
649         # resend the same packets
650         self.pg0.add_stream(packets)
651         self.pg0.enable_capture()  # flush the old captures
652         self.pg1.enable_capture()
653         self.pg_start()
654
655         # assert nothing captured on pg0
656         self.pg0.assert_nothing_captured()
657         # all packets will be dropped by SPD rule
658         self.pg1.assert_nothing_captured()
659         # verify all policies matched the expected number of times
660         self.verify_policy_match(pkt_count, policy_0)
661         self.verify_policy_match(pkt_count, policy_1)
662
663         # now readd the bypass rule
664         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
665             1,
666             self.pg1,
667             self.pg0,
668             socket.IPPROTO_UDP,
669             is_out=0,
670             priority=10,
671             policy_type="bypass",
672         )
673
674         # resend the same packets
675         self.pg0.add_stream(packets)
676         self.pg0.enable_capture()  # flush the old captures
677         self.pg1.enable_capture()
678         self.pg_start()
679
680         # get capture
681         capture = self.pg1.get_capture(pkt_count)
682         for packet in capture:
683             try:
684                 self.logger.debug(ppp("SPD - Got packet:", packet))
685             except Exception:
686                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
687                 raise
688         self.logger.debug("SPD: Num packets: %s", len(capture.res))
689
690         # assert nothing captured on pg0
691         self.pg0.assert_nothing_captured()
692         # verify captured packets
693         self.verify_capture(self.pg0, self.pg1, capture)
694         # verify all policies matched the expected number of times
695         self.verify_policy_match(pkt_count, policy_0)
696         self.verify_policy_match(pkt_count, policy_1)
697
698
699 class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
700     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
701         (multiple interfaces, multiple rules)"""
702
703     def test_ipsec_spd_inbound_multiple(self):
704         # In this test case, packets in IPv4 FWD path are configured to go
705         # through IPSec outbound SPD policy lookup.
706         # Multiples rules on multiple interfaces are tested at the same time.
707         # 3x interfaces are configured, binding the same SPD to each.
708         # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
709         # On pg0 & pg1, the BYPASS rule is HIGH priority
710         # On pg2, the DISCARD rule is HIGH priority
711         # Traffic should be received on pg0 & pg1 and dropped on pg2.
712         self.create_interfaces(3)
713         pkt_count = 5
714         # bind SPD to all interfaces
715         self.spd_create_and_intf_add(1, self.pg_interfaces)
716         # add rules on all interfaces
717         policy_01 = self.spd_add_rem_policy(  # inbound, priority 10
718             1,
719             self.pg1,
720             self.pg0,
721             socket.IPPROTO_UDP,
722             is_out=0,
723             priority=10,
724             policy_type="bypass",
725         )
726         policy_02 = self.spd_add_rem_policy(  # inbound, priority 5
727             1,
728             self.pg1,
729             self.pg0,
730             socket.IPPROTO_UDP,
731             is_out=0,
732             priority=5,
733             policy_type="discard",
734         )
735
736         policy_11 = self.spd_add_rem_policy(  # inbound, priority 10
737             1,
738             self.pg2,
739             self.pg1,
740             socket.IPPROTO_UDP,
741             is_out=0,
742             priority=10,
743             policy_type="bypass",
744         )
745         policy_12 = self.spd_add_rem_policy(  # inbound, priority 5
746             1,
747             self.pg2,
748             self.pg1,
749             socket.IPPROTO_UDP,
750             is_out=0,
751             priority=5,
752             policy_type="discard",
753         )
754
755         policy_21 = self.spd_add_rem_policy(  # inbound, priority 5
756             1,
757             self.pg0,
758             self.pg2,
759             socket.IPPROTO_UDP,
760             is_out=0,
761             priority=5,
762             policy_type="bypass",
763         )
764         policy_22 = self.spd_add_rem_policy(  # inbound, priority 10
765             1,
766             self.pg0,
767             self.pg2,
768             socket.IPPROTO_UDP,
769             is_out=0,
770             priority=10,
771             policy_type="discard",
772         )
773
774         # interfaces bound to an SPD, will by default drop outbound
775         # traffic with no matching policies. add catch-all outbound
776         # bypass rule to SPD:
777         self.spd_add_rem_policy(  # outbound, all interfaces
778             1,
779             None,
780             None,
781             socket.IPPROTO_UDP,
782             is_out=1,
783             priority=10,
784             policy_type="bypass",
785             all_ips=True,
786         )
787
788         # create the packet streams
789         packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
790         packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
791         packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
792         # add the streams to the source interfaces
793         self.pg0.add_stream(packets0)
794         self.pg1.add_stream(packets1)
795         self.pg2.add_stream(packets2)
796         # enable capture on all interfaces
797         for pg in self.pg_interfaces:
798             pg.enable_capture()
799         # start the packet generator
800         self.pg_start()
801
802         # get captures
803         if_caps = []
804         for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
805             if_caps.append(pg.get_capture())
806             for packet in if_caps[-1]:
807                 try:
808                     self.logger.debug(ppp("SPD - Got packet:", packet))
809                 except Exception:
810                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
811                     raise
812         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
813         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
814
815         # verify captures that matched BYPASS rule
816         self.verify_capture(self.pg0, self.pg1, if_caps[0])
817         self.verify_capture(self.pg1, self.pg2, if_caps[1])
818         # verify that traffic to pg0 matched BYPASS rule
819         # although DISCARD rule had higher prioriy and was not dropped
820         self.verify_policy_match(pkt_count, policy_21)
821
822         # verify all packets that were expected to match rules, matched
823         # pg0 -> pg1
824         self.verify_policy_match(pkt_count, policy_01)
825         self.verify_policy_match(0, policy_02)
826         # pg1 -> pg2
827         self.verify_policy_match(pkt_count, policy_11)
828         self.verify_policy_match(0, policy_12)
829         # pg2 -> pg0
830         self.verify_policy_match(0, policy_22)
831
832
833 class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
834     """ IPSec/IPv6 inbound: Policy mode test case with fast path \
835     (add protect)"""
836
837     @classmethod
838     def setUpClass(cls):
839         super(IPSec6SpdTestCaseProtect, cls).setUpClass()
840
841     @classmethod
842     def tearDownClass(cls):
843         super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
844
845     def setUp(self):
846         super(IPSec6SpdTestCaseProtect, self).setUp()
847
848     def tearDown(self):
849         super(IPSec6SpdTestCaseProtect, self).tearDown()
850
851     def test_ipsec6_spd_inbound_protect(self):
852         pkt_count = 5
853         payload_size = 64
854         p = self.params[socket.AF_INET6]
855         send_pkts = self.gen_encrypt_pkts6(
856             p,
857             p.scapy_tra_sa,
858             self.tra_if,
859             src=self.tra_if.remote_ip6,
860             dst=self.tra_if.local_ip6,
861             count=pkt_count,
862             payload_size=payload_size,
863         )
864         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
865
866         self.logger.info(self.vapi.ppcli("show error"))
867         self.logger.info(self.vapi.ppcli("show ipsec all"))
868         pkts = p.tra_sa_in.get_stats()["packets"]
869         self.assertEqual(
870             pkts,
871             pkt_count,
872             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
873         )
874         pkts = p.tra_sa_out.get_stats()["packets"]
875         self.assertEqual(
876             pkts,
877             pkt_count,
878             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
879         )
880         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
881         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
882
883
884 if __name__ == "__main__":
885     unittest.main(testRunner=VppTestRunner)