ipsec: huge anti-replay window support
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
class IPsecIPv4Params:
    """Default parameter set used by the IPsec tests for IPv4.

    The class attributes describe the address family itself; ``__init__``
    fills in the per-instance defaults (SA ids and SPIs, hop limits, flow
    labels, anti-replay window size, algorithms, keys and flags).
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote hosts, one per address family
        self.remote_tun_if_host = "1.1.1.1"
        self.remote_tun_if_host6 = "1111::1"

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        self.anti_replay_window_size = 64

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption algorithm: VPP API id, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"

        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
75
76
class IPsecIPv6Params:
    """Default parameter set used by the IPsec tests for IPv6.

    The class attributes describe the address family itself; ``__init__``
    fills in the per-instance defaults (SA ids and SPIs, hop limits, flow
    labels, anti-replay window size, algorithms, keys and flags).
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote hosts, one per address family
        self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
        self.remote_tun_if_host4 = "1.1.1.1"

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        self.anti_replay_window_size = 64

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption algorithm: VPP API id, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"

        self.salt = 0
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
124
125
def mk_scapy_crypt_key(p):
    """Return the encryption key to hand to the scapy SA for params ``p``.

    For AES-GCM, AES-CTR and AES-NULL-GMAC the salt is appended to the key
    as a 32-bit unsigned integer in network byte order; for every other
    algorithm the key is returned unchanged.
    """
    salted_algos = ("AES-GCM", "AES-CTR", "AES-NULL-GMAC")
    if p.crypt_algo not in salted_algos:
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
131
132
def config_tun_params(p, encryption_type, tun_if):
    """Attach the two tunnel-mode scapy SAs for ``tun_if`` to ``p``.

    Sets ``p.tun_dst`` / ``p.tun_src`` from the interface's remote / local
    address of family ``p.addr_type`` and creates ``p.scapy_tun_sa`` and
    ``p.vpp_tun_sa``.  Both SAs use the same algorithms and keys; they
    differ in SPI and in the direction of the outer tunnel header.
    """
    outer_ip_by_family = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # keyword arguments common to both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=use_esn,
    )
    outer_ip = outer_ip_by_family[p.addr_type]

    # outer header goes from the tunnel's remote end towards its local end
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared
    )
    # outer header goes from the tunnel's local end towards its remote end
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared
    )
163
164
def config_tra_params(p, encryption_type):
    """Attach the two transport-mode scapy SAs to ``p``.

    Creates ``p.scapy_tra_sa`` and ``p.vpp_tra_sa``; they are configured
    identically except for the SPI.
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)
    key = mk_scapy_crypt_key(p)

    def make_sa(spi):
        # one transport-mode SA; only the SPI varies between the two
        return SecurityAssociation(
            encryption_type,
            spi=spi,
            crypt_algo=p.crypt_algo,
            crypt_key=key,
            auth_algo=p.auth_algo,
            auth_key=p.auth_key,
            nat_t_header=p.nat_header,
            esn_en=use_esn,
        )

    p.scapy_tra_sa = make_sa(p.scapy_tra_spi)
    p.vpp_tra_sa = make_sa(p.vpp_tra_spi)
190
191
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE::

         ------   encrypt   ---
        |tra_if| <-------> |VPP|
         ------   decrypt   ---

    TUNNEL MODE::

         ------   encrypt   ---   plain   ---
        |tun_if| <-------  |VPP| <------ |pg1|
         ------             ---           ---

         ------   decrypt   ---   plain   ---
        |tun_if| ------->  |VPP| ------> |pg1|
         ------             ---           ---
    """

    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """empty method to be overloaded when necessary"""

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """Create any missing per-family parameter objects and index them.

        ``ipv4_params`` / ``ipv6_params`` are only created when the instance
        does not already have them; ``self.params`` maps each object's
        ``addr_type`` to the object.
        """
        defaults = (
            ("ipv4_params", IPsecIPv4Params),
            ("ipv6_params", IPsecIPv6Params),
        )
        for attr, factory in defaults:
            if not hasattr(self, attr):
                setattr(self, attr, factory())

        self.params = {
            prm.addr_type: prm for prm in (self.ipv4_params, self.ipv6_params)
        }

    def config_interfaces(self):
        """Create three pg interfaces and bring each up with IPv4 and IPv6."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Bring every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hw_output = self.vapi.cli("show hardware")
        self.logger.info(hw_output)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted IPv4 ICMP.

        Each inner packet is ``IP(src, dst) / ICMP`` followed by
        ``payload_size`` bytes of ``X``.  ``p`` is not used here.
        """
        pkts = []
        for _ in range(count):
            inner = IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted ICMPv6.

        The inner IPv6 header takes its hop limit and flow label from ``p``.
        """
        pkts = []
        for _ in range(count):
            inner = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            ) / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plain Ethernet / IPv4 / ICMP frames."""
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(
                eth / IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            )
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plain Ethernet / IPv6 / ICMPv6 echo-request frames.

        The IPv6 header takes its hop limit and flow label from ``p``.
        """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            ip6 = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            )
            echo = ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            pkts.append(eth / ip6 / echo)
        return pkts
305
306
class IpsecTcp(object):
    """Verification helper for TCP packets generated by VPP over the tunnel."""

    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 and check the reply's checksums.

        The first packet received on ``self.tun_if`` is decrypted with
        ``vpp_tun_sa`` and passed to ``assert_packet_checksums_valid``.
        """
        # start http cli server listener on http://0.0.0.0:80
        self.vapi.cli("http cli server")
        p = self.params[socket.AF_INET]

        eth = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / TCP(
            flags="S", dport=80
        )
        send = eth / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        decrypted = p.vpp_tun_sa.decrypt(replies[0][IP])
        self.assert_packet_checksums_valid(decrypted)
323
324
class IpsecTcpTests(IpsecTcp):
    """Exposes the IpsecTcp checksum verification as a ``test_*`` method."""

    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        self.verify_tcp_checksum()
