http_static: misc bug fixes
[vpp.git] / test / test_ipsec_spd_fp_input.py
1 import socket
2 import unittest
3 import ipaddress
4
5 from util import ppp
6 from framework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
8 from template_ipsec import IPSecIPv6Fwd
9 from test_ipsec_esp import TemplateIpsecEsp
10
11
12 def debug_signal_handler(signal, frame):
13     import pdb
14
15     pdb.set_trace()
16
17
18 import signal
19
20 signal.signal(signal.SIGINT, debug_signal_handler)
21
22
23 class SpdFastPathInbound(IPSecIPv4Fwd):
24     # In test cases derived from this class, packets in IPv4 FWD path
25     # are configured to go through IPSec inbound SPD policy lookup.
26     # Note that order in which the rules are applied is
27     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
28     # precedence over DISCARD.
29     #
30     # Override setUpConstants to enable inbound fast path in config
31     @classmethod
32     def setUpConstants(cls):
33         super(SpdFastPathInbound, cls).setUpConstants()
34         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
35         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
36
37
38 class SpdFastPathInboundProtect(TemplateIpsecEsp):
39     @classmethod
40     def setUpConstants(cls):
41         super(SpdFastPathInboundProtect, cls).setUpConstants()
42         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"])
43         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
44
45     @classmethod
46     def setUpClass(cls):
47         super(SpdFastPathInboundProtect, cls).setUpClass()
48
49     @classmethod
50     def tearDownClass(cls):
51         super(SpdFastPathInboundProtect, cls).tearDownClass()
52
53     def setUp(self):
54         super(SpdFastPathInboundProtect, self).setUp()
55
56     def tearDown(self):
57         self.unconfig_network()
58         super(SpdFastPathInboundProtect, self).tearDown()
59
60
61 class SpdFastPathIPv6Inbound(IPSecIPv6Fwd):
62     # In test cases derived from this class, packets in IPvr6 FWD path
63     # are configured to go through IPSec inbound SPD policy lookup.
64     # Note that order in which the rules are applied is
65     # PROTECT, BYPASS, DISCARD. Therefore BYPASS rules take
66     # precedence over DISCARDi.
67
68     # Override setUpConstants to enable inbound fast path in config
69     @classmethod
70     def setUpConstants(cls):
71         super(SpdFastPathIPv6Inbound, cls).setUpConstants()
72         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
73         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
74
75
76 class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp):
77     @classmethod
78     def setUpConstants(cls):
79         super(SpdFastPathIPv6InboundProtect, cls).setUpConstants()
80         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"])
81         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
82
83     @classmethod
84     def setUpClass(cls):
85         super(SpdFastPathIPv6InboundProtect, cls).setUpClass()
86
87     @classmethod
88     def tearDownClass(cls):
89         super(SpdFastPathIPv6InboundProtect, cls).tearDownClass()
90
91     def setUp(self):
92         super(SpdFastPathIPv6InboundProtect, self).setUp()
93
94     def tearDown(self):
95         self.unconfig_network()
96         super(SpdFastPathIPv6InboundProtect, self).tearDown()
97
98
99 class IPSec4SpdTestCaseBypass(SpdFastPathInbound):
100     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
101         (add bypass)"""
102
103     def test_ipsec_spd_inbound_bypass(self):
104         # In this test case, packets in IPv4 FWD path are configured
105         # to go through IPSec inbound SPD policy lookup.
106         #
107         # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
108         # - High priority rule action is set to DISCARD.
109         # - Low priority rule action is set to BYPASS.
110         #
111         # Since BYPASS rules take precedence over DISCARD
112         # (the order being PROTECT, BYPASS, DISCARD) we expect the
113         # BYPASS rule to match and traffic to be correctly forwarded.
114         self.create_interfaces(2)
115         pkt_count = 5
116
117         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
118
119         # create input rules
120         # bypass rule should take precedence over discard rule,
121         # even though it's lower priority
122         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
123             1,
124             self.pg1,
125             self.pg0,
126             socket.IPPROTO_UDP,
127             is_out=0,
128             priority=10,
129             policy_type="bypass",
130             ip_range=True,
131             local_ip_start=self.pg0.remote_ip4,
132             local_ip_stop=self.pg0.remote_ip4,
133             remote_ip_start=self.pg1.remote_ip4,
134             remote_ip_stop=self.pg1.remote_ip4,
135         )
136         policy_1 = self.spd_add_rem_policy(  # inbound, priority 15
137             1,
138             self.pg1,
139             self.pg0,
140             socket.IPPROTO_UDP,
141             is_out=0,
142             priority=15,
143             policy_type="discard",
144             ip_range=True,
145             local_ip_start=self.pg0.remote_ip4,
146             local_ip_stop=self.pg0.remote_ip4,
147             remote_ip_start=self.pg1.remote_ip4,
148             remote_ip_stop=self.pg1.remote_ip4,
149         )
150
151         # create output rule so we can capture forwarded packets
152         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
153             1,
154             self.pg0,
155             self.pg1,
156             socket.IPPROTO_UDP,
157             is_out=1,
158             priority=10,
159             policy_type="bypass",
160         )
161
162         # create the packet stream
163         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
164         # add the stream to the source interface
165         self.pg0.add_stream(packets)
166         self.pg1.enable_capture()
167         self.pg_start()
168
169         # check capture on pg1
170         capture = self.pg1.get_capture()
171         for packet in capture:
172             try:
173                 self.logger.debug(ppp("SPD Add - Got packet:", packet))
174             except Exception:
175                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
176                 raise
177         self.logger.debug("SPD: Num packets: %s", len(capture.res))
178
179         # verify captured packets
180         self.verify_capture(self.pg0, self.pg1, capture)
181         # verify all policies matched the expected number of times
182         self.verify_policy_match(pkt_count, policy_0)
183         self.verify_policy_match(0, policy_1)
184         self.verify_policy_match(pkt_count, policy_2)
185
186
187 class IPSec4SpdTestCaseDiscard(SpdFastPathInbound):
188     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
189             (add discard)"""
190
191     def test_ipsec_spd_inbound_discard(self):
192         # In this test case, packets in IPv4 FWD path are configured
193         # to go through IPSec inbound SPD policy lookup.
194         #
195         #  Rule action is set to DISCARD.
196
197         self.create_interfaces(2)
198         pkt_count = 5
199
200         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
201
202         # create input rules
203         # bypass rule should take precedence over discard rule,
204         # even though it's lower priority
205         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
206             1,
207             self.pg0,
208             self.pg1,
209             socket.IPPROTO_UDP,
210             is_out=0,
211             priority=10,
212             policy_type="discard",
213         )
214
215         # create output rule so we can capture forwarded packets
216         policy_1 = self.spd_add_rem_policy(  # outbound, priority 10
217             1,
218             self.pg0,
219             self.pg1,
220             socket.IPPROTO_UDP,
221             is_out=1,
222             priority=10,
223             policy_type="bypass",
224         )
225
226         # create the packet stream
227         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
228         # add the stream to the source interface
229         self.pg0.add_stream(packets)
230         self.pg1.enable_capture()
231         self.pg_start()
232
233         # check capture on pg1
234         capture = self.pg1.assert_nothing_captured()
235
236         # verify all policies matched the expected number of times
237         self.verify_policy_match(pkt_count, policy_0)
238         self.verify_policy_match(0, policy_1)
239
240
241 class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect):
242     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
243     (add protect)"""
244
245     @classmethod
246     def setUpClass(cls):
247         super(IPSec4SpdTestCaseProtect, cls).setUpClass()
248
249     @classmethod
250     def tearDownClass(cls):
251         super(IPSec4SpdTestCaseProtect, cls).tearDownClass()
252
253     def setUp(self):
254         super(IPSec4SpdTestCaseProtect, self).setUp()
255
256     def tearDown(self):
257         super(IPSec4SpdTestCaseProtect, self).tearDown()
258
259     def test_ipsec_spd_inbound_protect(self):
260         # In this test case, encrypted packets in IPv4
261         # PROTECT path are configured
262         # to go through IPSec inbound SPD policy lookup.
263
264         pkt_count = 5
265         payload_size = 64
266         p = self.params[socket.AF_INET]
267         send_pkts = self.gen_encrypt_pkts(
268             p,
269             p.scapy_tra_sa,
270             self.tra_if,
271             src=self.tra_if.local_ip4,
272             dst=self.tra_if.remote_ip4,
273             count=pkt_count,
274             payload_size=payload_size,
275         )
276         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
277
278         self.logger.info(self.vapi.ppcli("show error"))
279         self.logger.info(self.vapi.ppcli("show ipsec all"))
280
281         pkts = p.tra_sa_in.get_stats()["packets"]
282         self.assertEqual(
283             pkts,
284             pkt_count,
285             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
286         )
287         pkts = p.tra_sa_out.get_stats()["packets"]
288         self.assertEqual(
289             pkts,
290             pkt_count,
291             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
292         )
293         self.assertEqual(p.tra_sa_out.get_lost(), 0)
294         self.assertEqual(p.tra_sa_in.get_lost(), 0)
295
296
297 class IPSec4SpdTestCaseAddIPRange(SpdFastPathInbound):
298     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
299         (add  ips  range with any port rule)"""
300
301     def test_ipsec_spd_inbound_add(self):
302         # In this test case, packets in IPv4 FWD path are configured
303         # to go through IPSec inbound SPD policy lookup.
304         # 2 SPD bypass rules (1 for inbound and 1 for outbound) are added.
305         # Traffic sent on pg0 interface should match fast path priority
306         # rule and should be sent out on pg1 interface.
307         self.create_interfaces(2)
308         pkt_count = 5
309         s_ip_s1 = ipaddress.ip_address(self.pg0.remote_ip4)
310         s_ip_e1 = ipaddress.ip_address(int(s_ip_s1) + 5)
311         d_ip_s1 = ipaddress.ip_address(self.pg1.remote_ip4)
312         d_ip_e1 = ipaddress.ip_address(int(d_ip_s1) + 0)
313
314         s_ip_s0 = ipaddress.ip_address(self.pg0.remote_ip4)
315         s_ip_e0 = ipaddress.ip_address(int(s_ip_s0) + 6)
316         d_ip_s0 = ipaddress.ip_address(self.pg1.remote_ip4)
317         d_ip_e0 = ipaddress.ip_address(int(d_ip_s0) + 0)
318         self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
319
320         policy_0 = self.spd_add_rem_policy(  # inbound fast path, priority 10
321             1,
322             self.pg0,
323             self.pg1,
324             socket.IPPROTO_UDP,
325             is_out=0,
326             priority=10,
327             policy_type="bypass",
328             ip_range=True,
329             local_ip_start=s_ip_s0,
330             local_ip_stop=s_ip_e0,
331             remote_ip_start=d_ip_s0,
332             remote_ip_stop=d_ip_e0,
333         )
334         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
335             1,
336             self.pg0,
337             self.pg1,
338             socket.IPPROTO_UDP,
339             is_out=1,
340             priority=5,
341             policy_type="bypass",
342             ip_range=True,
343             local_ip_start=s_ip_s1,
344             local_ip_stop=s_ip_e1,
345             remote_ip_start=d_ip_s1,
346             remote_ip_stop=d_ip_e1,
347         )
348
349         # create the packet stream
350         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
351         # add the stream to the source interface + enable capture
352         self.pg0.add_stream(packets)
353         self.pg0.enable_capture()
354         self.pg1.enable_capture()
355         # start the packet generator
356         self.pg_start()
357         # get capture
358         capture = self.pg1.get_capture()
359         for packet in capture:
360             try:
361                 self.logger.debug(ppp("SPD - Got packet:", packet))
362             except Exception:
363                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
364                 raise
365         self.logger.debug("SPD: Num packets: %s", len(capture.res))
366
367         # assert nothing captured on pg0
368         self.pg0.assert_nothing_captured()
369         # verify captured packets
370         self.verify_capture(self.pg0, self.pg1, capture)
371         # verify all policies matched the expected number of times
372         self.verify_policy_match(pkt_count, policy_0)
373         self.verify_policy_match(pkt_count, policy_1)
374
375
376 class IPSec4SpdTestCaseAddAll(SpdFastPathInbound):
377     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
378         (add all ips ports rule)"""
379
380     def test_ipsec_spd_inbound_add(self):
381         # In this test case, packets in IPv4 FWD path are configured
382         # to go through IPSec inbound SPD policy lookup.
383         # 2 SPD rules (1 HIGH and 1 LOW) are added.
384         # Low priority rule action is set to BYPASS all ips.
385         # High priority rule action is set to DISCARD all ips.
386         # Traffic not sent on pg0 interface when HIGH discard priority rule is added.
387         # Then LOW priority
388         # rule is added and send the same traffic to pg0, this time expect
389         # the traffic is bypassed as bypass takes priority over discard.
390         self.create_interfaces(2)
391         pkt_count = 5
392         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
393
394         policy_0 = self.spd_add_rem_policy(  # inbound, priority 20
395             1,
396             self.pg0,
397             self.pg1,
398             socket.IPPROTO_UDP,
399             is_out=0,
400             priority=20,
401             policy_type="discard",
402             all_ips=True,
403         )
404
405         policy_1 = self.spd_add_rem_policy(  # inbound, priority 20
406             1,
407             self.pg0,
408             self.pg1,
409             socket.IPPROTO_UDP,
410             is_out=True,
411             priority=5,
412             policy_type="bypass",
413             all_ips=True,
414         )
415
416         # create the packet stream
417         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
418         # add the stream to the source interface + enable capture
419         self.pg0.add_stream(packets)
420         self.pg0.enable_capture()
421         self.pg1.enable_capture()
422         # start the packet generator
423         self.pg_start()
424         # assert nothing captured on pg0 and pg1
425         self.pg0.assert_nothing_captured()
426         self.pg1.assert_nothing_captured()
427
428         policy_2 = self.spd_add_rem_policy(  # inbound, priority 10
429             1,
430             self.pg0,
431             self.pg1,
432             socket.IPPROTO_UDP,
433             is_out=0,
434             priority=10,
435             policy_type="bypass",
436             all_ips=True,
437         )
438
439         # create the packet stream
440         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
441         # add the stream to the source interface + enable capture
442         self.pg0.add_stream(packets)
443         self.pg0.enable_capture()
444         self.pg1.enable_capture()
445         # start the packet generator
446         self.pg_start()
447         # get capture
448         capture = self.pg1.get_capture(expected_count=pkt_count)
449         for packet in capture:
450             try:
451                 self.logger.debug(ppp("SPD - Got packet:", packet))
452             except Exception:
453                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
454                 raise
455         self.logger.debug("SPD: Num packets: %s", len(capture.res))
456
457         # assert nothing captured on pg0
458         self.pg0.assert_nothing_captured()
459         # verify all policies matched the expected number of times
460         self.verify_policy_match(pkt_count, policy_2)
461
462
463 class IPSec4SpdTestCaseRemove(SpdFastPathInbound):
464     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
465         (remove rule)"""
466
467     def test_ipsec_spd_inbound_remove(self):
468         # In this test case, packets in IPv4 FWD path are configured
469         # to go through IPSec inbound SPD policy lookup.
470         # 2 SPD rules (1 HIGH and 1 LOW) are added.
471         # High priority rule action is set to BYPASS.
472         # Low priority rule action is set to DISCARD.
473         # High priority rule is then removed.
474         # Traffic sent on pg0 interface should match low priority
475         # rule and should be discarded after SPD lookup.
476         self.create_interfaces(2)
477         pkt_count = 5
478         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
479         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
480             1,
481             self.pg0,
482             self.pg1,
483             socket.IPPROTO_UDP,
484             is_out=0,
485             priority=10,
486             policy_type="bypass",
487         )
488         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
489             1,
490             self.pg0,
491             self.pg1,
492             socket.IPPROTO_UDP,
493             is_out=0,
494             priority=5,
495             policy_type="discard",
496         )
497
498         policy_out = self.spd_add_rem_policy(  # outbound, priority 10
499             1,
500             self.pg0,
501             self.pg1,
502             socket.IPPROTO_UDP,
503             is_out=1,
504             priority=10,
505             policy_type="bypass",
506         )
507
508         # create the packet stream
509         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
510         # add the stream to the source interface + enable capture
511         self.pg0.add_stream(packets)
512         self.pg0.enable_capture()
513         self.pg1.enable_capture()
514         # start the packet generator
515         self.pg_start()
516         # get capture
517         capture = self.pg1.get_capture()
518         for packet in capture:
519             try:
520                 self.logger.debug(ppp("SPD - Got packet:", packet))
521             except Exception:
522                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
523                 raise
524
525         # assert nothing captured on pg0
526         self.pg0.assert_nothing_captured()
527         # verify capture on pg1
528         self.logger.debug("SPD: Num packets: %s", len(capture.res))
529         self.verify_capture(self.pg0, self.pg1, capture)
530         # verify all policies matched the expected number of times
531         self.verify_policy_match(pkt_count, policy_0)
532         self.verify_policy_match(0, policy_1)
533         # now remove the bypass rule
534         self.spd_add_rem_policy(  # outbound, priority 10
535             1,
536             self.pg0,
537             self.pg1,
538             socket.IPPROTO_UDP,
539             is_out=0,
540             priority=10,
541             policy_type="bypass",
542             remove=True,
543         )
544
545         # resend the same packets
546         self.pg0.add_stream(packets)
547         self.pg0.enable_capture()  # flush the old captures
548         self.pg1.enable_capture()
549         self.pg_start()
550         # assert nothing captured on pg0
551         self.pg0.assert_nothing_captured()
552         # all packets will be dropped by SPD rule
553         self.pg1.assert_nothing_captured()
554         # verify all policies matched the expected number of times
555         self.verify_policy_match(pkt_count, policy_0)
556         self.verify_policy_match(pkt_count, policy_1)
557
558
559 class IPSec4SpdTestCaseReadd(SpdFastPathInbound):
560     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
561         (add, remove, re-add)"""
562
563     def test_ipsec_spd_inbound_readd(self):
564         # In this test case, packets in IPv4 FWD path are configured
565         # to go through IPSec outbound SPD policy lookup.
566         # 2 SPD rules (1 HIGH and 1 LOW) are added.
567         # High priority rule action is set to BYPASS.
568         # Low priority rule action is set to DISCARD.
569         # Traffic sent on pg0 interface should match high priority
570         # rule and should be sent out on pg1 interface.
571         # High priority rule is then removed.
572         # Traffic sent on pg0 interface should match low priority
573         # rule and should be discarded after SPD lookup.
574         # Readd high priority rule.
575         # Traffic sent on pg0 interface should match high priority
576         # rule and should be sent out on pg1 interface.
577         self.create_interfaces(2)
578         pkt_count = 5
579         self.spd_create_and_intf_add(1, [self.pg0, self.pg1])
580         policy_0 = self.spd_add_rem_policy(  # inbound, priority 10
581             1,
582             self.pg0,
583             self.pg1,
584             socket.IPPROTO_UDP,
585             is_out=0,
586             priority=10,
587             policy_type="bypass",
588         )
589         policy_1 = self.spd_add_rem_policy(  # inbound, priority 5
590             1,
591             self.pg0,
592             self.pg1,
593             socket.IPPROTO_UDP,
594             is_out=0,
595             priority=5,
596             policy_type="discard",
597         )
598         policy_2 = self.spd_add_rem_policy(  # outbound, priority 10
599             1,
600             self.pg0,
601             self.pg1,
602             socket.IPPROTO_UDP,
603             is_out=1,
604             priority=10,
605             policy_type="bypass",
606         )
607
608         # create the packet stream
609         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
610         # add the stream to the source interface + enable capture
611         self.pg0.add_stream(packets)
612         self.pg0.enable_capture()
613         self.pg1.enable_capture()
614         # start the packet generator
615         self.pg_start()
616         # get capture
617         capture = self.pg1.get_capture()
618         for packet in capture:
619             try:
620                 self.logger.debug(ppp("SPD - Got packet:", packet))
621             except Exception:
622                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
623                 raise
624         self.logger.debug("SPD: Num packets: %s", len(capture.res))
625
626         # assert nothing captured on pg0
627         self.pg0.assert_nothing_captured()
628         # verify capture on pg1
629         self.verify_capture(self.pg0, self.pg1, capture)
630         # verify all policies matched the expected number of times
631         self.verify_policy_match(pkt_count, policy_0)
632         self.verify_policy_match(0, policy_1)
633         # remove the bypass rule, leaving only the discard rule
634         self.spd_add_rem_policy(  # inbound, priority 10
635             1,
636             self.pg0,
637             self.pg1,
638             socket.IPPROTO_UDP,
639             is_out=0,
640             priority=10,
641             policy_type="bypass",
642             remove=True,
643         )
644
645         # resend the same packets
646         self.pg0.add_stream(packets)
647         self.pg0.enable_capture()  # flush the old captures
648         self.pg1.enable_capture()
649         self.pg_start()
650
651         # assert nothing captured on pg0
652         self.pg0.assert_nothing_captured()
653         # all packets will be dropped by SPD rule
654         self.pg1.assert_nothing_captured()
655         # verify all policies matched the expected number of times
656         self.verify_policy_match(pkt_count, policy_0)
657         self.verify_policy_match(pkt_count, policy_1)
658
659         # now readd the bypass rule
660         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
661             1,
662             self.pg0,
663             self.pg1,
664             socket.IPPROTO_UDP,
665             is_out=0,
666             priority=10,
667             policy_type="bypass",
668         )
669
670         # resend the same packets
671         self.pg0.add_stream(packets)
672         self.pg0.enable_capture()  # flush the old captures
673         self.pg1.enable_capture()
674         self.pg_start()
675
676         # get capture
677         capture = self.pg1.get_capture(pkt_count)
678         for packet in capture:
679             try:
680                 self.logger.debug(ppp("SPD - Got packet:", packet))
681             except Exception:
682                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
683                 raise
684         self.logger.debug("SPD: Num packets: %s", len(capture.res))
685
686         # assert nothing captured on pg0
687         self.pg0.assert_nothing_captured()
688         # verify captured packets
689         self.verify_capture(self.pg0, self.pg1, capture)
690         # verify all policies matched the expected number of times
691         self.verify_policy_match(pkt_count, policy_0)
692         self.verify_policy_match(pkt_count, policy_1)
693
694
695 class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
696     """ IPSec/IPv4 inbound: Policy mode test case with fast path \
697         (multiple interfaces, multiple rules)"""
698
699     def test_ipsec_spd_inbound_multiple(self):
700         # In this test case, packets in IPv4 FWD path are configured to go
701         # through IPSec outbound SPD policy lookup.
702         # Multiples rules on multiple interfaces are tested at the same time.
703         # 3x interfaces are configured, binding the same SPD to each.
704         # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
705         # On pg0 & pg1, the BYPASS rule is HIGH priority
706         # On pg2, the DISCARD rule is HIGH priority
707         # Traffic should be received on pg0 & pg1 and dropped on pg2.
708         self.create_interfaces(3)
709         pkt_count = 5
710         # bind SPD to all interfaces
711         self.spd_create_and_intf_add(1, self.pg_interfaces)
712         # add rules on all interfaces
713         policy_01 = self.spd_add_rem_policy(  # inbound, priority 10
714             1,
715             self.pg0,
716             self.pg1,
717             socket.IPPROTO_UDP,
718             is_out=0,
719             priority=10,
720             policy_type="bypass",
721         )
722         policy_02 = self.spd_add_rem_policy(  # inbound, priority 5
723             1,
724             self.pg0,
725             self.pg1,
726             socket.IPPROTO_UDP,
727             is_out=0,
728             priority=5,
729             policy_type="discard",
730         )
731
732         policy_11 = self.spd_add_rem_policy(  # inbound, priority 10
733             1,
734             self.pg1,
735             self.pg2,
736             socket.IPPROTO_UDP,
737             is_out=0,
738             priority=10,
739             policy_type="bypass",
740         )
741         policy_12 = self.spd_add_rem_policy(  # inbound, priority 5
742             1,
743             self.pg1,
744             self.pg2,
745             socket.IPPROTO_UDP,
746             is_out=0,
747             priority=5,
748             policy_type="discard",
749         )
750
751         policy_21 = self.spd_add_rem_policy(  # inbound, priority 5
752             1,
753             self.pg2,
754             self.pg0,
755             socket.IPPROTO_UDP,
756             is_out=0,
757             priority=5,
758             policy_type="bypass",
759         )
760         policy_22 = self.spd_add_rem_policy(  # inbound, priority 10
761             1,
762             self.pg2,
763             self.pg0,
764             socket.IPPROTO_UDP,
765             is_out=0,
766             priority=10,
767             policy_type="discard",
768         )
769
770         # interfaces bound to an SPD, will by default drop outbound
771         # traffic with no matching policies. add catch-all outbound
772         # bypass rule to SPD:
773         self.spd_add_rem_policy(  # outbound, all interfaces
774             1,
775             None,
776             None,
777             socket.IPPROTO_UDP,
778             is_out=1,
779             priority=10,
780             policy_type="bypass",
781             all_ips=True,
782         )
783
784         # create the packet streams
785         packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
786         packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
787         packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
788         # add the streams to the source interfaces
789         self.pg0.add_stream(packets0)
790         self.pg1.add_stream(packets1)
791         self.pg2.add_stream(packets2)
792         # enable capture on all interfaces
793         for pg in self.pg_interfaces:
794             pg.enable_capture()
795         # start the packet generator
796         self.pg_start()
797
798         # get captures
799         if_caps = []
800         for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
801             if_caps.append(pg.get_capture())
802             for packet in if_caps[-1]:
803                 try:
804                     self.logger.debug(ppp("SPD - Got packet:", packet))
805                 except Exception:
806                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
807                     raise
808         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
809         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
810
811         # verify captures that matched BYPASS rule
812         self.verify_capture(self.pg0, self.pg1, if_caps[0])
813         self.verify_capture(self.pg1, self.pg2, if_caps[1])
814         # verify that traffic to pg0 matched BYPASS rule
815         # although DISCARD rule had higher prioriy and was not dropped
816         self.verify_policy_match(pkt_count, policy_21)
817
818         # verify all packets that were expected to match rules, matched
819         # pg0 -> pg1
820         self.verify_policy_match(pkt_count, policy_01)
821         self.verify_policy_match(0, policy_02)
822         # pg1 -> pg2
823         self.verify_policy_match(pkt_count, policy_11)
824         self.verify_policy_match(0, policy_12)
825         # pg2 -> pg0
826         self.verify_policy_match(0, policy_22)
827
828
829 class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
830     """ IPSec/IPv6 inbound: Policy mode test case with fast path \
831     (add protect)"""
832
833     @classmethod
834     def setUpClass(cls):
835         super(IPSec6SpdTestCaseProtect, cls).setUpClass()
836
837     @classmethod
838     def tearDownClass(cls):
839         super(IPSec6SpdTestCaseProtect, cls).tearDownClass()
840
841     def setUp(self):
842         super(IPSec6SpdTestCaseProtect, self).setUp()
843
844     def tearDown(self):
845         super(IPSec6SpdTestCaseProtect, self).tearDown()
846
847     def test_ipsec6_spd_inbound_protect(self):
848         pkt_count = 5
849         payload_size = 64
850         p = self.params[socket.AF_INET6]
851         send_pkts = self.gen_encrypt_pkts6(
852             p,
853             p.scapy_tra_sa,
854             self.tra_if,
855             src=self.tra_if.local_ip6,
856             dst=self.tra_if.remote_ip6,
857             count=pkt_count,
858             payload_size=payload_size,
859         )
860         recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
861
862         self.logger.info(self.vapi.ppcli("show error"))
863         self.logger.info(self.vapi.ppcli("show ipsec all"))
864         pkts = p.tra_sa_in.get_stats()["packets"]
865         self.assertEqual(
866             pkts,
867             pkt_count,
868             "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
869         )
870         pkts = p.tra_sa_out.get_stats()["packets"]
871         self.assertEqual(
872             pkts,
873             pkt_count,
874             "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts),
875         )
876         self.assertEqual(p.tra_sa_out.get_lost(), 0)
877         self.assertEqual(p.tra_sa_in.get_lost(), 0)
878
879
880 if __name__ == "__main__":
881     unittest.main(testRunner=VppTestRunner)