tests: add fast path ipv6 python tests for outbound policy matching
[vpp.git] / test / test_ipsec_spd_fp_output.py
1 import socket
2 import unittest
3 import ipaddress
4
5 from util import ppp
6 from framework import VppTestRunner
7 from template_ipsec import IPSecIPv4Fwd
8 from template_ipsec import IPSecIPv6Fwd
9
10
11 class SpdFastPathOutbound(IPSecIPv4Fwd):
12     # Override setUpConstants to enable outbound fast path in config
13     @classmethod
14     def setUpConstants(cls):
15         super(SpdFastPathOutbound, cls).setUpConstants()
16         cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"])
17         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
18
19
20 class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
21     # Override setUpConstants to enable outbound fast path in config
22     @classmethod
23     def setUpConstants(cls):
24         super(SpdFastPathIPv6Outbound, cls).setUpConstants()
25         cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
26         cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
27
28
29 class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
30     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
31         (add rule)"""
32
33     def test_ipsec_spd_outbound_add(self):
34         # In this test case, packets in IPv4 FWD path are configured
35         # to go through IPSec outbound SPD policy lookup.
36         # 2 SPD rules (1 HIGH and 1 LOW) are added.
37         # High priority rule action is set to BYPASS.
38         # Low priority rule action is set to DISCARD.
39         # Traffic sent on pg0 interface should match high priority
40         # rule and should be sent out on pg1 interface.
41         self.create_interfaces(2)
42         pkt_count = 5
43         s_port_s = 1111
44         s_port_e = 1111
45         d_port_s = 2222
46         d_port_e = 2222
47         self.spd_create_and_intf_add(1, [self.pg1])
48         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
49             1,
50             self.pg0,
51             self.pg1,
52             socket.IPPROTO_UDP,
53             is_out=1,
54             priority=10,
55             policy_type="bypass",
56             local_port_start=s_port_s,
57             local_port_stop=s_port_e,
58             remote_port_start=d_port_s,
59             remote_port_stop=d_port_e,
60         )
61         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
62             1,
63             self.pg0,
64             self.pg1,
65             socket.IPPROTO_UDP,
66             is_out=1,
67             priority=5,
68             policy_type="discard",
69             local_port_start=s_port_s,
70             local_port_stop=s_port_e,
71             remote_port_start=d_port_s,
72             remote_port_stop=d_port_e,
73         )
74
75         # create the packet stream
76         packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
77         # add the stream to the source interface + enable capture
78         self.pg0.add_stream(packets)
79         self.pg0.enable_capture()
80         self.pg1.enable_capture()
81         # start the packet generator
82         self.pg_start()
83         # get capture
84         capture = self.pg1.get_capture()
85         for packet in capture:
86             try:
87                 self.logger.debug(ppp("SPD - Got packet:", packet))
88             except Exception:
89                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
90                 raise
91         self.logger.debug("SPD: Num packets: %s", len(capture.res))
92
93         # assert nothing captured on pg0
94         self.pg0.assert_nothing_captured()
95         # verify captured packets
96         self.verify_capture(self.pg0, self.pg1, capture)
97         # verify all policies matched the expected number of times
98         self.verify_policy_match(pkt_count, policy_0)
99         self.verify_policy_match(0, policy_1)
100
101
102 class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
103     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
104         (add all ips port range rule)"""
105
106     def test_ipsec_spd_outbound_add(self):
107         # In this test case, packets in IPv4 FWD path are configured
108         # to go through IPSec outbound SPD policy lookup.
109         # 2 SPD rules (1 HIGH and 1 LOW) are added.
110         # High priority rule action is set to BYPASS.
111         # Low priority rule action is set to DISCARD.
112         # Traffic sent on pg0 interface should match high priority
113         # rule and should be sent out on pg1 interface.
114         self.create_interfaces(2)
115         pkt_count = 5
116         s_port_s = 1000
117         s_port_e = 2023
118         d_port_s = 5000
119         d_port_e = 6023
120         self.spd_create_and_intf_add(1, [self.pg1])
121         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
122             1,
123             self.pg0,
124             self.pg1,
125             socket.IPPROTO_UDP,
126             is_out=1,
127             priority=10,
128             policy_type="bypass",
129             all_ips=True,
130             local_port_start=s_port_s,
131             local_port_stop=s_port_e,
132             remote_port_start=d_port_s,
133             remote_port_stop=d_port_e,
134         )
135         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
136             1,
137             self.pg0,
138             self.pg1,
139             socket.IPPROTO_UDP,
140             is_out=1,
141             priority=5,
142             policy_type="discard",
143             all_ips=True,
144             local_port_start=s_port_s,
145             local_port_stop=s_port_e,
146             remote_port_start=d_port_s,
147             remote_port_stop=d_port_e,
148         )
149
150         # create the packet stream
151         packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
152         # add the stream to the source interface + enable capture
153         self.pg0.add_stream(packets)
154         self.pg0.enable_capture()
155         self.pg1.enable_capture()
156         # start the packet generator
157         self.pg_start()
158         # get capture
159         capture = self.pg1.get_capture()
160         for packet in capture:
161             try:
162                 self.logger.debug(ppp("SPD - Got packet:", packet))
163             except Exception:
164                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
165                 raise
166         self.logger.debug("SPD: Num packets: %s", len(capture.res))
167
168         # assert nothing captured on pg0
169         self.pg0.assert_nothing_captured()
170         # verify captured packets
171         self.verify_capture(self.pg0, self.pg1, capture)
172         # verify all policies matched the expected number of times
173         self.verify_policy_match(pkt_count, policy_0)
174         self.verify_policy_match(0, policy_1)
175
176
177 class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
178     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
179         (add  ips  range with any port rule)"""
180
181     def test_ipsec_spd_outbound_add(self):
182         # In this test case, packets in IPv4 FWD path are configured
183         # to go through IPSec outbound SPD policy lookup.
184         # 2 SPD rules (1 HIGH and 1 LOW) are added.
185         # High priority rule action is set to BYPASS.
186         # Low priority rule action is set to DISCARD.
187         # Traffic sent on pg0 interface should match high priority
188         # rule and should be sent out on pg1 interface.
189         self.create_interfaces(2)
190         pkt_count = 5
191         s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4)
192         s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
193         d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
194         d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
195         self.spd_create_and_intf_add(1, [self.pg1])
196         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
197             1,
198             self.pg0,
199             self.pg1,
200             socket.IPPROTO_UDP,
201             is_out=1,
202             priority=10,
203             policy_type="bypass",
204             ip_range=True,
205             local_ip_start=s_ip_s,
206             local_ip_stop=s_ip_e,
207             remote_ip_start=d_ip_s,
208             remote_ip_stop=d_ip_e,
209         )
210         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
211             1,
212             self.pg0,
213             self.pg1,
214             socket.IPPROTO_UDP,
215             is_out=1,
216             priority=5,
217             policy_type="discard",
218             ip_range=True,
219             local_ip_start=s_ip_s,
220             local_ip_stop=s_ip_e,
221             remote_ip_start=d_ip_s,
222             remote_ip_stop=d_ip_e,
223         )
224
225         # create the packet stream
226         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
227         # add the stream to the source interface + enable capture
228         self.pg0.add_stream(packets)
229         self.pg0.enable_capture()
230         self.pg1.enable_capture()
231         # start the packet generator
232         self.pg_start()
233         # get capture
234         capture = self.pg1.get_capture()
235         for packet in capture:
236             try:
237                 self.logger.debug(ppp("SPD - Got packet:", packet))
238             except Exception:
239                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
240                 raise
241         self.logger.debug("SPD: Num packets: %s", len(capture.res))
242
243         # assert nothing captured on pg0
244         self.pg0.assert_nothing_captured()
245         # verify captured packets
246         self.verify_capture(self.pg0, self.pg1, capture)
247         # verify all policies matched the expected number of times
248         self.verify_policy_match(pkt_count, policy_0)
249         self.verify_policy_match(0, policy_1)
250
251
252 class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
253     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
254         (add all ips  range rule)"""
255
256     def test_ipsec_spd_outbound_add(self):
257         # In this test case, packets in IPv4 FWD path are configured
258         # to go through IPSec outbound SPD policy lookup.
259         # 2 SPD rules (1 HIGH and 1 LOW) are added.
260         # High priority rule action is set to BYPASS.
261         # Low priority rule action is set to DISCARD.
262         # Traffic sent on pg0 interface should match high priority
263         # rule and should be sent out on pg1 interface.
264         # in this test we define ranges of ports and ip addresses.
265         self.create_interfaces(2)
266         pkt_count = 5
267         s_port_s = 1000
268         s_port_e = 1000 + 1023
269         d_port_s = 5000
270         d_port_e = 5000 + 1023
271
272         s_ip_s = ipaddress.ip_address(
273             int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24
274         )
275         s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
276         d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
277         d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
278         self.spd_create_and_intf_add(1, [self.pg1])
279         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
280             1,
281             self.pg0,
282             self.pg1,
283             socket.IPPROTO_UDP,
284             is_out=1,
285             priority=10,
286             policy_type="bypass",
287             ip_range=True,
288             local_ip_start=s_ip_s,
289             local_ip_stop=s_ip_e,
290             remote_ip_start=d_ip_s,
291             remote_ip_stop=d_ip_e,
292             local_port_start=s_port_s,
293             local_port_stop=s_port_e,
294             remote_port_start=d_port_s,
295             remote_port_stop=d_port_e,
296         )
297         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
298             1,
299             self.pg0,
300             self.pg1,
301             socket.IPPROTO_UDP,
302             is_out=1,
303             priority=5,
304             policy_type="discard",
305             ip_range=True,
306             local_ip_start=s_ip_s,
307             local_ip_stop=s_ip_e,
308             remote_ip_start=d_ip_s,
309             remote_ip_stop=d_ip_e,
310             local_port_start=s_port_s,
311             local_port_stop=s_port_e,
312             remote_port_start=d_port_s,
313             remote_port_stop=d_port_e,
314         )
315
316         # create the packet stream
317         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
318         # add the stream to the source interface + enable capture
319         self.pg0.add_stream(packets)
320         self.pg0.enable_capture()
321         self.pg1.enable_capture()
322         # start the packet generator
323         self.pg_start()
324         # get capture
325         capture = self.pg1.get_capture()
326         for packet in capture:
327             try:
328                 self.logger.debug(ppp("SPD - Got packet:", packet))
329             except Exception:
330                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
331                 raise
332         self.logger.debug("SPD: Num packets: %s", len(capture.res))
333
334         # assert nothing captured on pg0
335         self.pg0.assert_nothing_captured()
336         # verify captured packets
337         self.verify_capture(self.pg0, self.pg1, capture)
338         # verify all policies matched the expected number of times
339         self.verify_policy_match(pkt_count, policy_0)
340         self.verify_policy_match(0, policy_1)
341
342
343 class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
344     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
345         (add all ips ports rule)"""
346
347     def test_ipsec_spd_outbound_add(self):
348         # In this test case, packets in IPv4 FWD path are configured
349         # to go through IPSec outbound SPD policy lookup.
350         # 2 SPD rules (1 HIGH and 1 LOW) are added.
351         # Low priority rule action is set to BYPASS all ips.
352         # High priority rule action is set to DISCARD all ips.
353         # Traffic sent on pg0 interface when LOW priority rule is added,
354         # expect the packet is being sent out to pg1. Then HIGH priority
355         # rule is added and send the same traffic to pg0, this time expect
356         # the traffic is dropped.
357         self.create_interfaces(2)
358         pkt_count = 5
359         self.spd_create_and_intf_add(1, [self.pg1])
360         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
361             1,
362             self.pg0,
363             self.pg1,
364             socket.IPPROTO_UDP,
365             is_out=1,
366             priority=10,
367             policy_type="bypass",
368             all_ips=True,
369         )
370
371         # create the packet stream
372         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
373         # add the stream to the source interface + enable capture
374         self.pg0.add_stream(packets)
375         self.pg0.enable_capture()
376         self.pg1.enable_capture()
377         # start the packet generator
378         self.pg_start()
379         # get capture
380         capture = self.pg1.get_capture()
381         for packet in capture:
382             try:
383                 self.logger.debug(ppp("SPD - Got packet:", packet))
384             except Exception:
385                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
386                 raise
387         self.logger.debug("SPD: Num packets: %s", len(capture.res))
388
389         # assert nothing captured on pg0
390         self.pg0.assert_nothing_captured()
391         # verify captured packets
392         self.verify_capture(self.pg0, self.pg1, capture)
393         # verify all policies matched the expected number of times
394         self.verify_policy_match(pkt_count, policy_0)
395
396         policy_1 = self.spd_add_rem_policy(  # outbound, priority 20
397             1,
398             self.pg0,
399             self.pg1,
400             socket.IPPROTO_UDP,
401             is_out=1,
402             priority=20,
403             policy_type="discard",
404             all_ips=True,
405         )
406
407         # create the packet stream
408         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
409         # add the stream to the source interface + enable capture
410         self.pg0.add_stream(packets)
411         self.pg0.enable_capture()
412         self.pg1.enable_capture()
413         # start the packet generator
414         self.pg_start()
415         # assert nothing captured on pg0 and pg1
416         self.pg0.assert_nothing_captured()
417         self.pg1.assert_nothing_captured()
418
419
420 class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
421     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
422         (remove rule)"""
423
424     def test_ipsec_spd_outbound_remove(self):
425         # In this test case, packets in IPv4 FWD path are configured
426         # to go through IPSec outbound SPD policy lookup.
427         # 2 SPD rules (1 HIGH and 1 LOW) are added.
428         # High priority rule action is set to BYPASS.
429         # Low priority rule action is set to DISCARD.
430         # High priority rule is then removed.
431         # Traffic sent on pg0 interface should match low priority
432         # rule and should be discarded after SPD lookup.
433         self.create_interfaces(2)
434         pkt_count = 5
435         self.spd_create_and_intf_add(1, [self.pg1])
436         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
437             1,
438             self.pg0,
439             self.pg1,
440             socket.IPPROTO_UDP,
441             is_out=1,
442             priority=10,
443             policy_type="bypass",
444         )
445         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
446             1,
447             self.pg0,
448             self.pg1,
449             socket.IPPROTO_UDP,
450             is_out=1,
451             priority=5,
452             policy_type="discard",
453         )
454
455         # create the packet stream
456         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
457         # add the stream to the source interface + enable capture
458         self.pg0.add_stream(packets)
459         self.pg0.enable_capture()
460         self.pg1.enable_capture()
461         # start the packet generator
462         self.pg_start()
463         # get capture
464         capture = self.pg1.get_capture()
465         for packet in capture:
466             try:
467                 self.logger.debug(ppp("SPD - Got packet:", packet))
468             except Exception:
469                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
470                 raise
471
472         # assert nothing captured on pg0
473         self.pg0.assert_nothing_captured()
474         # verify capture on pg1
475         self.logger.debug("SPD: Num packets: %s", len(capture.res))
476         self.verify_capture(self.pg0, self.pg1, capture)
477         # verify all policies matched the expected number of times
478         self.verify_policy_match(pkt_count, policy_0)
479         self.verify_policy_match(0, policy_1)
480         # now remove the bypass rule
481         self.spd_add_rem_policy(  # outbound, priority 10
482             1,
483             self.pg0,
484             self.pg1,
485             socket.IPPROTO_UDP,
486             is_out=1,
487             priority=10,
488             policy_type="bypass",
489             remove=True,
490         )
491
492         # resend the same packets
493         self.pg0.add_stream(packets)
494         self.pg0.enable_capture()  # flush the old captures
495         self.pg1.enable_capture()
496         self.pg_start()
497         # assert nothing captured on pg0
498         self.pg0.assert_nothing_captured()
499         # all packets will be dropped by SPD rule
500         self.pg1.assert_nothing_captured()
501         # verify all policies matched the expected number of times
502         self.verify_policy_match(pkt_count, policy_0)
503         self.verify_policy_match(pkt_count, policy_1)
504
505
506 class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
507     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
508         (add, remove, re-add)"""
509
510     def test_ipsec_spd_outbound_readd(self):
511         # In this test case, packets in IPv4 FWD path are configured
512         # to go through IPSec outbound SPD policy lookup.
513         # 2 SPD rules (1 HIGH and 1 LOW) are added.
514         # High priority rule action is set to BYPASS.
515         # Low priority rule action is set to DISCARD.
516         # Traffic sent on pg0 interface should match high priority
517         # rule and should be sent out on pg1 interface.
518         # High priority rule is then removed.
519         # Traffic sent on pg0 interface should match low priority
520         # rule and should be discarded after SPD lookup.
521         # Readd high priority rule.
522         # Traffic sent on pg0 interface should match high priority
523         # rule and should be sent out on pg1 interface.
524         self.create_interfaces(2)
525         pkt_count = 5
526         self.spd_create_and_intf_add(1, [self.pg1])
527         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
528             1,
529             self.pg0,
530             self.pg1,
531             socket.IPPROTO_UDP,
532             is_out=1,
533             priority=10,
534             policy_type="bypass",
535         )
536         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
537             1,
538             self.pg0,
539             self.pg1,
540             socket.IPPROTO_UDP,
541             is_out=1,
542             priority=5,
543             policy_type="discard",
544         )
545
546         # create the packet stream
547         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
548         # add the stream to the source interface + enable capture
549         self.pg0.add_stream(packets)
550         self.pg0.enable_capture()
551         self.pg1.enable_capture()
552         # start the packet generator
553         self.pg_start()
554         # get capture
555         capture = self.pg1.get_capture()
556         for packet in capture:
557             try:
558                 self.logger.debug(ppp("SPD - Got packet:", packet))
559             except Exception:
560                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
561                 raise
562         self.logger.debug("SPD: Num packets: %s", len(capture.res))
563
564         # assert nothing captured on pg0
565         self.pg0.assert_nothing_captured()
566         # verify capture on pg1
567         self.verify_capture(self.pg0, self.pg1, capture)
568         # verify all policies matched the expected number of times
569         self.verify_policy_match(pkt_count, policy_0)
570         self.verify_policy_match(0, policy_1)
571         # remove the bypass rule, leaving only the discard rule
572         self.spd_add_rem_policy(  # outbound, priority 10
573             1,
574             self.pg0,
575             self.pg1,
576             socket.IPPROTO_UDP,
577             is_out=1,
578             priority=10,
579             policy_type="bypass",
580             remove=True,
581         )
582
583         # resend the same packets
584         self.pg0.add_stream(packets)
585         self.pg0.enable_capture()  # flush the old captures
586         self.pg1.enable_capture()
587         self.pg_start()
588
589         # assert nothing captured on pg0
590         self.pg0.assert_nothing_captured()
591         # all packets will be dropped by SPD rule
592         self.pg1.assert_nothing_captured()
593         # verify all policies matched the expected number of times
594         self.verify_policy_match(pkt_count, policy_0)
595         self.verify_policy_match(pkt_count, policy_1)
596
597         # now readd the bypass rule
598         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
599             1,
600             self.pg0,
601             self.pg1,
602             socket.IPPROTO_UDP,
603             is_out=1,
604             priority=10,
605             policy_type="bypass",
606         )
607
608         # resend the same packets
609         self.pg0.add_stream(packets)
610         self.pg0.enable_capture()  # flush the old captures
611         self.pg1.enable_capture()
612         self.pg_start()
613
614         # get capture
615         capture = self.pg1.get_capture(pkt_count)
616         for packet in capture:
617             try:
618                 self.logger.debug(ppp("SPD - Got packet:", packet))
619             except Exception:
620                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
621                 raise
622         self.logger.debug("SPD: Num packets: %s", len(capture.res))
623
624         # assert nothing captured on pg0
625         self.pg0.assert_nothing_captured()
626         # verify captured packets
627         self.verify_capture(self.pg0, self.pg1, capture)
628         # verify all policies matched the expected number of times
629         self.verify_policy_match(pkt_count, policy_0)
630         self.verify_policy_match(pkt_count, policy_1)
631
632
633 class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
634     """ IPSec/IPv4 outbound: Policy mode test case with fast path \
635         (multiple interfaces, multiple rules)"""
636
637     def test_ipsec_spd_outbound_multiple(self):
638         # In this test case, packets in IPv4 FWD path are configured to go
639         # through IPSec outbound SPD policy lookup.
640         # Multiples rules on multiple interfaces are tested at the same time.
641         # 3x interfaces are configured, binding the same SPD to each.
642         # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
643         # On pg0 & pg1, the BYPASS rule is HIGH priority
644         # On pg2, the DISCARD rule is HIGH priority
645         # Traffic should be received on pg0 & pg1 and dropped on pg2.
646         self.create_interfaces(3)
647         pkt_count = 5
648         # bind SPD to all interfaces
649         self.spd_create_and_intf_add(1, self.pg_interfaces)
650         # add rules on all interfaces
651         policy_01 = self.spd_add_rem_policy(  # outbound, priority 10
652             1,
653             self.pg0,
654             self.pg1,
655             socket.IPPROTO_UDP,
656             is_out=1,
657             priority=10,
658             policy_type="bypass",
659         )
660         policy_02 = self.spd_add_rem_policy(  # outbound, priority 5
661             1,
662             self.pg0,
663             self.pg1,
664             socket.IPPROTO_UDP,
665             is_out=1,
666             priority=5,
667             policy_type="discard",
668         )
669
670         policy_11 = self.spd_add_rem_policy(  # outbound, priority 10
671             1,
672             self.pg1,
673             self.pg2,
674             socket.IPPROTO_UDP,
675             is_out=1,
676             priority=10,
677             policy_type="bypass",
678         )
679         policy_12 = self.spd_add_rem_policy(  # outbound, priority 5
680             1,
681             self.pg1,
682             self.pg2,
683             socket.IPPROTO_UDP,
684             is_out=1,
685             priority=5,
686             policy_type="discard",
687         )
688
689         policy_21 = self.spd_add_rem_policy(  # outbound, priority 5
690             1,
691             self.pg2,
692             self.pg0,
693             socket.IPPROTO_UDP,
694             is_out=1,
695             priority=5,
696             policy_type="bypass",
697         )
698         policy_22 = self.spd_add_rem_policy(  # outbound, priority 10
699             1,
700             self.pg2,
701             self.pg0,
702             socket.IPPROTO_UDP,
703             is_out=1,
704             priority=10,
705             policy_type="discard",
706         )
707
708         # interfaces bound to an SPD, will by default drop inbound
709         # traffic with no matching policies. add catch-all inbound
710         # bypass rule to SPD:
711         self.spd_add_rem_policy(  # inbound, all interfaces
712             1,
713             None,
714             None,
715             socket.IPPROTO_UDP,
716             is_out=0,
717             priority=10,
718             policy_type="bypass",
719             all_ips=True,
720         )
721
722         # create the packet streams
723         packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
724         packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
725         packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
726         # add the streams to the source interfaces
727         self.pg0.add_stream(packets0)
728         self.pg1.add_stream(packets1)
729         self.pg2.add_stream(packets2)
730         # enable capture on all interfaces
731         for pg in self.pg_interfaces:
732             pg.enable_capture()
733         # start the packet generator
734         self.pg_start()
735
736         # get captures
737         if_caps = []
738         for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
739             if_caps.append(pg.get_capture())
740             for packet in if_caps[-1]:
741                 try:
742                     self.logger.debug(ppp("SPD - Got packet:", packet))
743                 except Exception:
744                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
745                     raise
746         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
747         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
748
749         # verify captures that matched BYPASS rule
750         self.verify_capture(self.pg0, self.pg1, if_caps[0])
751         self.verify_capture(self.pg1, self.pg2, if_caps[1])
752         # verify that traffic to pg0 matched DISCARD rule and was dropped
753         self.pg0.assert_nothing_captured()
754         # verify all packets that were expected to match rules, matched
755         # pg0 -> pg1
756         self.verify_policy_match(pkt_count, policy_01)
757         self.verify_policy_match(0, policy_02)
758         # pg1 -> pg2
759         self.verify_policy_match(pkt_count, policy_11)
760         self.verify_policy_match(0, policy_12)
761         # pg2 -> pg0
762         self.verify_policy_match(0, policy_21)
763         self.verify_policy_match(pkt_count, policy_22)
764
765
766 class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
767     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
768         (add rule)"""
769
770     def test_ipsec_spd_outbound_add(self):
771         # In this test case, packets in IPv4 FWD path are configured
772         # to go through IPSec outbound SPD policy lookup.
773         # 2 SPD rules (1 HIGH and 1 LOW) are added.
774         # High priority rule action is set to BYPASS.
775         # Low priority rule action is set to DISCARD.
776         # Traffic sent on pg0 interface should match high priority
777         # rule and should be sent out on pg1 interface.
778         self.create_interfaces(2)
779         pkt_count = 5
780         s_port_s = 1111
781         s_port_e = 1111
782         d_port_s = 2222
783         d_port_e = 2222
784         self.spd_create_and_intf_add(1, [self.pg1])
785         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
786             1,
787             self.pg0,
788             self.pg1,
789             socket.IPPROTO_UDP,
790             is_out=1,
791             priority=10,
792             policy_type="bypass",
793             local_port_start=s_port_s,
794             local_port_stop=s_port_e,
795             remote_port_start=d_port_s,
796             remote_port_stop=d_port_e,
797         )
798         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
799             1,
800             self.pg0,
801             self.pg1,
802             socket.IPPROTO_UDP,
803             is_out=1,
804             priority=5,
805             policy_type="discard",
806             local_port_start=s_port_s,
807             local_port_stop=s_port_e,
808             remote_port_start=d_port_s,
809             remote_port_stop=d_port_e,
810         )
811
812         # create the packet stream
813         packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
814         # add the stream to the source interface + enable capture
815         self.pg0.add_stream(packets)
816         self.pg0.enable_capture()
817         self.pg1.enable_capture()
818         # start the packet generator
819         self.pg_start()
820         # get capture
821         capture = self.pg1.get_capture()
822         for packet in capture:
823             try:
824                 self.logger.debug(ppp("SPD - Got packet:", packet))
825             except Exception:
826                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
827                 raise
828         self.logger.debug("SPD: Num packets: %s", len(capture.res))
829
830         # assert nothing captured on pg0
831         self.pg0.assert_nothing_captured()
832         # verify captured packets
833         self.verify_capture(self.pg0, self.pg1, capture)
834         # verify all policies matched the expected number of times
835         self.verify_policy_match(pkt_count, policy_0)
836         self.verify_policy_match(0, policy_1)
837
838
839 class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
840     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
841         (add all ips ports rule)"""
842
843     def test_ipsec_spd_outbound_add(self):
844         # In this test case, packets in IPv4 FWD path are configured
845         # to go through IPSec outbound SPD policy lookup.
846         # 2 SPD rules (1 HIGH and 1 LOW) are added.
847         # Low priority rule action is set to BYPASS all ips.
848         # High priority rule action is set to DISCARD all ips.
849         # Traffic sent on pg0 interface when LOW priority rule is added,
850         # expect the packet is being sent out to pg1. Then HIGH priority
851         # rule is added and send the same traffic to pg0, this time expect
852         # the traffic is dropped.
853         self.create_interfaces(2)
854         pkt_count = 5
855         self.spd_create_and_intf_add(1, [self.pg1])
856         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
857             1,
858             self.pg0,
859             self.pg1,
860             socket.IPPROTO_UDP,
861             is_out=1,
862             priority=10,
863             policy_type="bypass",
864             all_ips=True,
865         )
866
867         # create the packet stream
868         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
869         # add the stream to the source interface + enable capture
870         self.pg0.add_stream(packets)
871         self.pg0.enable_capture()
872         self.pg1.enable_capture()
873         # start the packet generator
874         self.pg_start()
875         # get capture
876         capture = self.pg1.get_capture()
877         for packet in capture:
878             try:
879                 self.logger.debug(ppp("SPD - Got packet:", packet))
880             except Exception:
881                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
882                 raise
883         self.logger.debug("SPD: Num packets: %s", len(capture.res))
884
885         # assert nothing captured on pg0
886         self.pg0.assert_nothing_captured()
887         # verify captured packets
888         self.verify_capture(self.pg0, self.pg1, capture)
889         # verify all policies matched the expected number of times
890         self.verify_policy_match(pkt_count, policy_0)
891
892         policy_1 = self.spd_add_rem_policy(  # outbound, priority 20
893             1,
894             self.pg0,
895             self.pg1,
896             socket.IPPROTO_UDP,
897             is_out=1,
898             priority=20,
899             policy_type="discard",
900             all_ips=True,
901         )
902
903         # create the packet stream
904         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
905         # add the stream to the source interface + enable capture
906         self.pg0.add_stream(packets)
907         self.pg0.enable_capture()
908         self.pg1.enable_capture()
909         # start the packet generator
910         self.pg_start()
911         # assert nothing captured on pg0 and pg1
912         self.pg0.assert_nothing_captured()
913         self.pg1.assert_nothing_captured()
914
915
916 class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
917     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
918         (add all ips port range rule)"""
919
920     def test_ipsec_spd_outbound_add(self):
921         # In this test case, packets in IPv4 FWD path are configured
922         # to go through IPSec outbound SPD policy lookup.
923         # 2 SPD rules (1 HIGH and 1 LOW) are added.
924         # High priority rule action is set to BYPASS.
925         # Low priority rule action is set to DISCARD.
926         # Traffic sent on pg0 interface should match high priority
927         # rule and should be sent out on pg1 interface.
928         self.create_interfaces(2)
929         pkt_count = 5
930         s_port_s = 1000
931         s_port_e = 2023
932         d_port_s = 5000
933         d_port_e = 6023
934         self.spd_create_and_intf_add(1, [self.pg1])
935         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
936             1,
937             self.pg0,
938             self.pg1,
939             socket.IPPROTO_UDP,
940             is_out=1,
941             priority=10,
942             policy_type="bypass",
943             all_ips=True,
944             local_port_start=s_port_s,
945             local_port_stop=s_port_e,
946             remote_port_start=d_port_s,
947             remote_port_stop=d_port_e,
948         )
949         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
950             1,
951             self.pg0,
952             self.pg1,
953             socket.IPPROTO_UDP,
954             is_out=1,
955             priority=5,
956             policy_type="discard",
957             all_ips=True,
958             local_port_start=s_port_s,
959             local_port_stop=s_port_e,
960             remote_port_start=d_port_s,
961             remote_port_stop=d_port_e,
962         )
963
964         # create the packet stream
965         packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
966         # add the stream to the source interface + enable capture
967         self.pg0.add_stream(packets)
968         self.pg0.enable_capture()
969         self.pg1.enable_capture()
970         # start the packet generator
971         self.pg_start()
972         # get capture
973         capture = self.pg1.get_capture()
974         for packet in capture:
975             try:
976                 self.logger.debug(ppp("SPD - Got packet:", packet))
977             except Exception:
978                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
979                 raise
980         self.logger.debug("SPD: Num packets: %s", len(capture.res))
981
982         # assert nothing captured on pg0
983         self.pg0.assert_nothing_captured()
984         # verify captured packets
985         self.verify_capture(self.pg0, self.pg1, capture)
986         # verify all policies matched the expected number of times
987         self.verify_policy_match(pkt_count, policy_0)
988         self.verify_policy_match(0, policy_1)
989
990
991 class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
992     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
993         (add ips range with any port rule)"""
994
995     def test_ipsec_spd_outbound_add(self):
996         # In this test case, packets in IPv4 FWD path are configured
997         # to go through IPSec outbound SPD policy lookup.
998         # 2 SPD rules (1 HIGH and 1 LOW) are added.
999         # High priority rule action is set to BYPASS.
1000         # Low priority rule action is set to DISCARD.
1001         # Traffic sent on pg0 interface should match high priority
1002         # rule and should be sent out on pg1 interface.
1003         self.create_interfaces(2)
1004         pkt_count = 5
1005         s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
1006         s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
1007         d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1008         d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
1009         self.spd_create_and_intf_add(1, [self.pg1])
1010         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
1011             1,
1012             self.pg0,
1013             self.pg1,
1014             socket.IPPROTO_UDP,
1015             is_out=1,
1016             priority=10,
1017             policy_type="bypass",
1018             ip_range=True,
1019             local_ip_start=s_ip_s,
1020             local_ip_stop=s_ip_e,
1021             remote_ip_start=d_ip_s,
1022             remote_ip_stop=d_ip_e,
1023         )
1024         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
1025             1,
1026             self.pg0,
1027             self.pg1,
1028             socket.IPPROTO_UDP,
1029             is_out=1,
1030             priority=5,
1031             policy_type="discard",
1032             ip_range=True,
1033             local_ip_start=s_ip_s,
1034             local_ip_stop=s_ip_e,
1035             remote_ip_start=d_ip_s,
1036             remote_ip_stop=d_ip_e,
1037         )
1038
1039         # create the packet stream
1040         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1041         # add the stream to the source interface + enable capture
1042         self.pg0.add_stream(packets)
1043         self.pg0.enable_capture()
1044         self.pg1.enable_capture()
1045         # start the packet generator
1046         self.pg_start()
1047         # get capture
1048         capture = self.pg1.get_capture()
1049         for packet in capture:
1050             try:
1051                 self.logger.debug(ppp("SPD - Got packet:", packet))
1052             except Exception:
1053                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1054                 raise
1055         self.logger.debug("SPD: Num packets: %s", len(capture.res))
1056
1057         # assert nothing captured on pg0
1058         self.pg0.assert_nothing_captured()
1059         # verify captured packets
1060         self.verify_capture(self.pg0, self.pg1, capture)
1061         # verify all policies matched the expected number of times
1062         self.verify_policy_match(pkt_count, policy_0)
1063         self.verify_policy_match(0, policy_1)
1064
1065
1066 class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
1067     """ IPSec/IPvr6 outbound: Policy mode test case with fast path \
1068              (add all ips  range rule)"""
1069
1070     def test_ipsec_spd_outbound_add(self):
1071         # In this test case, packets in IPv4 FWD path are configured
1072         # to go through IPSec outbound SPD policy lookup.
1073         # 2 SPD rules (1 HIGH and 1 LOW) are added.
1074         # High priority rule action is set to BYPASS.
1075         # Low priority rule action is set to DISCARD.
1076         # Traffic sent on pg0 interface should match high priority
1077         # rule and should be sent out on pg1 interface.
1078         # in this test we define ranges of ports and ip addresses.
1079         self.create_interfaces(2)
1080         pkt_count = 5
1081         s_port_s = 1000
1082         s_port_e = 1000 + 1023
1083         d_port_s = 5000
1084         d_port_e = 5000 + 1023
1085
1086         s_ip_s = ipaddress.ip_address(
1087             int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
1088         )
1089         s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
1090         d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1091         d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
1092         self.spd_create_and_intf_add(1, [self.pg1])
1093         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
1094             1,
1095             self.pg0,
1096             self.pg1,
1097             socket.IPPROTO_UDP,
1098             is_out=1,
1099             priority=10,
1100             policy_type="bypass",
1101             ip_range=True,
1102             local_ip_start=s_ip_s,
1103             local_ip_stop=s_ip_e,
1104             remote_ip_start=d_ip_s,
1105             remote_ip_stop=d_ip_e,
1106             local_port_start=s_port_s,
1107             local_port_stop=s_port_e,
1108             remote_port_start=d_port_s,
1109             remote_port_stop=d_port_e,
1110         )
1111         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
1112             1,
1113             self.pg0,
1114             self.pg1,
1115             socket.IPPROTO_UDP,
1116             is_out=1,
1117             priority=5,
1118             policy_type="discard",
1119             ip_range=True,
1120             local_ip_start=s_ip_s,
1121             local_ip_stop=s_ip_e,
1122             remote_ip_start=d_ip_s,
1123             remote_ip_stop=d_ip_e,
1124             local_port_start=s_port_s,
1125             local_port_stop=s_port_e,
1126             remote_port_start=d_port_s,
1127             remote_port_stop=d_port_e,
1128         )
1129
1130         # create the packet stream
1131         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1132         # add the stream to the source interface + enable capture
1133         self.pg0.add_stream(packets)
1134         self.pg0.enable_capture()
1135         self.pg1.enable_capture()
1136         # start the packet generator
1137         self.pg_start()
1138         # get capture
1139         capture = self.pg1.get_capture()
1140         for packet in capture:
1141             try:
1142                 self.logger.debug(ppp("SPD - Got packet:", packet))
1143             except Exception:
1144                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1145                 raise
1146         self.logger.debug("SPD: Num packets: %s", len(capture.res))
1147
1148         # assert nothing captured on pg0
1149         self.pg0.assert_nothing_captured()
1150         # verify captured packets
1151         self.verify_capture(self.pg0, self.pg1, capture)
1152         # verify all policies matched the expected number of times
1153         self.verify_policy_match(pkt_count, policy_0)
1154         self.verify_policy_match(0, policy_1)
1155
1156
1157 class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
1158     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1159         (add, remove, re-add)"""
1160
1161     def test_ipsec_spd_outbound_readd(self):
1162         # In this test case, packets in IPv4 FWD path are configured
1163         # to go through IPSec outbound SPD policy lookup.
1164         # 2 SPD rules (1 HIGH and 1 LOW) are added.
1165         # High priority rule action is set to BYPASS.
1166         # Low priority rule action is set to DISCARD.
1167         # Traffic sent on pg0 interface should match high priority
1168         # rule and should be sent out on pg1 interface.
1169         # High priority rule is then removed.
1170         # Traffic sent on pg0 interface should match low priority
1171         # rule and should be discarded after SPD lookup.
1172         # Readd high priority rule.
1173         # Traffic sent on pg0 interface should match high priority
1174         # rule and should be sent out on pg1 interface.
1175         self.create_interfaces(2)
1176         pkt_count = 5
1177         self.spd_create_and_intf_add(1, [self.pg1])
1178         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
1179             1,
1180             self.pg0,
1181             self.pg1,
1182             socket.IPPROTO_UDP,
1183             is_out=1,
1184             priority=10,
1185             policy_type="bypass",
1186         )
1187         policy_1 = self.spd_add_rem_policy(  # outbound, priority 5
1188             1,
1189             self.pg0,
1190             self.pg1,
1191             socket.IPPROTO_UDP,
1192             is_out=1,
1193             priority=5,
1194             policy_type="discard",
1195         )
1196
1197         # create the packet stream
1198         packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1199         # add the stream to the source interface + enable capture
1200         self.pg0.add_stream(packets)
1201         self.pg0.enable_capture()
1202         self.pg1.enable_capture()
1203         # start the packet generator
1204         self.pg_start()
1205         # get capture
1206         capture = self.pg1.get_capture()
1207         for packet in capture:
1208             try:
1209                 self.logger.debug(ppp("SPD - Got packet:", packet))
1210             except Exception:
1211                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1212                 raise
1213         self.logger.debug("SPD: Num packets: %s", len(capture.res))
1214
1215         # assert nothing captured on pg0
1216         self.pg0.assert_nothing_captured()
1217         # verify capture on pg1
1218         self.verify_capture(self.pg0, self.pg1, capture)
1219         # verify all policies matched the expected number of times
1220         self.verify_policy_match(pkt_count, policy_0)
1221         self.verify_policy_match(0, policy_1)
1222         # remove the bypass rule, leaving only the discard rule
1223         self.spd_add_rem_policy(  # outbound, priority 10
1224             1,
1225             self.pg0,
1226             self.pg1,
1227             socket.IPPROTO_UDP,
1228             is_out=1,
1229             priority=10,
1230             policy_type="bypass",
1231             remove=True,
1232         )
1233
1234         # resend the same packets
1235         self.pg0.add_stream(packets)
1236         self.pg0.enable_capture()  # flush the old captures
1237         self.pg1.enable_capture()
1238         self.pg_start()
1239
1240         # assert nothing captured on pg0
1241         self.pg0.assert_nothing_captured()
1242         # all packets will be dropped by SPD rule
1243         self.pg1.assert_nothing_captured()
1244         # verify all policies matched the expected number of times
1245         self.verify_policy_match(pkt_count, policy_0)
1246         self.verify_policy_match(pkt_count, policy_1)
1247
1248         # now readd the bypass rule
1249         policy_0 = self.spd_add_rem_policy(  # outbound, priority 10
1250             1,
1251             self.pg0,
1252             self.pg1,
1253             socket.IPPROTO_UDP,
1254             is_out=1,
1255             priority=10,
1256             policy_type="bypass",
1257         )
1258
1259         # resend the same packets
1260         self.pg0.add_stream(packets)
1261         self.pg0.enable_capture()  # flush the old captures
1262         self.pg1.enable_capture()
1263         self.pg_start()
1264
1265         # get capture
1266         capture = self.pg1.get_capture(pkt_count)
1267         for packet in capture:
1268             try:
1269                 self.logger.debug(ppp("SPD - Got packet:", packet))
1270             except Exception:
1271                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1272                 raise
1273         self.logger.debug("SPD: Num packets: %s", len(capture.res))
1274
1275         # assert nothing captured on pg0
1276         self.pg0.assert_nothing_captured()
1277         # verify captured packets
1278         self.verify_capture(self.pg0, self.pg1, capture)
1279         # verify all policies matched the expected number of times
1280         self.verify_policy_match(pkt_count, policy_0)
1281         self.verify_policy_match(pkt_count, policy_1)
1282
1283
1284 class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
1285     """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1286         (multiple interfaces, multiple rules)"""
1287
1288     def test_ipsec_spd_outbound_multiple(self):
1289         # In this test case, packets in IPv4 FWD path are configured to go
1290         # through IPSec outbound SPD policy lookup.
1291         # Multiples rules on multiple interfaces are tested at the same time.
1292         # 3x interfaces are configured, binding the same SPD to each.
1293         # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
1294         # On pg0 & pg1, the BYPASS rule is HIGH priority
1295         # On pg2, the DISCARD rule is HIGH priority
1296         # Traffic should be received on pg0 & pg1 and dropped on pg2.
1297         self.create_interfaces(3)
1298         pkt_count = 5
1299         # bind SPD to all interfaces
1300         self.spd_create_and_intf_add(1, self.pg_interfaces)
1301         # add rules on all interfaces
1302         policy_01 = self.spd_add_rem_policy(  # outbound, priority 10
1303             1,
1304             self.pg0,
1305             self.pg1,
1306             socket.IPPROTO_UDP,
1307             is_out=1,
1308             priority=10,
1309             policy_type="bypass",
1310         )
1311         policy_02 = self.spd_add_rem_policy(  # outbound, priority 5
1312             1,
1313             self.pg0,
1314             self.pg1,
1315             socket.IPPROTO_UDP,
1316             is_out=1,
1317             priority=5,
1318             policy_type="discard",
1319         )
1320
1321         policy_11 = self.spd_add_rem_policy(  # outbound, priority 10
1322             1,
1323             self.pg1,
1324             self.pg2,
1325             socket.IPPROTO_UDP,
1326             is_out=1,
1327             priority=10,
1328             policy_type="bypass",
1329         )
1330         policy_12 = self.spd_add_rem_policy(  # outbound, priority 5
1331             1,
1332             self.pg1,
1333             self.pg2,
1334             socket.IPPROTO_UDP,
1335             is_out=1,
1336             priority=5,
1337             policy_type="discard",
1338         )
1339
1340         policy_21 = self.spd_add_rem_policy(  # outbound, priority 5
1341             1,
1342             self.pg2,
1343             self.pg0,
1344             socket.IPPROTO_UDP,
1345             is_out=1,
1346             priority=5,
1347             policy_type="bypass",
1348         )
1349         policy_22 = self.spd_add_rem_policy(  # outbound, priority 10
1350             1,
1351             self.pg2,
1352             self.pg0,
1353             socket.IPPROTO_UDP,
1354             is_out=1,
1355             priority=10,
1356             policy_type="discard",
1357         )
1358
1359         # interfaces bound to an SPD, will by default drop inbound
1360         # traffic with no matching policies. add catch-all inbound
1361         # bypass rule to SPD:
1362         self.spd_add_rem_policy(  # inbound, all interfaces
1363             1,
1364             None,
1365             None,
1366             socket.IPPROTO_UDP,
1367             is_out=0,
1368             priority=10,
1369             policy_type="bypass",
1370             all_ips=True,
1371         )
1372
1373         # create the packet streams
1374         packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
1375         packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
1376         packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
1377         # add the streams to the source interfaces
1378         self.pg0.add_stream(packets0)
1379         self.pg1.add_stream(packets1)
1380         self.pg2.add_stream(packets2)
1381         # enable capture on all interfaces
1382         for pg in self.pg_interfaces:
1383             pg.enable_capture()
1384         # start the packet generator
1385         self.pg_start()
1386
1387         # get captures
1388         if_caps = []
1389         for pg in [self.pg1, self.pg2]:  # we are expecting captures on pg1/pg2
1390             if_caps.append(pg.get_capture())
1391             for packet in if_caps[-1]:
1392                 try:
1393                     self.logger.debug(ppp("SPD - Got packet:", packet))
1394                 except Exception:
1395                     self.logger.error(ppp("Unexpected or invalid packet:", packet))
1396                     raise
1397         self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
1398         self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
1399
1400         # verify captures that matched BYPASS rule
1401         self.verify_capture(self.pg0, self.pg1, if_caps[0])
1402         self.verify_capture(self.pg1, self.pg2, if_caps[1])
1403         # verify that traffic to pg0 matched DISCARD rule and was dropped
1404         self.pg0.assert_nothing_captured()
1405         # verify all packets that were expected to match rules, matched
1406         # pg0 -> pg1
1407         self.verify_policy_match(pkt_count, policy_01)
1408         self.verify_policy_match(0, policy_02)
1409         # pg1 -> pg2
1410         self.verify_policy_match(pkt_count, policy_11)
1411         self.verify_policy_match(0, policy_12)
1412         # pg2 -> pg0
1413         self.verify_policy_match(0, policy_21)
1414         self.verify_policy_match(pkt_count, policy_22)
1415
1416
1417 if __name__ == "__main__":
1418     unittest.main(testRunner=VppTestRunner)