329
330
331 class IpsecTra4(object):
332     """verify methods for Transport v4"""
333
334     def get_replay_counts(self, p):
335         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
336         count = self.statistics.get_err_counter(replay_node_name)
337
338         if p.async_mode:
339             replay_post_node_name = (
340                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
341             )
342             count += self.statistics.get_err_counter(replay_post_node_name)
343
344         return count
345
346     def get_hash_failed_counts(self, p):
347         if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
348             hash_failed_node_name = (
349                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
350             )
351         else:
352             hash_failed_node_name = (
353                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
354             )
355         count = self.statistics.get_err_counter(hash_failed_node_name)
356
357         if p.async_mode:
358             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
359
360         return count
361
362     def verify_hi_seq_num(self):
363         p = self.params[socket.AF_INET]
364         saf = VppEnum.vl_api_ipsec_sad_flags_t
365         esn_on = p.vpp_tra_sa.esn_en
366         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
367
368         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
369         replay_count = self.get_replay_counts(p)
370         hash_failed_count = self.get_hash_failed_counts(p)
371         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
372
373         # a few packets so we get the rx seq number above the window size and
374         # thus can simulate a wrap with an out of window packet
375         pkts = [
376             (
377                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
378                 / p.scapy_tra_sa.encrypt(
379                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
380                     seq_num=seq,
381                 )
382             )
383             for seq in range(63, 80)
384         ]
385         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
386
387         # these 4 packets will all choose seq-num 0 to decrpyt since none
388         # are out of window when first checked. however, once #200 has
389         # decrypted it will move the window to 200 and has #81 is out of
390         # window. this packet should be dropped.
391         pkts = [
392             (
393                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
394                 / p.scapy_tra_sa.encrypt(
395                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
396                     seq_num=200,
397                 )
398             ),
399             (
400                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
401                 / p.scapy_tra_sa.encrypt(
402                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
403                     seq_num=81,
404                 )
405             ),
406             (
407                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
408                 / p.scapy_tra_sa.encrypt(
409                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
410                     seq_num=201,
411                 )
412             ),
413             (
414                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
415                 / p.scapy_tra_sa.encrypt(
416                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
417                     seq_num=202,
418                 )
419             ),
420         ]
421
422         # if anti-replay is off then we won't drop #81
423         n_rx = 3 if ar_on else 4
424         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
425         # this packet is one before the wrap
426         pkts = [
427             (
428                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
429                 / p.scapy_tra_sa.encrypt(
430                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
431                     seq_num=203,
432                 )
433             )
434         ]
435         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
436
437         # a replayed packet, then an out of window, then a legit
438         # tests that a early failure on the batch doesn't affect subsequent packets.
439         pkts = [
440             (
441                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
442                 / p.scapy_tra_sa.encrypt(
443                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
444                     seq_num=203,
445                 )
446             ),
447             (
448                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
449                 / p.scapy_tra_sa.encrypt(
450                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
451                     seq_num=81,
452                 )
453             ),
454             (
455                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
456                 / p.scapy_tra_sa.encrypt(
457                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
458                     seq_num=204,
459                 )
460             ),
461         ]
462         n_rx = 1 if ar_on else 3
463         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
464
465         # move the window over half way to a wrap
466         pkts = [
467             (
468                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
469                 / p.scapy_tra_sa.encrypt(
470                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
471                     seq_num=0x80000001,
472                 )
473             )
474         ]
475         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
476
477         # anti-replay will drop old packets, no anti-replay will not
478         pkts = [
479             (
480                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
481                 / p.scapy_tra_sa.encrypt(
482                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
483                     seq_num=0x44000001,
484                 )
485             )
486         ]
487
488         if ar_on:
489             self.send_and_assert_no_replies(self.tra_if, pkts)
490         else:
491             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
492
493         if esn_on:
494             #
495             # validate wrapping the ESN
496             #
497
498             # wrap scapy's TX SA SN
499             p.scapy_tra_sa.seq_num = 0x100000005
500
501             # send a packet that wraps the window for both AR and no AR
502             pkts = [
503                 (
504                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
505                     / p.scapy_tra_sa.encrypt(
506                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
507                         / ICMP(),
508                         seq_num=0x100000005,
509                     )
510                 )
511             ]
512
513             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
514             for rx in rxs:
515                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
516
517             # move the window forward to half way to the next wrap
518             pkts = [
519                 (
520                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
521                     / p.scapy_tra_sa.encrypt(
522                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
523                         / ICMP(),
524                         seq_num=0x180000005,
525                     )
526                 )
527             ]
528
529             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
530
531             # a packet less than 2^30 from the current position is:
532             #  - AR: out of window and dropped
533             #  - non-AR: accepted
534             pkts = [
535                 (
536                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
537                     / p.scapy_tra_sa.encrypt(
538                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
539                         / ICMP(),
540                         seq_num=0x170000005,
541                     )
542                 )
543             ]
544
545             if ar_on:
546                 self.send_and_assert_no_replies(self.tra_if, pkts)
547             else:
548                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
549
550             # a packet more than 2^30 from the current position is:
551             #  - AR: out of window and dropped
552             #  - non-AR: considered a wrap, but since it's not a wrap
553             #    it won't decrpyt and so will be dropped
554             pkts = [
555                 (
556                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
557                     / p.scapy_tra_sa.encrypt(
558                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
559                         / ICMP(),
560                         seq_num=0x130000005,
561                     )
562                 )
563             ]
564
565             self.send_and_assert_no_replies(self.tra_if, pkts)
566
567             # a packet less than 2^30 from the current position and is a
568             # wrap; (the seq is currently at 0x180000005).
569             #  - AR: out of window so considered a wrap, so accepted
570             #  - non-AR: not considered a wrap, so won't decrypt
571             p.scapy_tra_sa.seq_num = 0x260000005
572             pkts = [
573                 (
574                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
575                     / p.scapy_tra_sa.encrypt(
576                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
577                         / ICMP(),
578                         seq_num=0x260000005,
579                     )
580                 )
581             ]
582             if ar_on:
583                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
584             else:
585                 self.send_and_assert_no_replies(self.tra_if, pkts)
586
587             #
588             # window positions are different now for AR/non-AR
589             #  move non-AR forward
590             #
591             if not ar_on:
592                 # a packet more than 2^30 from the current position and is a
593                 # wrap; (the seq is currently at 0x180000005).
594                 #  - AR: accepted
595                 #  - non-AR: not considered a wrap, so won't decrypt
596
597                 pkts = [
598                     (
599                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
600                         / p.scapy_tra_sa.encrypt(
601                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
602                             / ICMP(),
603                             seq_num=0x200000005,
604                         )
605                     ),
606                     (
607                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
608                         / p.scapy_tra_sa.encrypt(
609                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
610                             / ICMP(),
611                             seq_num=0x200000006,
612                         )
613                     ),
614                 ]
615                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
616
617                 pkts = [
618                     (
619                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
620                         / p.scapy_tra_sa.encrypt(
621                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
622                             / ICMP(),
623                             seq_num=0x260000005,
624                         )
625                     )
626                 ]
627                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
628
629     def verify_tra_anti_replay(self):
630         p = self.params[socket.AF_INET]
631         esn_en = p.vpp_tra_sa.esn_en
632         anti_replay_window_size = p.anti_replay_window_size
633
634         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
635         replay_count = self.get_replay_counts(p)
636         initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
637         hash_failed_count = self.get_hash_failed_counts(p)
638         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
639         initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
640             "seq_cycled"
641         )
642         hash_err = "integ_error"
643
644         if ESP == self.encryption_type:
645             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
646             undersize_count = self.statistics.get_err_counter(undersize_node_name)
647             initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
648                 "runt"
649             )
650             # For AES-GCM an error in the hash is reported as a decryption failure
651             if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
652                 hash_err = "decryption_failed"
653         # In async mode, we don't report errors in the hash.
654         if p.async_mode:
655             hash_err = ""
656         else:
657             initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
658                 hash_err
659             )
660
661         #
662         # send packets with seq numbers 1->34
663         # this means the window size is still in Case B (see RFC4303
664         # Appendix A)
665         #
666         # for reasons i haven't investigated Scapy won't create a packet with
667         # seq_num=0
668         #
669         pkts = [
670             (
671                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
672                 / p.scapy_tra_sa.encrypt(
673                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
674                     seq_num=seq,
675                 )
676             )
677             for seq in range(1, 34)
678         ]
679         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
680
681         # replayed packets are dropped
682         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
683         replay_count += len(pkts)
684         self.assertEqual(self.get_replay_counts(p), replay_count)
685         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
686         self.assertEqual(err, replay_count)
687
688         #
689         # now send a batch of packets all with the same sequence number
690         # the first packet in the batch is legitimate, the rest bogus
691         #
692         self.vapi.cli("clear error")
693         self.vapi.cli("clear node counters")
694         pkts = Ether(
695             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
696         ) / p.scapy_tra_sa.encrypt(
697             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
698             seq_num=35,
699         )
700         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
701         replay_count += 7
702         self.assertEqual(self.get_replay_counts(p), replay_count)
703         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
704         self.assertEqual(err, replay_count)
705
706         #
707         # now move the window over to anti_replay_window_size + 100 and into Case A
708         #
709         self.vapi.cli("clear error")
710         pkt = Ether(
711             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
712         ) / p.scapy_tra_sa.encrypt(
713             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
714             seq_num=anti_replay_window_size + 100,
715         )
716         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
717
718         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
719
720         # replayed packets are dropped
721         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
722         replay_count += 3
723         self.assertEqual(self.get_replay_counts(p), replay_count)
724         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
725         self.assertEqual(err, replay_count)
726
727         # the window size is anti_replay_window_size packets
728         # in window are still accepted
729         pkt = Ether(
730             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
731         ) / p.scapy_tra_sa.encrypt(
732             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
733             seq_num=200,
734         )
735
736         # a packet that does not decrypt does not move the window forward
737         bogus_sa = SecurityAssociation(
738             self.encryption_type,
739             p.scapy_tra_spi,
740             crypt_algo=p.crypt_algo,
741             crypt_key=mk_scapy_crypt_key(p)[::-1],
742             auth_algo=p.auth_algo,
743             auth_key=p.auth_key[::-1],
744         )
745         pkt = Ether(
746             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
747         ) / bogus_sa.encrypt(
748             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
749             seq_num=anti_replay_window_size + 200,
750         )
751         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
752
753         hash_failed_count += 17
754         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
755         if hash_err != "":
756             err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
757             self.assertEqual(err, hash_failed_count)
758
759         # a malformed 'runt' packet
760         #  created by a mis-constructed SA
761         if ESP == self.encryption_type and p.crypt_algo != "NULL":
762             bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
763             pkt = Ether(
764                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
765             ) / bogus_sa.encrypt(
766                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
767                 seq_num=anti_replay_window_size + 200,
768             )
769             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
770
771             undersize_count += 17
772             self.assert_error_counter_equal(undersize_node_name, undersize_count)
773             err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
774             self.assertEqual(err, undersize_count)
775
776         # which we can determine since this packet is still in the window
777         pkt = Ether(
778             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
779         ) / p.scapy_tra_sa.encrypt(
780             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
781             seq_num=234,
782         )
783         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
784
785         #
786         # out of window are dropped
787         #  this is Case B. So VPP will consider this to be a high seq num wrap
788         #  and so the decrypt attempt will fail
789         #
790         pkt = Ether(
791             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
792         ) / p.scapy_tra_sa.encrypt(
793             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
794             seq_num=17,
795         )
796         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
797
798         if esn_en:
799             # an out of window error with ESN looks like a high sequence
800             # wrap. but since it isn't then the verify will fail.
801             hash_failed_count += 17
802             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
803             if hash_err != "":
804                 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
805                 self.assertEqual(err, hash_failed_count)
806
807         else:
808             replay_count += 17
809             self.assertEqual(self.get_replay_counts(p), replay_count)
810             err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
811             self.assertEqual(err, replay_count)
812
813         # valid packet moves the window over to anti_replay_window_size + 258
814         pkt = Ether(
815             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
816         ) / p.scapy_tra_sa.encrypt(
817             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
818             seq_num=anti_replay_window_size + 258,
819         )
820         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
821         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
822
823         #
824         # move VPP's SA TX seq-num to just before the seq-number wrap.
825         # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended sequence
827         # numbers.
828         #
829         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
830         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
831         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
832
833         pkts = [
834             (
835                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
836                 / p.scapy_tra_sa.encrypt(
837                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
838                     seq_num=seq,
839                 )
840             )
841             for seq in range(259, 280)
842         ]
843
844         if esn_en:
845             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
846
847             #
848             # in order for scapy to decrypt its SA's high order number needs
849             # to wrap
850             #
851             p.vpp_tra_sa.seq_num = 0x100000000
852             for rx in rxs:
853                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
854
855             #
856             # wrap scapy's TX high sequence number. VPP is in case B, so it
857             # will consider this a high seq wrap also.
858             # The low seq num we set it to will place VPP's RX window in Case A
859             #
860             p.scapy_tra_sa.seq_num = 0x100000005
861             pkt = Ether(
862                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
863             ) / p.scapy_tra_sa.encrypt(
864                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
865                 seq_num=0x100000005,
866             )
867             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
868
869             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
870
871             #
872             # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
873             # the window
874             #
875             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
876             pkt = Ether(
877                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
878             ) / p.scapy_tra_sa.encrypt(
879                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
880                 seq_num=0xFFFFFFFD,
881             )
882             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
883             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
884
885             #
886             # While in case A we cannot wrap the high sequence number again
887             # because VPP will consider this packet to be one that moves the
888             # window forward
889             #
890             pkt = Ether(
891                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
892             ) / p.scapy_tra_sa.encrypt(
893                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
894                 seq_num=0x200000999,
895             )
896             self.send_and_assert_no_replies(
897                 self.tra_if, [pkt], self.tra_if, timeout=0.2
898             )
899
900             hash_failed_count += 1
901             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
902             if hash_err != "":
903                 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
904                 self.assertEqual(err, hash_failed_count)
905
906             #
907             # but if we move the window forward to case B, then we can wrap
908             # again
909             #
910             p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
911             pkt = Ether(
912                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
913             ) / p.scapy_tra_sa.encrypt(
914                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
915                 seq_num=p.scapy_tra_sa.seq_num,
916             )
917             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
918             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
919
920             p.scapy_tra_sa.seq_num = 0x200000444
921             pkt = Ether(
922                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
923             ) / p.scapy_tra_sa.encrypt(
924                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
925                 seq_num=0x200000444,
926             )
927             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
928             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
929
930         else:
931             #
932             # without ESN TX sequence numbers can't wrap and packets are
933             # dropped from here on out.
934             #
935             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
936             seq_cycle_count += len(pkts)
937             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
938             err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
939             self.assertEqual(err, seq_cycle_count)
940
941         # move the security-associations seq number on to the last we used
942         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
943         p.scapy_tra_sa.seq_num = 351
944         p.vpp_tra_sa.seq_num = 351
945
946     def verify_tra_lost(self):
947         p = self.params[socket.AF_INET]
948         esn_en = p.vpp_tra_sa.esn_en
949
950         #
951         # send packets with seq numbers 1->34
952         # this means the window size is still in Case B (see RFC4303
953         # Appendix A)
954         #
955         # for reasons i haven't investigated Scapy won't create a packet with
956         # seq_num=0
957         #
958         pkts = [
959             (
960                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
961                 / p.scapy_tra_sa.encrypt(
962                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
963                     seq_num=seq,
964                 )
965             )
966             for seq in range(1, 3)
967         ]
968         self.send_and_expect(self.tra_if, pkts, self.tra_if)
969
970         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
971
972         # skip a sequence number
973         pkts = [
974             (
975                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
976                 / p.scapy_tra_sa.encrypt(
977                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
978                     seq_num=seq,
979                 )
980             )
981             for seq in range(4, 6)
982         ]
983         self.send_and_expect(self.tra_if, pkts, self.tra_if)
984
985         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
986
987         # the lost packet are counted untill we get up past the first
988         # sizeof(replay_window) packets
989         pkts = [
990             (
991                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
992                 / p.scapy_tra_sa.encrypt(
993                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
994                     seq_num=seq,
995                 )
996             )
997             for seq in range(6, 100)
998         ]
999         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1000
1001         self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
1002
1003         # lost of holes in the sequence
1004         pkts = [
1005             (
1006                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1007                 / p.scapy_tra_sa.encrypt(
1008                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1009                     seq_num=seq,
1010                 )
1011             )
1012             for seq in range(100, 200, 2)
1013         ]
1014         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
1015
1016         pkts = [
1017             (
1018                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1019                 / p.scapy_tra_sa.encrypt(
1020                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1021                     seq_num=seq,
1022                 )
1023             )
1024             for seq in range(200, 300)
1025         ]
1026         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1027
1028         self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
1029
1030         # a big hole in the seq number space
1031         pkts = [
1032             (
1033                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1034                 / p.scapy_tra_sa.encrypt(
1035                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1036                     seq_num=seq,
1037                 )
1038             )
1039             for seq in range(400, 500)
1040         ]
1041         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1042
1043         self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
1044
1045     def verify_tra_basic4(self, count=1, payload_size=54):
1046         """ipsec v4 transport basic test"""
1047         self.vapi.cli("clear errors")
1048         self.vapi.cli("clear ipsec sa")
1049         try:
1050             p = self.params[socket.AF_INET]
1051             send_pkts = self.gen_encrypt_pkts(
1052                 p,
1053                 p.scapy_tra_sa,
1054                 self.tra_if,
1055                 src=self.tra_if.remote_ip4,
1056                 dst=self.tra_if.local_ip4,
1057                 count=count,
1058                 payload_size=payload_size,
1059             )
1060             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1061             for rx in recv_pkts:
1062                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1063                 self.assert_packet_checksums_valid(rx)
1064                 try:
1065                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1066                     self.assert_packet_checksums_valid(decrypted)
1067                 except:
1068                     self.logger.debug(ppp("Unexpected packet:", rx))
1069                     raise
1070         finally:
1071             self.logger.info(self.vapi.ppcli("show error"))
1072             self.logger.info(self.vapi.ppcli("show ipsec all"))
1073
1074         pkts = p.tra_sa_in.get_stats()["packets"]
1075         self.assertEqual(
1076             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1077         )
1078         pkts = p.tra_sa_out.get_stats()["packets"]
1079         self.assertEqual(
1080             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1081         )
1082         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
1083         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
1084
1085         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1086         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1087
1088     def _verify_tra_anti_replay_algorithm_esn(self):
1089         def seq_num(seqh, seql):
1090             return (seqh << 32) | (seql & 0xFFFF_FFFF)
1091
1092         p = self.params[socket.AF_INET]
1093         anti_replay_window_size = p.anti_replay_window_size
1094
1095         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1096         replay_count = self.get_replay_counts(p)
1097         hash_failed_count = self.get_hash_failed_counts(p)
1098         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1099
1100         if ESP == self.encryption_type:
1101             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1102             undersize_count = self.statistics.get_err_counter(undersize_node_name)
1103
1104         # reset the TX SA to avoid conflict with left configuration
1105         self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1106
1107         """
1108         RFC 4303 Appendix A2. Case A
1109
1110         |: new Th marker
1111         a-i: possible seq num received
1112         +: Bl, Tl, Bl', Tl'
1113         [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
1114
1115                 Th - 1               Th                Th + 1
1116         --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
1117                 =========          =========          =========
1118                 Bl-     Tl-        Bl      Tl         Bl+     Tl+
1119
1120         Case A implies Tl >= W - 1
1121         """
1122
1123         Th = 1
1124         Tl = anti_replay_window_size + 40
1125         Bl = Tl - anti_replay_window_size + 1
1126
1127         # move VPP's RX AR window to Case A
1128         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1129         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1130
1131         """
1132         case a: Seql < Bl
1133             - pre-crypto check: algorithm predicts that the packet wrap the window
1134                 -> Seqh = Th + 1
1135             - integrity check: should fail
1136             - post-crypto check: ...
1137         """
1138         pkts = [
1139             (
1140                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1141                 / p.scapy_tra_sa.encrypt(
1142                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1143                     seq_num=seq,
1144                 )
1145             )
1146             for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
1147         ]
1148
1149         # out-of-window packets fail integrity check
1150         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1151         hash_failed_count += len(pkts)
1152         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1153
1154         """
1155             case b: Bl <= Seql <= Tl
1156                 - pre-crypto check: algorithm predicts that the packet is in the window
1157                     -> Seqh = Th
1158                     -> check for a replayed packet with Seql
1159                 - integrity check: should fail
1160                 - post-crypto check: ...
1161         """
1162         pkts = [
1163             (
1164                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1165                 / p.scapy_tra_sa.encrypt(
1166                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1167                     seq_num=seq,
1168                 )
1169             )
1170             for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
1171         ]
1172         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1173
1174         p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1175         pkts = [
1176             (
1177                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1178                 / p.scapy_tra_sa.encrypt(
1179                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1180                     seq_num=seq,
1181                 )
1182             )
1183             for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
1184         ]
1185
1186         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1187
1188         # some packets are rejected by the pre-crypto check
1189         replay_count += 5
1190         self.assertEqual(self.get_replay_counts(p), replay_count)
1191
1192         # out-of-window packets fail integrity check
1193         hash_failed_count += len(pkts) - 5
1194         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1195
1196         """
1197             case c: Seql > Tl
1198                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1199                     -> Seqh = Th
1200                 - integrity check: should fail
1201                 - post-crypto check: ...
1202         """
1203         pkts = [
1204             (
1205                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1206                 / p.scapy_tra_sa.encrypt(
1207                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1208                     seq_num=seq,
1209                 )
1210             )
1211             for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
1212         ]
1213
1214         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1215
1216         # out-of-window packets fail integrity check
1217         hash_failed_count += len(pkts)
1218         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1219
1220         """
1221             case d: Seql < Bl
1222                 - pre-crypto check: algorithm predicts that the packet wrap the window
1223                     -> Seqh = Th + 1
1224                 - integrity check: should fail
1225                 - post-crypto check: ...
1226         """
1227         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1228         pkts = [
1229             (
1230                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1231                 / p.scapy_tra_sa.encrypt(
1232                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1233                     seq_num=seq,
1234                 )
1235             )
1236             for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
1237         ]
1238
1239         # out-of-window packets fail integrity check
1240         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1241         hash_failed_count += len(pkts)
1242         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1243
1244         """
1245             case e: Bl <= Seql <= Tl
1246                 - pre-crypto check: algorithm predicts that the packet is in the window
1247                     -> Seqh = Th
1248                     -> check for a replayed packet with Seql
1249                 - integrity check: should pass
1250                 - post-crypto check: should pass
1251                     -> Seql is marked in the AR window
1252         """
1253         pkts = [
1254             (
1255                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1256                 / p.scapy_tra_sa.encrypt(
1257                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1258                     seq_num=seq,
1259                 )
1260             )
1261             for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
1262         ]
1263
1264         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1265
1266         """
1267             case f: Seql > Tl
1268                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1269                     -> Seqh = Th
1270                 - integrity check: should pass
1271                 - post-crypto check: should pass
1272                     -> AR window shift (the window stays Case A)
1273                     -> Seql is marked in the AR window
1274         """
1275         pkts = [
1276             (
1277                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1278                 / p.scapy_tra_sa.encrypt(
1279                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1280                     seq_num=seq,
1281                 )
1282             )
1283             for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
1284         ]
1285
1286         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1287
1288         """
1289             case g: Seql < Bl
1290                 - pre-crypto check: algorithm predicts that the packet wrap the window
1291                     -> Seqh = Th + 1
1292                 - integrity check: should pass
1293                 - post-crypto check: should pass
1294                     -> AR window shift (may set the window in Case B)
1295                     -> Seql is marked in the AR window
1296         """
1297         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1298         pkts = [
1299             (
1300                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1301                 / p.scapy_tra_sa.encrypt(
1302                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1303                     seq_num=seq,
1304                 )
1305             )
1306             # set the window in Case B (the minimum window size is 64
1307             # so we are sure to overlap)
1308             for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
1309         ]
1310
1311         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1312
1313         # reset the VPP's RX AR window to Case A
1314         Th = 1
1315         Tl = 2 * anti_replay_window_size + 40
1316         Bl = Tl - anti_replay_window_size + 1
1317
1318         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1319
1320         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1321         pkts = [
1322             (
1323                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1324                 / p.scapy_tra_sa.encrypt(
1325                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1326                     seq_num=seq,
1327                 )
1328             )
1329             # the AR will stay in Case A
1330             for seq in range(
1331                 seq_num(Th + 1, anti_replay_window_size + 10),
1332                 seq_num(Th + 1, anti_replay_window_size + 20),
1333             )
1334         ]
1335
1336         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1337
1338         """
1339             case h: Bl <= Seql <= Tl
1340                 - pre-crypto check: algorithm predicts that the packet is in the window
1341                     -> Seqh = Th
1342                     -> check for a replayed packet with Seql
1343                 - integrity check: the wrap is not detected, should fail
1344                 - post-crypto check: ...
1345         """
1346         Th += 1
1347         Tl = anti_replay_window_size + 20
1348         Bl = Tl - anti_replay_window_size + 1
1349
1350         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1351
1352         pkts = [
1353             (
1354                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1355                 / p.scapy_tra_sa.encrypt(
1356                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1357                     seq_num=seq,
1358                 )
1359             )
1360             for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
1361         ]
1362
1363         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1364
1365         # some packets are rejected by the pre-crypto check
1366         replay_count += 5
1367         self.assertEqual(self.get_replay_counts(p), replay_count)
1368
1369         # out-of-window packets fail integrity check
1370         hash_failed_count += len(pkts) - 5
1371         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1372
1373         """
1374             case i: Seql > Tl
1375                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1376                     -> Seqh = Th
1377                 - integrity check: the wrap is not detected, shoud fail
1378                 - post-crypto check: ...
1379         """
1380         pkts = [
1381             (
1382                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1383                 / p.scapy_tra_sa.encrypt(
1384                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1385                     seq_num=seq,
1386                 )
1387             )
1388             for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
1389         ]
1390
1391         # out-of-window packets fail integrity check
1392         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1393         hash_failed_count += len(pkts)
1394         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1395
1396         """
1397             RFC 4303 Appendix A2. Case B
1398
1399                         Th - 1               Th                Th + 1
1400             ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
1401             =========          ===========          ===========
1402                     Tl-        Bl       Tl          Bl+       Tl+
1403
1404             Case B implies Tl < W - 1
1405         """
1406
1407         # reset the VPP's RX AR window to Case B
1408         Th = 2
1409         Tl = 30  # minimum window size of 64, we are sure to overlap
1410         Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1411
1412         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1413         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1414
1415         """
1416             case a: Seql <= Tl < Bl
1417                 - pre-crypto check: algorithm predicts that the packet is in the window
1418                     -> Seqh = Th
1419                     -> check for replayed packet
1420                 - integrity check: should fail
1421                 - post-crypto check: ...
1422         """
1423         pkts = [
1424             (
1425                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1426                 / p.scapy_tra_sa.encrypt(
1427                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1428                     seq_num=seq,
1429                 )
1430             )
1431             for seq in range(seq_num(Th, 5), seq_num(Th, 10))
1432         ]
1433
1434         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1435
1436         p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1437         pkts = [
1438             (
1439                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1440                 / p.scapy_tra_sa.encrypt(
1441                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1442                     seq_num=seq,
1443                 )
1444             )
1445             for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
1446         ]
1447
1448         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1449
1450         # some packets are rejected by the pre-crypto check
1451         replay_count += 5
1452         self.assertEqual(self.get_replay_counts(p), replay_count)
1453
1454         # out-of-window packets fail integrity check
1455         hash_failed_count += len(pkts) - 5
1456         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1457
1458         """
1459             case b: Tl < Seql < Bl
1460                 - pre-crypto check: algorithm predicts that the packet will shift the window
1461                     -> Seqh = Th
1462                 - integrity check: should fail
1463                 - post-crypto check: ...
1464         """
1465         pkts = [
1466             (
1467                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1468                 / p.scapy_tra_sa.encrypt(
1469                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1470                     seq_num=seq,
1471                 )
1472             )
1473             for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
1474         ]
1475
1476         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1477
1478         # out-of-window packets fail integrity check
1479         hash_failed_count += len(pkts)
1480         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1481
1482         """
1483             case c: Tl < Bl <= Seql
1484                 - pre-crypto check: algorithm predicts that the packet is in the window
1485                     -> Seqh = Th - 1
1486                     -> check for a replayed packet with Seql
1487                 - integrity check: should pass
1488                 - post-crypto check: should pass
1489                     -> Seql is marked in the AR window
1490         """
1491         pkts = [
1492             (
1493                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1494                 / p.scapy_tra_sa.encrypt(
1495                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1496                     seq_num=seq,
1497                 )
1498             )
1499             for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
1500         ]
1501
1502         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1503
1504         """
1505             case d: Seql <= Tl < Bl
1506                 - pre-crypto check: algorithm predicts that the packet is the window
1507                     -> Seqh = Th
1508                     -> check for replayed packet
1509                 - integrity check: should pass
1510                 - post-crypto check: should pass
1511                     -> Seql is marked in the AR window
1512         """
1513         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1514         pkts = [
1515             (
1516                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1517                 / p.scapy_tra_sa.encrypt(
1518                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1519                     seq_num=seq,
1520                 )
1521             )
1522             for seq in range(seq_num(Th, 15), seq_num(Th, 25))
1523         ]
1524
1525         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1526
1527         """
1528             case e: Tl < Seql < Bl
1529                 - pre-crypto check: algorithm predicts that the packet is in the window
1530                     -> Seqh = Th
1531                     -> check for a replayed packet with Seql
1532                 - integrity check: should pass
1533                 - post-crypto check: should pass
1534                     -> AR window shift (may set the window in Case A)
1535                     -> Seql is marked in the AR window
1536         """
1537         pkts = [
1538             (
1539                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1540                 / p.scapy_tra_sa.encrypt(
1541                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1542                     seq_num=seq,
1543                 )
1544             )
1545             for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
1546         ]
1547
1548         # the window stays in Case B
1549         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1550
1551         pkts = [
1552             (
1553                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1554                 / p.scapy_tra_sa.encrypt(
1555                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1556                     seq_num=seq,
1557                 )
1558             )
1559             for seq in range(
1560                 seq_num(Th, Tl + anti_replay_window_size + 5),
1561                 seq_num(Th, Tl + anti_replay_window_size + 15),
1562             )
1563         ]
1564
1565         # the window moves to Case A
1566         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1567
1568         # reset the VPP's RX AR window to Case B
1569         Th = 2
1570         Tl = 30  # minimum window size of 64, we are sure to overlap
1571         Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1572
1573         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1574         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1575
1576         """
1577             case f: Tl < Bl <= Seql
1578                 - pre-crypto check: algorithm predicts that the packet is in the previous window
1579                     -> Seqh = Th - 1
1580                     -> check for a replayed packet with Seql
1581                 - integrity check: should fail
1582                 - post-crypto check: ...
1583         """
1584         pkts = [
1585             (
1586                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1587                 / p.scapy_tra_sa.encrypt(
1588                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1589                     seq_num=seq,
1590                 )
1591             )
1592             for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
1593         ]
1594
1595         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1596
1597         # out-of-window packets fail integrity check
1598         hash_failed_count += len(pkts)
1599         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1600
1601         """
1602             case g: Seql <= Tl < Bl
1603                 - pre-crypto check: algorithm predicts that the packet is the window
1604                     -> Seqh = Th
1605                     -> check for replayed packet
1606                 - integrity check: should fail
1607                 - post-crypto check: ...
1608         """
1609         pkts = [
1610             (
1611                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1612                 / p.scapy_tra_sa.encrypt(
1613                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1614                     seq_num=seq,
1615                 )
1616             )
1617             for seq in range(seq_num(Th, 10), seq_num(Th, 15))
1618         ]
1619
1620         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1621
1622         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1623         pkts = [
1624             (
1625                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1626                 / p.scapy_tra_sa.encrypt(
1627                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1628                     seq_num=seq,
1629                 )
1630             )
1631             for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
1632         ]
1633
1634         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1635
1636         # some packets are rejected by the pre-crypto check
1637         replay_count += 5
1638         self.assertEqual(self.get_replay_counts(p), replay_count)
1639
1640         # out-of-window packets fail integrity check
1641         hash_failed_count += len(pkts) - 5
1642         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1643
1644         """
1645             case h: Tl < Seql < Bl
1646                 - pre-crypto check: algorithm predicts that the packet will shift the window
1647                     -> Seqh = Th
1648                 - integrity check: should fail
1649                 - post-crypto check: ...
1650         """
1651         pkts = [
1652             (
1653                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1654                 / p.scapy_tra_sa.encrypt(
1655                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1656                     seq_num=seq,
1657                 )
1658             )
1659             for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
1660         ]
1661
1662         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1663
1664         # out-of-window packets fail integrity check
1665         hash_failed_count += len(pkts)
1666         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1667
1668     def _verify_tra_anti_replay_algorithm_no_esn(self):
1669         def seq_num(seql):
1670             return seql & 0xFFFF_FFFF
1671
1672         p = self.params[socket.AF_INET]
1673         anti_replay_window_size = p.anti_replay_window_size
1674
1675         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1676         replay_count = self.get_replay_counts(p)
1677         hash_failed_count = self.get_hash_failed_counts(p)
1678         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1679
1680         if ESP == self.encryption_type:
1681             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1682             undersize_count = self.statistics.get_err_counter(undersize_node_name)
1683
1684         # reset the TX SA to avoid conflict with left configuration
1685         self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1686
1687         """
1688         RFC 4303 Appendix A2. Case A
1689
1690         a-c: possible seq num received
1691         +: Bl, Tl
1692
1693         |--a--+---b---+-c--|
1694               =========
1695               Bl      Tl
1696
1697         No ESN implies Th = 0
1698         Case A implies Tl >= W - 1
1699         """
1700
1701         Tl = anti_replay_window_size + 40
1702         Bl = Tl - anti_replay_window_size + 1
1703
1704         # move VPP's RX AR window to Case A
1705         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1706         p.scapy_tra_sa.seq_num = seq_num(Tl)
1707
1708         """
1709         case a: Seql < Bl
1710             - pre-crypto check: algorithm predicts that the packet is out of window
1711                 -> packet should be dropped
1712             - integrity check: ...
1713             - post-crypto check: ...
1714         """
1715         pkts = [
1716             (
1717                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1718                 / p.scapy_tra_sa.encrypt(
1719                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1720                     seq_num=seq,
1721                 )
1722             )
1723             for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
1724         ]
1725
1726         # out-of-window packets
1727         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1728         replay_count += len(pkts)
1729         self.assertEqual(self.get_replay_counts(p), replay_count)
1730
1731         """
1732             case b: Bl <= Seql <= Tl
1733                 - pre-crypto check: algorithm predicts that the packet is in the window
1734                     -> check for a replayed packet with Seql
1735                 - integrity check: should pass
1736                 - post-crypto check:
1737                     -> check for a replayed packet with Seql
1738         """
1739         pkts = [
1740             (
1741                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1742                 / p.scapy_tra_sa.encrypt(
1743                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1744                     seq_num=seq,
1745                 )
1746             )
1747             for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
1748         ]
1749         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1750
1751         pkts = [
1752             (
1753                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1754                 / p.scapy_tra_sa.encrypt(
1755                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1756                     seq_num=seq,
1757                 )
1758             )
1759             for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
1760         ]
1761
1762         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1763
1764         # replayed packets
1765         replay_count += 5
1766         self.assertEqual(self.get_replay_counts(p), replay_count)
1767
1768         """
1769             case c: Seql > Tl
1770                 - pre-crypto check: algorithm predicts that the packet will shift the window
1771                 - integrity check: should pass
1772                 - post-crypto check: should pass
1773                     -> AR window is shifted
1774         """
1775         pkts = [
1776             (
1777                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1778                 / p.scapy_tra_sa.encrypt(
1779                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1780                     seq_num=seq,
1781                 )
1782             )
1783             for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
1784         ]
1785
1786         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1787
1788         """
1789             RFC 4303 Appendix A2. Case B
1790
1791             |-a-----+------b-----|
1792             =========
1793                     Tl
1794
1795             Case B implies Tl < W - 1
1796         """
1797
1798         # reset the VPP's RX AR window to Case B
1799         Tl = 30  # minimum window size of 64, we are sure to overlap
1800         Bl = seq_num(Tl - anti_replay_window_size + 1)
1801
1802         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1803
1804         """
1805         case a: Seql <= Tl < Bl
1806             - pre-crypto check: algorithm predicts that the packet is in the window
1807                 -> check for replayed packet
1808             - integrity check: should fail
1809             - post-crypto check: ...
1810         """
1811         pkts = [
1812             (
1813                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1814                 / p.scapy_tra_sa.encrypt(
1815                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1816                     seq_num=seq,
1817                 )
1818             )
1819             for seq in range(seq_num(5), seq_num(10))
1820         ]
1821
1822         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1823
1824         """
1825             case b: Tl < Seql < Bl
1826                 - pre-crypto check: algorithm predicts that the packet will shift the window
1827                 - integrity check: should pass
1828                 - post-crypto check: should pass
1829                     -> AR window is shifted
1830         """
1831         pkts = [
1832             (
1833                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1834                 / p.scapy_tra_sa.encrypt(
1835                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1836                     seq_num=seq,
1837                 )
1838             )
1839             for seq in range(seq_num(-50), seq_num(-20))
1840         ]
1841
1842         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1843
1844     def verify_tra_anti_replay_algorithm(self):
1845         if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
1846             self._verify_tra_anti_replay_algorithm_esn()
1847         else:
1848             self._verify_tra_anti_replay_algorithm_no_esn()
1849
1850
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_anti_replay_algorithm(self):
        """ipsec v4 transport anti-replay algorithm test"""
        self.verify_tra_anti_replay_algorithm()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # forward the requested packet count; it used to be ignored in
        # favour of a hard-coded 1 (the default keeps the old behaviour)
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        self.verify_tra_basic4(count=257)
1873
1874
class IpsecTra6(object):
    """verify methods for Transport v6"""

    def verify_tra_basic6(self, count=1, payload_size=54):
        """Send protected IPv6 transport packets and check replies and counters.

        :param count: number of packets to send; also the value every SA
            and node packet counter is expected to show afterwards.
        :param payload_size: forwarded to gen_encrypt_pkts6().
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tra_sa,
                self.tra_if,
                src=self.tra_if.remote_ip6,
                dst=self.tra_if.local_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
            for rx in recv_pkts:
                # the IPv6 payload length must match what is on the wire
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # dump state whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
        )
        pkts = p.tra_sa_out.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
        )
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)

    def gen_encrypt_pkts_ext_hdrs6(
        self, sa, sw_intf, src, dst, count=1, payload_size=54
    ):
        """Return ``count`` ICMPv6 echo requests protected by ``sa``.

        NOTE(review): despite the name, no IPv6 extension header is added
        to the inner packet here - confirm whether that is intended.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=src, dst=dst)
                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plain IPv6 packets with hop-by-hop + fragment headers.

        ``payload_size`` is accepted but not used: the payload is always
        200 bytes of 0xff.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
            for _ in range(count)
        ]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt every packet of ``rxs`` and check its addresses.

        :param p: parameter object; ``p.vpp_tra_sa`` does the decryption.
        :param sa: unused, kept for interface compatibility.
        :param rxs: packets captured on the transport interface.
        :returns: the list of decrypted packets, in capture order.
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # reset per packet so a failure never logs the decrypt of an
            # earlier packet (or trips over an unbound name)
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # diagnostics are best-effort; never mask the
                        # failure that brought us here
                        pass
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """Check transport mode with IPv6 extension headers around ESP.

        Inbound: protected packets from gen_encrypt_pkts_ext_hdrs6() must be
        answered.  Outbound: packets carrying one, two and three extension
        headers are injected on pg2 and the headers must still be present
        once the packet captured on tra_if is decrypted.
        """
        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(
            p.scapy_tra_sa,
            self.tra_if,
            src=self.tra_if.remote_ip6,
            dst=self.tra_if.local_ip6,
            count=count,
        )
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, alors je ne regrette rien
        #

        # one extension before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it
            dc = IPv6(raw(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / IPv6ExtHdrDestOpt()
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
2029
2030
class IpsecTra6Tests(IpsecTra6):
    """UT test methods for Transport v6"""

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        # a single packet is enough for the basic path
        single_packet = 1
        self.verify_tra_basic6(count=single_packet)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        # 257 packets in one go
        burst_size = 257
        self.verify_tra_basic6(count=burst_size)
2041
2042
class IpsecTra6ExtTests(IpsecTra6):
    # Runs the IPv6 transport extension-header scenario with the IPv6
    # parameter set.
    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(v6_params)
2047
2048
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4"""
2053
2054
class IpsecTun4(object):
    """verify methods for Tunnel v4"""

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Check SPD, SA and node packet counters after a tunnel exchange.

        :param p: parameter object; the SPD and SA checks only run for the
            attributes it actually has (spd_policy_in_any, tun_sa_in).
        :param count: packets expected on the inbound (decrypt) side.
        :param n_frags: packets expected on the outbound (encrypt) side;
            any falsy value (None or 0) means "same as count".
        :param worker: forwarded to every get_stats() call.
        """
        if not n_frags:
            n_frags = count
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SPD any policy: expected %d != %d" % (count, pkts),
            )

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
            pkts = p.tun_sa_out.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                n_frags,
                # report the value that is actually compared against
                "incorrect SA out counts: expected %d != %d" % (n_frags, pkts),
            )

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets forwarded in the clear."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """Assert the ESP payload length matches the expected padding.

        The expected length is len(decrypt_pkt) + 2 rounded up to the
        cipher block size (at least 4), plus the IV size, plus the ICV size
        of the cipher or, when that is 0, of the auth algorithm.  The + 2
        presumably covers the ESP pad-length and next-header bytes.
        """
        align = sa.crypt_algo.block_size
        if align < 4:
            align = 4
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt tunnel packets and check lengths, padding and addresses.

        :param p: parameter object; ``p.vpp_tun_sa`` does the decryption
            and ``p.nat_header``, when set, gives the expected UDP dport.
        :param sa: SA whose algorithms drive the ESP padding check.
        :param rxs: packets captured on the tunnel interface.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, p.nat_header.dport)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so a failure never logs the decrypt of an
            # earlier packet (or trips over an unbound name)
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # diagnostics are best-effort; never mask the
                        # failure that brought us here
                        pass
                raise
        # the inner packets may be fragments: validate them once reassembled
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Run a 4o4 tunnel exchange in both directions and check counters.

        :param count: packets sent in each direction.
        :param payload_size: forwarded to the packet generators.
        :param n_rx: packets expected out of the tunnel on the encrypt
            path; any falsy value means "same as count".
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            # decrypt direction: tunnel interface -> pg1
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # encrypt direction: pg1 -> tunnel interface
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Check that traffic is dropped in both directions of a 4o4 tunnel.

        ``n_rx`` is accepted for signature parity with verify_tun_44() but
        is not used, since no packet is expected.  ``payload_size`` only
        applies to the clear-text packets sent from pg1.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Check a fragmented tunnel packet is reassembled before decryption.

        IPv4 reassembly is enabled on the tunnel interface for the duration
        of the exchange and disabled again once the counters are verified.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
        )

        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                payload_size=1900,
                count=1,
            )
            # fragment the one protected packet; a single packet is
            # expected on pg1
            send_pkts = fragment_rfc791(send_pkts[0], 1400)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(
                self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, 1, 1)
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
        )

    def verify_tun_64(self, p, count=1):
        """Run a 6o4 tunnel exchange (IPv6 inside an IPv4 tunnel)."""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host6,
                dst=self.pg1.remote_ip6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale decrypt
                decrypt_pkt = None
                try:
                    # the outer header is IPv4, the inner packet IPv6
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # best-effort diagnostics only
                            pass
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Send UDP/4500 one-byte payloads and check the input-node counters.

        Three bursts of 31 packets are sent and none may be answered: a
        0xff payload must be counted as ``nat_keepalive``; a 0xfe payload,
        without and then with the Padding layer, as ``too_short``.
        """
        # Local import so that Padding is in scope regardless of what the
        # module imports from scapy.packet at file level.
        from scapy.packet import Padding

        # the size of Raw is calculated to pad to the minimum ethernet
        # frame size of 64 bytes
        # NOTE(review): `0 * 21` evaluates to the integer 0, so this is
        # Padding(0) rather than 21 bytes of padding - presumably
        # b"\x00" * 21 was intended.  Left unchanged because the counters
        # asserted below may depend on the packet length; confirm.
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xff")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
        )

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        # the counter is cumulative: 31 from the previous burst + 31 now
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
2310
2311
class IpsecTun4Tests(IpsecTun4):
    """UT test methods for Tunnel v4"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

        # bounce the tunnel interface, then make sure traffic still flows
        tun = self.tun_if
        tun.admin_down()
        tun.resolve_arp()
        tun.admin_up()

        self.verify_tun_44(v4_params, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        burst_size = 127
        self.verify_tun_44(self.params[socket.AF_INET], count=burst_size)
2330
2331
class IpsecTun6(object):
    """verify methods for Tunnel v6"""

    def verify_counters6(self, p_in, p_out, count, worker=None):
        """Assert that SA and node packet counters all equal ``count``.

        The inbound SA is only checked when ``p_in`` has a ``tun_sa_in``
        attribute and the outbound SA only when ``p_out`` has a
        ``tun_sa_out`` attribute.  The encrypt node counter and the first
        entry of ``self.tun6_decrypt_node_name`` are always checked.

        :param p_in: parameters object that may carry ``tun_sa_in``.
        :param p_out: parameters object that may carry ``tun_sa_out``.
        :param count: expected number of packets.
        :param worker: forwarded to the SAs' ``get_stats()``.
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SA out counts: expected %d != %d" % (count, pkts),
            )
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of packets received in the clear.

        :param p: parameters object; ``remote_tun_if_host`` is the expected
            IPv6 source.
        :param rxs: received packets; the expected destination is
            ``self.pg1.remote_ip6``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Check the outer header of encrypted packets and their payload.

        For every packet the outer payload length, hop limit and (when
        ``p.outer_flow_label`` is non-zero) flow label are checked; the
        packet is then decrypted with ``p.vpp_tun_sa`` and the inner
        addresses, hop limit and flow label are checked.  On any failure the
        offending packet is logged at debug level and the error is re-raised.

        :param p: parameters object describing the expected values.
        :param sa: accepted for interface compatibility; not used here.
        :param rxs: received (encrypted) packets.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
            if p.outer_flow_label:
                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
            # reset per packet so the failure log never shows a packet that
            # was decrypted in an earlier iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                # only available when decryption itself succeeded
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
        """Send clear-text packets and expect none to show up on the tunnel.

        :param p_in: parameters object; ``remote_tun_if_host`` is the
            destination of the generated packets.
        :param count: number of packets to generate.
        :param payload_size: forwarded to ``gen_pkts6``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_pkts6(
            p_in,
            self.pg1,
            src=self.pg1.remote_ip6,
            dst=p_in.remote_tun_if_host,
            count=count,
            payload_size=payload_size,
        )
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
        """Send encrypted packets on the tunnel and expect no replies.

        :param p_in: parameters object providing ``scapy_tun_sa`` and
            ``remote_tun_if_host``.
        :param count: number of packets to generate.
        :param payload_size: accepted but not forwarded to
            ``gen_encrypt_pkts6``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        # NOTE(review): payload_size is not passed on here, unlike in
        # verify_tun_66 - confirm whether that is intentional.
        send_pkts = self.gen_encrypt_pkts6(
            p_in,
            p_in.scapy_tun_sa,
            self.tun_if,
            src=p_in.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=count,
        )
        self.send_and_assert_no_replies(self.tun_if, send_pkts)

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Run the tx drop check followed by the rx drop check."""
        self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
        self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Verify 6o6 tunnel traffic in both directions, then the counters.

        Encrypted packets are injected on ``self.tun_if`` and must arrive
        decrypted on ``self.pg1``; clear packets are injected on ``self.pg1``
        and must arrive encrypted on ``self.tun_if`` with the outer addresses
        ``p_out.tun_src`` / ``p_out.tun_dst``.  ``show error`` and
        ``show ipsec all`` are logged whether or not the checks pass.

        :param p_in: parameters for the inbound direction.
        :param p_out: parameters for the outbound direction; defaults to
            ``p_in`` when falsy.
        :param count: number of packets per direction.
        :param payload_size: forwarded to the packet generators.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(
                p_in,
                p_in.scapy_tun_sa,
                self.tun_if,
                src=p_in.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(
                p_in,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p_out.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Verify a 6o6 tunnel with IPv6 reassembly enabled on the tunnel.

        One encrypted packet with an 1850 byte payload is fragmented with
        ``fragment_rfc8200`` and injected on ``self.tun_if``; exactly one
        decrypted packet is expected on ``self.pg1``.  One clear packet is
        then sent the other way and must arrive encrypted.  Reassembly is
        switched off again on the way out, also when a check fails.

        :param p: parameters object used for both directions.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
        )

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip6,
                    count=1,
                    payload_size=1850,
                )
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, n_rx=1
                )
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(
                    p,
                    self.pg1,
                    src=self.pg1.remote_ip6,
                    dst=p.remote_tun_if_host,
                    count=1,
                    payload_size=64,
                )
                recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # do not leave reassembly enabled behind a failed check
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
            )

    def verify_tun_46(self, p, count=1):
        """ipsec 4o6 tunnel basic test

        IPv4 traffic is sent through the tunnel in both directions: the
        decrypted packets on ``self.pg1`` and the decrypted content of the
        packets captured on ``self.tun_if`` must carry the expected IPv4
        addresses and valid checksums.  Counters are verified at the end.

        :param p: parameters object; ``remote_tun_if_host4`` is the IPv4
            host behind the tunnel.
        :param count: number of packets per direction.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host4,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host4,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so the failure log never shows a packet
                # that was decrypted in an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # only available when decryption itself succeeded
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)

    def verify_keepalive(self, p):
        """Check how the IPv6 tunnel input node counts UDP/4500 probes.

        Three bursts of 31 packets are sent to ``self.tun_if``; no reply is
        expected for any of them.  After each burst an error counter of
        ``self.tun6_input_node`` is checked:

        * one ``0xff`` payload byte followed by a ``Padding`` layer:
          ``nat_keepalive`` must read 31;
        * one ``0xfe`` payload byte: ``too_short`` must read 31;
        * one ``0xfe`` payload byte followed by a ``Padding`` layer:
          ``too_short`` must read 62.

        :param p: parameters object; only ``remote_tun_if_host`` is read.
        """
        # Padding is not among the scapy names imported at the top of this
        # file, so bring it into scope where it is used.
        from scapy.packet import Padding

        def probe(marker, pad=None):
            # Build one UDP/4500 packet carrying the single byte ``marker``,
            # followed by Padding(pad) when ``pad`` is given.
            pkt = (
                Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
                / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
                / UDP(sport=333, dport=4500)
                / Raw(marker)
            )
            if pad is not None:
                pkt = pkt / Padding(pad)
            return pkt

        # the size of the padding is calculated to pad to the minimum
        # ethernet frame size of 64 bytes
        # NOTE(review): ``0 * 1`` and ``0 * 21`` both evaluate to the integer
        # 0, not to zero bytes - confirm whether ``b"\x00" * n`` was meant.
        self.send_and_assert_no_replies(self.tun_if, probe(b"\xff", 0 * 1) * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun6_input_node, 31
        )

        # 0xfe payload without padding: counted as too_short
        self.send_and_assert_no_replies(self.tun_if, probe(b"\xfe") * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)

        # 0xfe payload with padding: too_short keeps accumulating
        self.send_and_assert_no_replies(self.tun_if, probe(b"\xfe", 0 * 21) * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
2568
2569
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        v6_params = self.params[socket.AF_INET6]
        # a single packet in each direction
        self.verify_tun_66(v6_params, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(v6_params)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        v6_params = self.params[socket.AF_INET6]
        # 257 packets in each direction
        self.verify_tun_66(v6_params, count=257)
2584
2585
class IpsecTun6HandoffTests(IpsecTun6):
    """UT test methods for Tunnel v6 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handoff_66(self):
        """ipsec 6o6 tunnel worker hand-off test"""
        for clear_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(clear_cmd)

        burst = 15
        p = self.params[socket.AF_INET6]

        # packets are injected alternately on worker 0 and worker 1; the SA
        # counters are nevertheless all expected against worker 0
        workers = (0, 1, 0, 1)
        for worker in workers:
            # tunnel -> pg1: encrypted in, clear out
            encrypted = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=burst,
            )
            clear_rx = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=worker
            )
            self.verify_decrypted6(p, clear_rx)

            # pg1 -> tunnel: clear in, encrypted out
            clear = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host,
                count=burst,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, clear, self.tun_if, worker=worker
            )
            self.verify_encrypted6(p, p.vpp_tun_sa, encrypted_rx)

        # every burst is accounted to the first worker that was used
        self.verify_counters6(p, p, len(workers) * burst, worker=0)
2629
2630
class IpsecTun4HandoffTests(IpsecTun4):
    """UT test methods for Tunnel v4 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handooff_44(self):
        """ipsec 4o4 tunnel worker hand-off test"""
        for clear_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(clear_cmd)

        burst = 15
        p = self.params[socket.AF_INET]

        # packets are injected alternately on worker 0 and worker 1; the SA
        # counters are nevertheless all expected against worker 0
        workers = (0, 1, 0, 1)
        for worker in workers:
            # tunnel -> pg1: encrypted in, clear out
            encrypted = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=burst,
            )
            clear_rx = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=worker
            )
            self.verify_decrypted(p, clear_rx)

            # pg1 -> tunnel: clear in, encrypted out
            clear = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=burst,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, clear, self.tun_if, worker=worker
            )
            self.verify_encrypted(p, p.vpp_tun_sa, encrypted_rx)

        # every burst is accounted to the first worker that was used
        self.verify_counters4(p, len(workers) * burst, worker=0)
2673
2674
# Pure combination of the v4 and the v6 tunnel test methods; nothing is added.
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """UT test methods for Tunnel v6 & v4"""
2679
2680
class IPSecIPv4Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv4 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        super().setUpConstants()

    def setUp(self):
        """Start every test with empty SPD bookkeeping lists."""
        super().setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        """Remove policies, SPD objects and the IPv4 interface config."""
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip4()
            pg.admin_down()
        super().tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create ``num_ifs`` pg interfaces, bring them up and address them.

        :param num_ifs: number of pg interfaces to create.
        """
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv4 address on the interface
            pg.config_ip4()
            # resolve ARP, so that we know VPP MAC
            pg.resolve_arp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create an SPD and bind it to every interface in ``pg_list``.

        The SPD and each binding are recorded in ``self.spd_objs`` so that
        ``tearDown`` can remove them.

        :param spd_id: id of the SPD to create.
        :param pg_list: interfaces to bind the SPD to.
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Map a policy name to the SPD action enum value.

        :param policy_type: ``"protect"``, ``"bypass"`` or ``"discard"``.
        :returns: the matching ``vl_api_ipsec_spd_action_t`` member.
        :raises Exception: for any other ``policy_type``.
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        elif policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        elif policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        else:
            # format the message here; Exception does not interpolate
            # extra arguments the way logging calls do
            raise Exception("Invalid policy type: %s" % (policy_type,))

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0.0.0.0"),
        local_ip_stop=ip_address("255.255.255.255"),
        remote_ip_start=ip_address("0.0.0.0"),
        remote_ip_stop=ip_address("255.255.255.255"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add or remove one SPD policy entry and return it.

        The address ranges of the entry are chosen as follows: the whole
        IPv4 space when ``all_ips`` is set, else the explicit
        ``local_ip_*`` / ``remote_ip_*`` ranges when ``ip_range`` is set,
        else exactly the remote addresses of ``src_if`` and ``dst_if``.

        :param spd_id: id of the SPD the entry belongs to.
        :param src_if: interface whose ``remote_ip4`` is the default source.
        :param dst_if: interface whose ``remote_ip4`` is the default
            destination.
        :param proto: protocol passed to the entry.
        :param is_out: passed to the entry as ``is_outbound``.
        :param priority: priority of the entry.
        :param policy_type: name understood by ``get_policy``.
        :param remove: ``False`` adds the entry and records it in
            ``self.spd_policies``; any other value removes it.
        :returns: the ``VppIpsecSpdEntry`` that was added or removed.
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0.0.0.0")
            src_range_high = ip_address("255.255.255.255")
            dst_range_low = ip_address("0.0.0.0")
            dst_range_high = ip_address("255.255.255.255")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip4
            src_range_high = src_if.remote_ip4
            dst_range_low = dst_if.remote_ip4
            dst_range_high = dst_if.remote_ip4

        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build ``pkt_count`` UDP/IPv4 packets from ``src_if`` to ``dst_if``.

        A copy of every packet is stored in its packet info so that
        ``verify_capture`` can compare against it.

        :param src_prt: UDP source port.
        :param dst_prt: UDP destination port.
        :returns: list of the created packets.
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check captured packets against the packets saved at creation.

        Every captured packet must match the next saved packet info for the
        ``src_if`` -> ``dst_if`` pair (interface indexes, packet index, IP
        source address and UDP source port), and no saved packet may be left
        over once the capture is exhausted.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that ``spdEntry`` has matched exactly ``pkt_count`` packets."""
        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
        matched_pkts = spdEntry.get_stats().get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2875
2876
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
    """Helpers for tests that inspect the IPv4 SPD flow cache"""

    @classmethod
    def setUpConstants(cls):
        super().setUpConstants()
        # Override this method with required cmdline parameters e.g.
        # cls.vpp_cmdline.extend(["ipsec", "{",
        #                         "ipv4-outbound-spd-flow-cache on",
        #                         "}"])
        # cls.logger.info("VPP modified cmdline is %s" % " "
        #                 .join(cls.vpp_cmdline))

    def setUp(self):
        super().setUp()

    def tearDown(self):
        super().tearDown()

    def get_spd_flow_cache_entries(self, outbound):
        """Return the flow cache entry count reported by 'show ipsec spd'.

        'show ipsec spd' output:
        ipv4-inbound-spd-flow-cache-entries: 0
        ipv4-outbound-spd-flow-cache-entries: 0

        :param outbound: read the outbound counter when true, else the
            inbound one.
        :returns: the entry count as an int.
        :raises Exception: when the counter line is not found in the output.
        """
        show_ipsec_reply = self.vapi.cli("show ipsec spd")
        direction = "outbound" if outbound else "inbound"
        # match the relevant section of 'show ipsec spd' output; capture only
        # the digits so that any output after the counter cannot end up in
        # the value handed to int()
        regex_match = search(
            r"ipv4-%s-spd-flow-cache-entries:\s*(\d+)" % direction,
            show_ipsec_reply,
        )
        if regex_match is None:
            raise Exception(
                "Unable to find spd flow cache entries \
                in 'show ipsec spd' CLI output - regex failed to match"
            )
        self.logger.info("%s", regex_match.group(0))
        return int(regex_match.group(1))

    def verify_num_outbound_flow_cache_entries(self, expected_elements):
        """Assert the number of outbound flow cache entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=True), expected_elements
        )

    def verify_num_inbound_flow_cache_entries(self, expected_elements):
        """Assert the number of inbound flow cache entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=False), expected_elements
        )

    def crc32_supported(self):
        """Tell whether ``lscpu`` lists a CRC32 capable CPU feature.

        :returns: True when the ``lscpu`` output contains ``crc32`` or
            ``sse4_2``, False otherwise.
        """
        # lscpu is part of util-linux package, available on all Linux Distros
        with popen("lscpu") as stream:
            cpu_info = stream.read()
        # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
        # see vppinfra/crc32.h
        # each flag needs its own membership test: a bare non-empty string
        # on the left of ``or`` is always true
        if "crc32" in cpu_info or "sse4_2" in cpu_info:
            self.logger.info("\ncrc32 supported:\n" + cpu_info)
            return True
        self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
        return False
2950
2951
2952 class IPSecIPv6Fwd(VppTestCase):
2953     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
2954
2955     @classmethod
2956     def setUpConstants(cls):
2957         super(IPSecIPv6Fwd, cls).setUpConstants()
2958
2959     def setUp(self):
2960         super(IPSecIPv6Fwd, self).setUp()
2961         # store SPD objects so we can remove configs on tear down
2962         self.spd_objs = []
2963         self.spd_policies = []
2964
2965     def tearDown(self):
2966         # remove SPD policies
2967         for obj in self.spd_policies:
2968             obj.remove_vpp_config()
2969         self.spd_policies = []
2970         # remove SPD items (interface bindings first, then SPD)
2971         for obj in reversed(self.spd_objs):
2972             obj.remove_vpp_config()
2973         self.spd_objs = []
2974         # close down pg intfs
2975         for pg in self.pg_interfaces:
2976             pg.unconfig_ip6()
2977             pg.admin_down()
2978         super(IPSecIPv6Fwd, self).tearDown()
2979
2980     def create_interfaces(self, num_ifs=2):
2981         # create interfaces pg0 ... pg<num_ifs>
2982         self.create_pg_interfaces(range(num_ifs))
2983         for pg in self.pg_interfaces:
2984             # put the interface up
2985             pg.admin_up()
2986             # configure IPv6 address on the interface
2987             pg.config_ip6()
2988             pg.resolve_ndp()
2989         self.logger.info(self.vapi.ppcli("show int addr"))
2990
2991     def spd_create_and_intf_add(self, spd_id, pg_list):
2992         spd = VppIpsecSpd(self, spd_id)
2993         spd.add_vpp_config()
2994         self.spd_objs.append(spd)
2995         for pg in pg_list:
2996             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2997             spdItf.add_vpp_config()
2998             self.spd_objs.append(spdItf)
2999
3000     def get_policy(self, policy_type):
3001         e = VppEnum.vl_api_ipsec_spd_action_t
3002         if policy_type == "protect":
3003             return e.IPSEC_API_SPD_ACTION_PROTECT
3004         elif policy_type == "bypass":
3005             return e.IPSEC_API_SPD_ACTION_BYPASS
3006         elif policy_type == "discard":
3007             return e.IPSEC_API_SPD_ACTION_DISCARD
3008         else:
3009             raise Exception("Invalid policy type: %s", policy_type)
3010
3011     def spd_add_rem_policy(
3012         self,
3013         spd_id,
3014         src_if,
3015         dst_if,
3016         proto,
3017         is_out,
3018         priority,
3019         policy_type,
3020         remove=False,
3021         all_ips=False,
3022         ip_range=False,
3023         local_ip_start=ip_address("0::0"),
3024         local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3025         remote_ip_start=ip_address("0::0"),
3026         remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3027         remote_port_start=0,
3028         remote_port_stop=65535,
3029         local_port_start=0,
3030         local_port_stop=65535,
3031     ):
3032         spd = VppIpsecSpd(self, spd_id)
3033
3034         if all_ips:
3035             src_range_low = ip_address("0::0")
3036             src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3037             dst_range_low = ip_address("0::0")
3038             dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3039
3040         elif ip_range:
3041             src_range_low = local_ip_start
3042             src_range_high = local_ip_stop
3043             dst_range_low = remote_ip_start
3044             dst_range_high = remote_ip_stop
3045
3046         else:
3047             src_range_low = src_if.remote_ip6
3048             src_range_high = src_if.remote_ip6
3049             dst_range_low = dst_if.remote_ip6
3050             dst_range_high = dst_if.remote_ip6
3051
3052         spdEntry = VppIpsecSpdEntry(
3053             self,
3054             spd,
3055             0,
3056             src_range_low,
3057             src_range_high,
3058             dst_range_low,
3059             dst_range_high,
3060             proto,
3061             priority=priority,
3062             policy=self.get_policy(policy_type),
3063             is_outbound=is_out,
3064             remote_port_start=remote_port_start,
3065             remote_port_stop=remote_port_stop,
3066             local_port_start=local_port_start,
3067             local_port_stop=local_port_stop,
3068         )
3069
3070         if remove is False:
3071             spdEntry.add_vpp_config()
3072             self.spd_policies.append(spdEntry)
3073         else:
3074             spdEntry.remove_vpp_config()
3075             self.spd_policies.remove(spdEntry)
3076         self.logger.info(self.vapi.ppcli("show ipsec all"))
3077         return spdEntry
3078
3079     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
3080         packets = []
3081         for i in range(pkt_count):
3082             # create packet info stored in the test case instance
3083             info = self.create_packet_info(src_if, dst_if)
3084             # convert the info into packet payload
3085             payload = self.info_to_payload(info)
3086             # create the packet itself
3087             p = (
3088                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
3089                 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
3090                 / UDP(sport=src_prt, dport=dst_prt)
3091                 / Raw(payload)
3092             )
3093             # store a copy of the packet in the packet info
3094             info.data = p.copy()
3095             # append the packet to the list
3096             packets.append(p)
3097         # return the created packet list
3098         return packets
3099
3100     def verify_capture(self, src_if, dst_if, capture):
3101         packet_info = None
3102         for packet in capture:
3103             try:
3104                 ip = packet[IPv6]
3105                 udp = packet[UDP]
3106                 # convert the payload to packet info object
3107                 payload_info = self.payload_to_info(packet)
3108                 # make sure the indexes match
3109                 self.assert_equal(
3110                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
3111                 )
3112                 self.assert_equal(
3113                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
3114                 )
3115                 packet_info = self.get_next_packet_info_for_interface2(
3116                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
3117                 )
3118                 # make sure we didn't run out of saved packets
3119                 self.assertIsNotNone(packet_info)
3120                 self.assert_equal(
3121                     payload_info.index, packet_info.index, "packet info index"
3122                 )
3123                 saved_packet = packet_info.data  # fetch the saved packet
3124                 # assert the values match
3125                 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
3126                 # ... more assertions here
3127                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
3128             except Exception as e:
3129                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3130                 raise
3131         remaining_packet = self.get_next_packet_info_for_interface2(
3132             src_if.sw_if_index, dst_if.sw_if_index, packet_info
3133         )
3134         self.assertIsNone(
3135             remaining_packet,
3136             "Interface %s: Packet expected from interface "
3137             "%s didn't arrive" % (dst_if.name, src_if.name),
3138         )
3139
3140     def verify_policy_match(self, pkt_count, spdEntry):
3141         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
3142         matched_pkts = spdEntry.get_stats().get("packets")
3143         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
3144         self.assert_equal(pkt_count, matched_pkts)
3145
3146
# Script entry point: when this file is executed directly (not imported),
# hand control to unittest.main with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)