vcl: allow more rx events on peek
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase
19 from asfframework import VppTestRunner
20 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
21 from vpp_papi import VppEnum
22
23 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
24 from ipaddress import ip_address
25 from re import search
26 from os import popen
27
28
29 class IPsecIPv4Params:
30     addr_type = socket.AF_INET
31     addr_any = "0.0.0.0"
32     addr_bcast = "255.255.255.255"
33     addr_len = 32
34     is_ipv6 = 0
35
36     def __init__(self):
37         self.remote_tun_if_host = "1.1.1.1"
38         self.remote_tun_if_host6 = "1111::1"
39
40         self.scapy_tun_sa_id = 100
41         self.scapy_tun_spi = 1000
42         self.vpp_tun_sa_id = 200
43         self.vpp_tun_spi = 2000
44
45         self.scapy_tra_sa_id = 300
46         self.scapy_tra_spi = 3000
47         self.vpp_tra_sa_id = 400
48         self.vpp_tra_spi = 4000
49
50         self.outer_hop_limit = 64
51         self.inner_hop_limit = 255
52         self.outer_flow_label = 0
53         self.inner_flow_label = 0x12345
54
55         self.anti_replay_window_size = 64
56
57         self.auth_algo_vpp_id = (
58             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
59         )
60         self.auth_algo = "HMAC-SHA1-96"  # scapy name
61         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
62
63         self.crypt_algo_vpp_id = (
64             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
65         )
66         self.crypt_algo = "AES-CBC"  # scapy name
67         self.crypt_key = b"JPjyOWBeVEQiMe7h"
68         self.salt = 0
69         self.flags = 0
70         self.nat_header = None
71         self.tun_flags = (
72             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
73         )
74         self.dscp = 0
75         self.async_mode = False
76
77
78 class IPsecIPv6Params:
79     addr_type = socket.AF_INET6
80     addr_any = "0::0"
81     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
82     addr_len = 128
83     is_ipv6 = 1
84
85     def __init__(self):
86         self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
87         self.remote_tun_if_host4 = "1.1.1.1"
88
89         self.scapy_tun_sa_id = 500
90         self.scapy_tun_spi = 3001
91         self.vpp_tun_sa_id = 600
92         self.vpp_tun_spi = 3000
93
94         self.scapy_tra_sa_id = 700
95         self.scapy_tra_spi = 4001
96         self.vpp_tra_sa_id = 800
97         self.vpp_tra_spi = 4000
98
99         self.outer_hop_limit = 64
100         self.inner_hop_limit = 255
101         self.outer_flow_label = 0
102         self.inner_flow_label = 0x12345
103
104         self.anti_replay_window_size = 64
105
106         self.auth_algo_vpp_id = (
107             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
108         )
109         self.auth_algo = "HMAC-SHA1-96"  # scapy name
110         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
111
112         self.crypt_algo_vpp_id = (
113             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
114         )
115         self.crypt_algo = "AES-CBC"  # scapy name
116         self.crypt_key = b"JPjyOWBeVEQiMe7h"
117         self.salt = 0
118         self.flags = 0
119         self.nat_header = None
120         self.tun_flags = (
121             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
122         )
123         self.dscp = 0
124         self.async_mode = False
125
126
127 def mk_scapy_crypt_key(p):
128     if p.crypt_algo in ("AES-GCM", "AES-CTR", "AES-NULL-GMAC"):
129         return p.crypt_key + struct.pack("!I", p.salt)
130     else:
131         return p.crypt_key
132
133
134 def config_tun_params(p, encryption_type, tun_if):
135     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
136     esn_en = bool(
137         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
138     )
139     p.tun_dst = tun_if.remote_addr[p.addr_type]
140     p.tun_src = tun_if.local_addr[p.addr_type]
141     crypt_key = mk_scapy_crypt_key(p)
142     p.scapy_tun_sa = SecurityAssociation(
143         encryption_type,
144         spi=p.scapy_tun_spi,
145         crypt_algo=p.crypt_algo,
146         crypt_key=crypt_key,
147         auth_algo=p.auth_algo,
148         auth_key=p.auth_key,
149         tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
150         nat_t_header=p.nat_header,
151         esn_en=esn_en,
152     )
153     p.vpp_tun_sa = SecurityAssociation(
154         encryption_type,
155         spi=p.vpp_tun_spi,
156         crypt_algo=p.crypt_algo,
157         crypt_key=crypt_key,
158         auth_algo=p.auth_algo,
159         auth_key=p.auth_key,
160         tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
161         nat_t_header=p.nat_header,
162         esn_en=esn_en,
163     )
164
165
166 def config_tra_params(p, encryption_type):
167     esn_en = bool(
168         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
169     )
170     crypt_key = mk_scapy_crypt_key(p)
171     p.scapy_tra_sa = SecurityAssociation(
172         encryption_type,
173         spi=p.scapy_tra_spi,
174         crypt_algo=p.crypt_algo,
175         crypt_key=crypt_key,
176         auth_algo=p.auth_algo,
177         auth_key=p.auth_key,
178         nat_t_header=p.nat_header,
179         esn_en=esn_en,
180     )
181     p.vpp_tra_sa = SecurityAssociation(
182         encryption_type,
183         spi=p.vpp_tra_spi,
184         crypt_algo=p.crypt_algo,
185         crypt_key=crypt_key,
186         auth_algo=p.auth_algo,
187         auth_key=p.auth_key,
188         nat_t_header=p.nat_header,
189         esn_en=esn_en,
190     )
191
192
193 class TemplateIpsec(VppTestCase):
194     """
195     TRANSPORT MODE::
196
197          ------   encrypt   ---
198         |tra_if| <-------> |VPP|
199          ------   decrypt   ---
200
201     TUNNEL MODE::
202
203          ------   encrypt   ---   plain   ---
204         |tun_if| <-------  |VPP| <------ |pg1|
205          ------             ---           ---
206
207          ------   decrypt   ---   plain   ---
208         |tun_if| ------->  |VPP| ------> |pg1|
209          ------             ---           ---
210     """
211
212     tun_spd_id = 1
213     tra_spd_id = 2
214
215     def ipsec_select_backend(self):
216         """empty method to be overloaded when necessary"""
217         pass
218
219     @classmethod
220     def setUpClass(cls):
221         super(TemplateIpsec, cls).setUpClass()
222
223     @classmethod
224     def tearDownClass(cls):
225         super(TemplateIpsec, cls).tearDownClass()
226
227     def setup_params(self):
228         if not hasattr(self, "ipv4_params"):
229             self.ipv4_params = IPsecIPv4Params()
230         if not hasattr(self, "ipv6_params"):
231             self.ipv6_params = IPsecIPv6Params()
232         self.params = {
233             self.ipv4_params.addr_type: self.ipv4_params,
234             self.ipv6_params.addr_type: self.ipv6_params,
235         }
236
237     def config_interfaces(self):
238         self.create_pg_interfaces(range(3))
239         self.interfaces = list(self.pg_interfaces)
240         for i in self.interfaces:
241             i.admin_up()
242             i.config_ip4()
243             i.resolve_arp()
244             i.config_ip6()
245             i.resolve_ndp()
246
247     def setUp(self):
248         super(TemplateIpsec, self).setUp()
249
250         self.setup_params()
251
252         self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
253         self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
254
255         self.config_interfaces()
256
257         self.ipsec_select_backend()
258
259     def unconfig_interfaces(self):
260         for i in self.interfaces:
261             i.admin_down()
262             i.unconfig_ip4()
263             i.unconfig_ip6()
264
265     def tearDown(self):
266         super(TemplateIpsec, self).tearDown()
267
268         self.unconfig_interfaces()
269
270     def show_commands_at_teardown(self):
271         self.logger.info(self.vapi.cli("show hardware"))
272
273     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
274         return [
275             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
276             / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
277             for i in range(count)
278         ]
279
280     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
281         return [
282             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
283             / sa.encrypt(
284                 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
285                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
286             )
287             for i in range(count)
288         ]
289
290     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
291         return [
292             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
293             / IP(src=src, dst=dst)
294             / ICMP()
295             / Raw(b"X" * payload_size)
296             for i in range(count)
297         ]
298
299     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
300         return [
301             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
302             / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
303             / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
304             for i in range(count)
305         ]
306
307
308 class IpsecTcp(object):
309     def verify_tcp_checksum(self):
310         # start http cli server listener on http://0.0.0.0:80
311         self.vapi.cli("http cli server")
312         p = self.params[socket.AF_INET]
313         send = Ether(
314             src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
315         ) / p.scapy_tun_sa.encrypt(
316             IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
317             / TCP(flags="S", dport=80)
318         )
319         self.logger.debug(ppp("Sending packet:", send))
320         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
321         recv = recv[0]
322         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
323         self.assert_packet_checksums_valid(decrypted)
324
325
326 class IpsecTcpTests(IpsecTcp):
327     def test_tcp_checksum(self):
328         """verify checksum correctness for vpp generated packets"""
329         self.verify_tcp_checksum()
330
331
332 class IpsecTra4(object):
333     """verify methods for Transport v4"""
334
335     def get_replay_counts(self, p):
336         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
337         count = self.statistics.get_err_counter(replay_node_name)
338
339         if p.async_mode:
340             replay_post_node_name = (
341                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
342             )
343             count += self.statistics.get_err_counter(replay_post_node_name)
344
345         return count
346
347     def get_hash_failed_counts(self, p):
348         if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
349             hash_failed_node_name = (
350                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
351             )
352         else:
353             hash_failed_node_name = (
354                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
355             )
356         count = self.statistics.get_err_counter(hash_failed_node_name)
357
358         if p.async_mode:
359             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
360
361         return count
362
363     def verify_hi_seq_num(self):
364         p = self.params[socket.AF_INET]
365         saf = VppEnum.vl_api_ipsec_sad_flags_t
366         esn_on = p.vpp_tra_sa.esn_en
367         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
368
369         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
370         replay_count = self.get_replay_counts(p)
371         hash_failed_count = self.get_hash_failed_counts(p)
372         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
373
374         # a few packets so we get the rx seq number above the window size and
375         # thus can simulate a wrap with an out of window packet
376         pkts = [
377             (
378                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
379                 / p.scapy_tra_sa.encrypt(
380                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
381                     seq_num=seq,
382                 )
383             )
384             for seq in range(63, 80)
385         ]
386         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
387
388         # these 4 packets will all choose seq-num 0 to decrpyt since none
389         # are out of window when first checked. however, once #200 has
390         # decrypted it will move the window to 200 and has #81 is out of
391         # window. this packet should be dropped.
392         pkts = [
393             (
394                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
395                 / p.scapy_tra_sa.encrypt(
396                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
397                     seq_num=200,
398                 )
399             ),
400             (
401                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
402                 / p.scapy_tra_sa.encrypt(
403                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
404                     seq_num=81,
405                 )
406             ),
407             (
408                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
409                 / p.scapy_tra_sa.encrypt(
410                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
411                     seq_num=201,
412                 )
413             ),
414             (
415                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
416                 / p.scapy_tra_sa.encrypt(
417                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
418                     seq_num=202,
419                 )
420             ),
421         ]
422
423         # if anti-replay is off then we won't drop #81
424         n_rx = 3 if ar_on else 4
425         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
426         # this packet is one before the wrap
427         pkts = [
428             (
429                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
430                 / p.scapy_tra_sa.encrypt(
431                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
432                     seq_num=203,
433                 )
434             )
435         ]
436         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
437
438         # a replayed packet, then an out of window, then a legit
439         # tests that a early failure on the batch doesn't affect subsequent packets.
440         pkts = [
441             (
442                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
443                 / p.scapy_tra_sa.encrypt(
444                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
445                     seq_num=203,
446                 )
447             ),
448             (
449                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
450                 / p.scapy_tra_sa.encrypt(
451                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
452                     seq_num=81,
453                 )
454             ),
455             (
456                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
457                 / p.scapy_tra_sa.encrypt(
458                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
459                     seq_num=204,
460                 )
461             ),
462         ]
463         n_rx = 1 if ar_on else 3
464         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
465
466         # move the window over half way to a wrap
467         pkts = [
468             (
469                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
470                 / p.scapy_tra_sa.encrypt(
471                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
472                     seq_num=0x80000001,
473                 )
474             )
475         ]
476         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
477
478         # anti-replay will drop old packets, no anti-replay will not
479         pkts = [
480             (
481                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
482                 / p.scapy_tra_sa.encrypt(
483                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
484                     seq_num=0x44000001,
485                 )
486             )
487         ]
488
489         if ar_on:
490             self.send_and_assert_no_replies(self.tra_if, pkts)
491         else:
492             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
493
494         if esn_on:
495             #
496             # validate wrapping the ESN
497             #
498
499             # wrap scapy's TX SA SN
500             p.scapy_tra_sa.seq_num = 0x100000005
501
502             # send a packet that wraps the window for both AR and no AR
503             pkts = [
504                 (
505                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
506                     / p.scapy_tra_sa.encrypt(
507                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
508                         / ICMP(),
509                         seq_num=0x100000005,
510                     )
511                 )
512             ]
513
514             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
515             for rx in rxs:
516                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
517
518             # move the window forward to half way to the next wrap
519             pkts = [
520                 (
521                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
522                     / p.scapy_tra_sa.encrypt(
523                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
524                         / ICMP(),
525                         seq_num=0x180000005,
526                     )
527                 )
528             ]
529
530             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
531
532             # a packet less than 2^30 from the current position is:
533             #  - AR: out of window and dropped
534             #  - non-AR: accepted
535             pkts = [
536                 (
537                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
538                     / p.scapy_tra_sa.encrypt(
539                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
540                         / ICMP(),
541                         seq_num=0x170000005,
542                     )
543                 )
544             ]
545
546             if ar_on:
547                 self.send_and_assert_no_replies(self.tra_if, pkts)
548             else:
549                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
550
551             # a packet more than 2^30 from the current position is:
552             #  - AR: out of window and dropped
553             #  - non-AR: considered a wrap, but since it's not a wrap
554             #    it won't decrpyt and so will be dropped
555             pkts = [
556                 (
557                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
558                     / p.scapy_tra_sa.encrypt(
559                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
560                         / ICMP(),
561                         seq_num=0x130000005,
562                     )
563                 )
564             ]
565
566             self.send_and_assert_no_replies(self.tra_if, pkts)
567
568             # a packet less than 2^30 from the current position and is a
569             # wrap; (the seq is currently at 0x180000005).
570             #  - AR: out of window so considered a wrap, so accepted
571             #  - non-AR: not considered a wrap, so won't decrypt
572             p.scapy_tra_sa.seq_num = 0x260000005
573             pkts = [
574                 (
575                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
576                     / p.scapy_tra_sa.encrypt(
577                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
578                         / ICMP(),
579                         seq_num=0x260000005,
580                     )
581                 )
582             ]
583             if ar_on:
584                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
585             else:
586                 self.send_and_assert_no_replies(self.tra_if, pkts)
587
588             #
589             # window positions are different now for AR/non-AR
590             #  move non-AR forward
591             #
592             if not ar_on:
593                 # a packet more than 2^30 from the current position and is a
594                 # wrap; (the seq is currently at 0x180000005).
595                 #  - AR: accepted
596                 #  - non-AR: not considered a wrap, so won't decrypt
597
598                 pkts = [
599                     (
600                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
601                         / p.scapy_tra_sa.encrypt(
602                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
603                             / ICMP(),
604                             seq_num=0x200000005,
605                         )
606                     ),
607                     (
608                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
609                         / p.scapy_tra_sa.encrypt(
610                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
611                             / ICMP(),
612                             seq_num=0x200000006,
613                         )
614                     ),
615                 ]
616                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
617
618                 pkts = [
619                     (
620                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
621                         / p.scapy_tra_sa.encrypt(
622                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
623                             / ICMP(),
624                             seq_num=0x260000005,
625                         )
626                     )
627                 ]
628                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
629
630     def verify_tra_anti_replay(self):
631         p = self.params[socket.AF_INET]
632         esn_en = p.vpp_tra_sa.esn_en
633         anti_replay_window_size = p.anti_replay_window_size
634
635         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
636         replay_count = self.get_replay_counts(p)
637         initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
638         hash_failed_count = self.get_hash_failed_counts(p)
639         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
640         initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
641             "seq_cycled"
642         )
643         hash_err = "integ_error"
644
645         if ESP == self.encryption_type:
646             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
647             undersize_count = self.statistics.get_err_counter(undersize_node_name)
648             initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
649                 "runt"
650             )
651             # For AES-GCM an error in the hash is reported as a decryption failure
652             if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
653                 hash_err = "decryption_failed"
654         # In async mode, we don't report errors in the hash.
655         if p.async_mode:
656             hash_err = ""
657         else:
658             initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
659                 hash_err
660             )
661
662         #
663         # send packets with seq numbers 1->34
664         # this means the window size is still in Case B (see RFC4303
665         # Appendix A)
666         #
667         # for reasons i haven't investigated Scapy won't create a packet with
668         # seq_num=0
669         #
670         pkts = [
671             (
672                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
673                 / p.scapy_tra_sa.encrypt(
674                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
675                     seq_num=seq,
676                 )
677             )
678             for seq in range(1, 34)
679         ]
680         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
681
682         # replayed packets are dropped
683         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
684         replay_count += len(pkts)
685         self.assertEqual(self.get_replay_counts(p), replay_count)
686         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
687         self.assertEqual(err, replay_count)
688
689         #
690         # now send a batch of packets all with the same sequence number
691         # the first packet in the batch is legitimate, the rest bogus
692         #
693         self.vapi.cli("clear error")
694         self.vapi.cli("clear node counters")
695         pkts = Ether(
696             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
697         ) / p.scapy_tra_sa.encrypt(
698             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
699             seq_num=35,
700         )
701         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
702         replay_count += 7
703         self.assertEqual(self.get_replay_counts(p), replay_count)
704         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
705         self.assertEqual(err, replay_count)
706
707         #
708         # now move the window over to anti_replay_window_size + 100 and into Case A
709         #
710         self.vapi.cli("clear error")
711         pkt = Ether(
712             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
713         ) / p.scapy_tra_sa.encrypt(
714             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
715             seq_num=anti_replay_window_size + 100,
716         )
717         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
718
719         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
720
721         # replayed packets are dropped
722         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
723         replay_count += 3
724         self.assertEqual(self.get_replay_counts(p), replay_count)
725         err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
726         self.assertEqual(err, replay_count)
727
728         # the window size is anti_replay_window_size packets
729         # in window are still accepted
730         pkt = Ether(
731             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
732         ) / p.scapy_tra_sa.encrypt(
733             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
734             seq_num=200,
735         )
736
737         # a packet that does not decrypt does not move the window forward
738         bogus_sa = SecurityAssociation(
739             self.encryption_type,
740             p.scapy_tra_spi,
741             crypt_algo=p.crypt_algo,
742             crypt_key=mk_scapy_crypt_key(p)[::-1],
743             auth_algo=p.auth_algo,
744             auth_key=p.auth_key[::-1],
745         )
746         pkt = Ether(
747             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
748         ) / bogus_sa.encrypt(
749             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
750             seq_num=anti_replay_window_size + 200,
751         )
752         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
753
754         hash_failed_count += 17
755         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
756         if hash_err != "":
757             err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
758             self.assertEqual(err, hash_failed_count)
759
760         # a malformed 'runt' packet
761         #  created by a mis-constructed SA
762         if ESP == self.encryption_type and p.crypt_algo != "NULL":
763             bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
764             pkt = Ether(
765                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
766             ) / bogus_sa.encrypt(
767                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
768                 seq_num=anti_replay_window_size + 200,
769             )
770             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
771
772             undersize_count += 17
773             self.assert_error_counter_equal(undersize_node_name, undersize_count)
774             err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
775             self.assertEqual(err, undersize_count)
776
777         # which we can determine since this packet is still in the window
778         pkt = Ether(
779             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
780         ) / p.scapy_tra_sa.encrypt(
781             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
782             seq_num=234,
783         )
784         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
785
786         #
787         # out of window are dropped
788         #  this is Case B. So VPP will consider this to be a high seq num wrap
789         #  and so the decrypt attempt will fail
790         #
791         pkt = Ether(
792             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
793         ) / p.scapy_tra_sa.encrypt(
794             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
795             seq_num=17,
796         )
797         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
798
799         if esn_en:
800             # an out of window error with ESN looks like a high sequence
801             # wrap. but since it isn't then the verify will fail.
802             hash_failed_count += 17
803             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
804             if hash_err != "":
805                 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
806                 self.assertEqual(err, hash_failed_count)
807
808         else:
809             replay_count += 17
810             self.assertEqual(self.get_replay_counts(p), replay_count)
811             err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
812             self.assertEqual(err, replay_count)
813
814         # valid packet moves the window over to anti_replay_window_size + 258
815         pkt = Ether(
816             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
817         ) / p.scapy_tra_sa.encrypt(
818             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
819             seq_num=anti_replay_window_size + 258,
820         )
821         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
822         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
823
824         #
825         # move VPP's SA TX seq-num to just before the seq-number wrap.
826         # then fire in a packet that VPP should drop on TX because it
827         # causes the TX seq number to wrap; unless we're using extened sequence
828         # numbers.
829         #
830         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
831         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
832         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
833
834         pkts = [
835             (
836                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
837                 / p.scapy_tra_sa.encrypt(
838                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
839                     seq_num=seq,
840                 )
841             )
842             for seq in range(259, 280)
843         ]
844
845         if esn_en:
846             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
847
848             #
849             # in order for scapy to decrypt its SA's high order number needs
850             # to wrap
851             #
852             p.vpp_tra_sa.seq_num = 0x100000000
853             for rx in rxs:
854                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
855
856             #
857             # wrap scapy's TX high sequence number. VPP is in case B, so it
858             # will consider this a high seq wrap also.
859             # The low seq num we set it to will place VPP's RX window in Case A
860             #
861             p.scapy_tra_sa.seq_num = 0x100000005
862             pkt = Ether(
863                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
864             ) / p.scapy_tra_sa.encrypt(
865                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
866                 seq_num=0x100000005,
867             )
868             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
869
870             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
871
872             #
873             # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
874             # the window
875             #
876             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
877             pkt = Ether(
878                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
879             ) / p.scapy_tra_sa.encrypt(
880                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
881                 seq_num=0xFFFFFFFD,
882             )
883             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
884             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
885
886             #
887             # While in case A we cannot wrap the high sequence number again
888             # because VPP will consider this packet to be one that moves the
889             # window forward
890             #
891             pkt = Ether(
892                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
893             ) / p.scapy_tra_sa.encrypt(
894                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
895                 seq_num=0x200000999,
896             )
897             self.send_and_assert_no_replies(
898                 self.tra_if, [pkt], self.tra_if, timeout=0.2
899             )
900
901             hash_failed_count += 1
902             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
903             if hash_err != "":
904                 err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
905                 self.assertEqual(err, hash_failed_count)
906
907             #
908             # but if we move the window forward to case B, then we can wrap
909             # again
910             #
911             p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
912             pkt = Ether(
913                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
914             ) / p.scapy_tra_sa.encrypt(
915                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
916                 seq_num=p.scapy_tra_sa.seq_num,
917             )
918             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
919             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
920
921             p.scapy_tra_sa.seq_num = 0x200000444
922             pkt = Ether(
923                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
924             ) / p.scapy_tra_sa.encrypt(
925                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
926                 seq_num=0x200000444,
927             )
928             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
929             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
930
931         else:
932             #
933             # without ESN TX sequence numbers can't wrap and packets are
934             # dropped from here on out.
935             #
936             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
937             seq_cycle_count += len(pkts)
938             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
939             err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
940             self.assertEqual(err, seq_cycle_count)
941
942         # move the security-associations seq number on to the last we used
943         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
944         p.scapy_tra_sa.seq_num = 351
945         p.vpp_tra_sa.seq_num = 351
946
947     def verify_tra_lost(self):
948         p = self.params[socket.AF_INET]
949         esn_en = p.vpp_tra_sa.esn_en
950
951         #
952         # send packets with seq numbers 1->34
953         # this means the window size is still in Case B (see RFC4303
954         # Appendix A)
955         #
956         # for reasons i haven't investigated Scapy won't create a packet with
957         # seq_num=0
958         #
959         pkts = [
960             (
961                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
962                 / p.scapy_tra_sa.encrypt(
963                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
964                     seq_num=seq,
965                 )
966             )
967             for seq in range(1, 3)
968         ]
969         self.send_and_expect(self.tra_if, pkts, self.tra_if)
970
971         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
972
973         # skip a sequence number
974         pkts = [
975             (
976                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
977                 / p.scapy_tra_sa.encrypt(
978                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
979                     seq_num=seq,
980                 )
981             )
982             for seq in range(4, 6)
983         ]
984         self.send_and_expect(self.tra_if, pkts, self.tra_if)
985
986         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
987
988         # the lost packet are counted untill we get up past the first
989         # sizeof(replay_window) packets
990         pkts = [
991             (
992                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
993                 / p.scapy_tra_sa.encrypt(
994                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
995                     seq_num=seq,
996                 )
997             )
998             for seq in range(6, 100)
999         ]
1000         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1001
1002         self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
1003
1004         # lost of holes in the sequence
1005         pkts = [
1006             (
1007                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1008                 / p.scapy_tra_sa.encrypt(
1009                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1010                     seq_num=seq,
1011                 )
1012             )
1013             for seq in range(100, 200, 2)
1014         ]
1015         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
1016
1017         pkts = [
1018             (
1019                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1020                 / p.scapy_tra_sa.encrypt(
1021                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1022                     seq_num=seq,
1023                 )
1024             )
1025             for seq in range(200, 300)
1026         ]
1027         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1028
1029         self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
1030
1031         # a big hole in the seq number space
1032         pkts = [
1033             (
1034                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1035                 / p.scapy_tra_sa.encrypt(
1036                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1037                     seq_num=seq,
1038                 )
1039             )
1040             for seq in range(400, 500)
1041         ]
1042         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1043
1044         self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
1045
1046     def verify_tra_basic4(self, count=1, payload_size=54):
1047         """ipsec v4 transport basic test"""
1048         self.vapi.cli("clear errors")
1049         self.vapi.cli("clear ipsec sa")
1050         try:
1051             p = self.params[socket.AF_INET]
1052             send_pkts = self.gen_encrypt_pkts(
1053                 p,
1054                 p.scapy_tra_sa,
1055                 self.tra_if,
1056                 src=self.tra_if.remote_ip4,
1057                 dst=self.tra_if.local_ip4,
1058                 count=count,
1059                 payload_size=payload_size,
1060             )
1061             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1062             for rx in recv_pkts:
1063                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1064                 self.assert_packet_checksums_valid(rx)
1065                 try:
1066                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1067                     self.assert_packet_checksums_valid(decrypted)
1068                 except:
1069                     self.logger.debug(ppp("Unexpected packet:", rx))
1070                     raise
1071         finally:
1072             self.logger.info(self.vapi.ppcli("show error"))
1073             self.logger.info(self.vapi.ppcli("show ipsec all"))
1074
1075         pkts = p.tra_sa_in.get_stats()["packets"]
1076         self.assertEqual(
1077             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1078         )
1079         pkts = p.tra_sa_out.get_stats()["packets"]
1080         self.assertEqual(
1081             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1082         )
1083         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
1084         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
1085
1086         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1087         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1088
1089     def _verify_tra_anti_replay_algorithm_esn(self):
1090         def seq_num(seqh, seql):
1091             return (seqh << 32) | (seql & 0xFFFF_FFFF)
1092
1093         p = self.params[socket.AF_INET]
1094         anti_replay_window_size = p.anti_replay_window_size
1095
1096         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1097         replay_count = self.get_replay_counts(p)
1098         hash_failed_count = self.get_hash_failed_counts(p)
1099         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1100
1101         if ESP == self.encryption_type:
1102             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1103             undersize_count = self.statistics.get_err_counter(undersize_node_name)
1104
1105         # reset the TX SA to avoid conflict with left configuration
1106         self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1107
1108         """
1109         RFC 4303 Appendix A2. Case A
1110
1111         |: new Th marker
1112         a-i: possible seq num received
1113         +: Bl, Tl, Bl', Tl'
1114         [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
1115
1116                 Th - 1               Th                Th + 1
1117         --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
1118                 =========          =========          =========
1119                 Bl-     Tl-        Bl      Tl         Bl+     Tl+
1120
1121         Case A implies Tl >= W - 1
1122         """
1123
1124         Th = 1
1125         Tl = anti_replay_window_size + 40
1126         Bl = Tl - anti_replay_window_size + 1
1127
1128         # move VPP's RX AR window to Case A
1129         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1130         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1131
1132         """
1133         case a: Seql < Bl
1134             - pre-crypto check: algorithm predicts that the packet wrap the window
1135                 -> Seqh = Th + 1
1136             - integrity check: should fail
1137             - post-crypto check: ...
1138         """
1139         pkts = [
1140             (
1141                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1142                 / p.scapy_tra_sa.encrypt(
1143                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1144                     seq_num=seq,
1145                 )
1146             )
1147             for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
1148         ]
1149
1150         # out-of-window packets fail integrity check
1151         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1152         hash_failed_count += len(pkts)
1153         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1154
1155         """
1156             case b: Bl <= Seql <= Tl
1157                 - pre-crypto check: algorithm predicts that the packet is in the window
1158                     -> Seqh = Th
1159                     -> check for a replayed packet with Seql
1160                 - integrity check: should fail
1161                 - post-crypto check: ...
1162         """
1163         pkts = [
1164             (
1165                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1166                 / p.scapy_tra_sa.encrypt(
1167                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1168                     seq_num=seq,
1169                 )
1170             )
1171             for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
1172         ]
1173         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1174
1175         p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1176         pkts = [
1177             (
1178                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1179                 / p.scapy_tra_sa.encrypt(
1180                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1181                     seq_num=seq,
1182                 )
1183             )
1184             for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
1185         ]
1186
1187         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1188
1189         # some packets are rejected by the pre-crypto check
1190         replay_count += 5
1191         self.assertEqual(self.get_replay_counts(p), replay_count)
1192
1193         # out-of-window packets fail integrity check
1194         hash_failed_count += len(pkts) - 5
1195         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1196
1197         """
1198             case c: Seql > Tl
1199                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1200                     -> Seqh = Th
1201                 - integrity check: should fail
1202                 - post-crypto check: ...
1203         """
1204         pkts = [
1205             (
1206                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1207                 / p.scapy_tra_sa.encrypt(
1208                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1209                     seq_num=seq,
1210                 )
1211             )
1212             for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
1213         ]
1214
1215         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1216
1217         # out-of-window packets fail integrity check
1218         hash_failed_count += len(pkts)
1219         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1220
1221         """
1222             case d: Seql < Bl
1223                 - pre-crypto check: algorithm predicts that the packet wrap the window
1224                     -> Seqh = Th + 1
1225                 - integrity check: should fail
1226                 - post-crypto check: ...
1227         """
1228         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1229         pkts = [
1230             (
1231                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1232                 / p.scapy_tra_sa.encrypt(
1233                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1234                     seq_num=seq,
1235                 )
1236             )
1237             for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
1238         ]
1239
1240         # out-of-window packets fail integrity check
1241         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1242         hash_failed_count += len(pkts)
1243         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1244
1245         """
1246             case e: Bl <= Seql <= Tl
1247                 - pre-crypto check: algorithm predicts that the packet is in the window
1248                     -> Seqh = Th
1249                     -> check for a replayed packet with Seql
1250                 - integrity check: should pass
1251                 - post-crypto check: should pass
1252                     -> Seql is marked in the AR window
1253         """
1254         pkts = [
1255             (
1256                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1257                 / p.scapy_tra_sa.encrypt(
1258                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1259                     seq_num=seq,
1260                 )
1261             )
1262             for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
1263         ]
1264
1265         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1266
1267         """
1268             case f: Seql > Tl
1269                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1270                     -> Seqh = Th
1271                 - integrity check: should pass
1272                 - post-crypto check: should pass
1273                     -> AR window shift (the window stays Case A)
1274                     -> Seql is marked in the AR window
1275         """
1276         pkts = [
1277             (
1278                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1279                 / p.scapy_tra_sa.encrypt(
1280                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1281                     seq_num=seq,
1282                 )
1283             )
1284             for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
1285         ]
1286
1287         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1288
1289         """
1290             case g: Seql < Bl
1291                 - pre-crypto check: algorithm predicts that the packet wrap the window
1292                     -> Seqh = Th + 1
1293                 - integrity check: should pass
1294                 - post-crypto check: should pass
1295                     -> AR window shift (may set the window in Case B)
1296                     -> Seql is marked in the AR window
1297         """
1298         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1299         pkts = [
1300             (
1301                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1302                 / p.scapy_tra_sa.encrypt(
1303                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1304                     seq_num=seq,
1305                 )
1306             )
1307             # set the window in Case B (the minimum window size is 64
1308             # so we are sure to overlap)
1309             for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
1310         ]
1311
1312         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1313
1314         # reset the VPP's RX AR window to Case A
1315         Th = 1
1316         Tl = 2 * anti_replay_window_size + 40
1317         Bl = Tl - anti_replay_window_size + 1
1318
1319         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1320
1321         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1322         pkts = [
1323             (
1324                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1325                 / p.scapy_tra_sa.encrypt(
1326                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1327                     seq_num=seq,
1328                 )
1329             )
1330             # the AR will stay in Case A
1331             for seq in range(
1332                 seq_num(Th + 1, anti_replay_window_size + 10),
1333                 seq_num(Th + 1, anti_replay_window_size + 20),
1334             )
1335         ]
1336
1337         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1338
1339         """
1340             case h: Bl <= Seql <= Tl
1341                 - pre-crypto check: algorithm predicts that the packet is in the window
1342                     -> Seqh = Th
1343                     -> check for a replayed packet with Seql
1344                 - integrity check: the wrap is not detected, should fail
1345                 - post-crypto check: ...
1346         """
1347         Th += 1
1348         Tl = anti_replay_window_size + 20
1349         Bl = Tl - anti_replay_window_size + 1
1350
1351         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1352
1353         pkts = [
1354             (
1355                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1356                 / p.scapy_tra_sa.encrypt(
1357                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1358                     seq_num=seq,
1359                 )
1360             )
1361             for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
1362         ]
1363
1364         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1365
1366         # some packets are rejected by the pre-crypto check
1367         replay_count += 5
1368         self.assertEqual(self.get_replay_counts(p), replay_count)
1369
1370         # out-of-window packets fail integrity check
1371         hash_failed_count += len(pkts) - 5
1372         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1373
1374         """
1375             case i: Seql > Tl
1376                 - pre-crypto check: algorithm predicts that the packet does not wrap the window
1377                     -> Seqh = Th
1378                 - integrity check: the wrap is not detected, shoud fail
1379                 - post-crypto check: ...
1380         """
1381         pkts = [
1382             (
1383                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1384                 / p.scapy_tra_sa.encrypt(
1385                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1386                     seq_num=seq,
1387                 )
1388             )
1389             for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
1390         ]
1391
1392         # out-of-window packets fail integrity check
1393         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1394         hash_failed_count += len(pkts)
1395         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1396
1397         """
1398             RFC 4303 Appendix A2. Case B
1399
1400                         Th - 1               Th                Th + 1
1401             ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
1402             =========          ===========          ===========
1403                     Tl-        Bl       Tl          Bl+       Tl+
1404
1405             Case B implies Tl < W - 1
1406         """
1407
1408         # reset the VPP's RX AR window to Case B
1409         Th = 2
1410         Tl = 30  # minimum window size of 64, we are sure to overlap
1411         Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1412
1413         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1414         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1415
1416         """
1417             case a: Seql <= Tl < Bl
1418                 - pre-crypto check: algorithm predicts that the packet is in the window
1419                     -> Seqh = Th
1420                     -> check for replayed packet
1421                 - integrity check: should fail
1422                 - post-crypto check: ...
1423         """
1424         pkts = [
1425             (
1426                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1427                 / p.scapy_tra_sa.encrypt(
1428                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1429                     seq_num=seq,
1430                 )
1431             )
1432             for seq in range(seq_num(Th, 5), seq_num(Th, 10))
1433         ]
1434
1435         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1436
1437         p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
1438         pkts = [
1439             (
1440                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1441                 / p.scapy_tra_sa.encrypt(
1442                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1443                     seq_num=seq,
1444                 )
1445             )
1446             for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
1447         ]
1448
1449         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1450
1451         # some packets are rejected by the pre-crypto check
1452         replay_count += 5
1453         self.assertEqual(self.get_replay_counts(p), replay_count)
1454
1455         # out-of-window packets fail integrity check
1456         hash_failed_count += len(pkts) - 5
1457         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1458
1459         """
1460             case b: Tl < Seql < Bl
1461                 - pre-crypto check: algorithm predicts that the packet will shift the window
1462                     -> Seqh = Th
1463                 - integrity check: should fail
1464                 - post-crypto check: ...
1465         """
1466         pkts = [
1467             (
1468                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1469                 / p.scapy_tra_sa.encrypt(
1470                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1471                     seq_num=seq,
1472                 )
1473             )
1474             for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
1475         ]
1476
1477         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1478
1479         # out-of-window packets fail integrity check
1480         hash_failed_count += len(pkts)
1481         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1482
1483         """
1484             case c: Tl < Bl <= Seql
1485                 - pre-crypto check: algorithm predicts that the packet is in the window
1486                     -> Seqh = Th - 1
1487                     -> check for a replayed packet with Seql
1488                 - integrity check: should pass
1489                 - post-crypto check: should pass
1490                     -> Seql is marked in the AR window
1491         """
1492         pkts = [
1493             (
1494                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1495                 / p.scapy_tra_sa.encrypt(
1496                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1497                     seq_num=seq,
1498                 )
1499             )
1500             for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
1501         ]
1502
1503         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1504
1505         """
1506             case d: Seql <= Tl < Bl
1507                 - pre-crypto check: algorithm predicts that the packet is the window
1508                     -> Seqh = Th
1509                     -> check for replayed packet
1510                 - integrity check: should pass
1511                 - post-crypto check: should pass
1512                     -> Seql is marked in the AR window
1513         """
1514         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1515         pkts = [
1516             (
1517                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1518                 / p.scapy_tra_sa.encrypt(
1519                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1520                     seq_num=seq,
1521                 )
1522             )
1523             for seq in range(seq_num(Th, 15), seq_num(Th, 25))
1524         ]
1525
1526         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1527
1528         """
1529             case e: Tl < Seql < Bl
1530                 - pre-crypto check: algorithm predicts that the packet is in the window
1531                     -> Seqh = Th
1532                     -> check for a replayed packet with Seql
1533                 - integrity check: should pass
1534                 - post-crypto check: should pass
1535                     -> AR window shift (may set the window in Case A)
1536                     -> Seql is marked in the AR window
1537         """
1538         pkts = [
1539             (
1540                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1541                 / p.scapy_tra_sa.encrypt(
1542                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1543                     seq_num=seq,
1544                 )
1545             )
1546             for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
1547         ]
1548
1549         # the window stays in Case B
1550         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1551
1552         pkts = [
1553             (
1554                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1555                 / p.scapy_tra_sa.encrypt(
1556                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1557                     seq_num=seq,
1558                 )
1559             )
1560             for seq in range(
1561                 seq_num(Th, Tl + anti_replay_window_size + 5),
1562                 seq_num(Th, Tl + anti_replay_window_size + 15),
1563             )
1564         ]
1565
1566         # the window moves to Case A
1567         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1568
1569         # reset the VPP's RX AR window to Case B
1570         Th = 2
1571         Tl = 30  # minimum window size of 64, we are sure to overlap
1572         Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
1573
1574         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
1575         p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
1576
1577         """
1578             case f: Tl < Bl <= Seql
1579                 - pre-crypto check: algorithm predicts that the packet is in the previous window
1580                     -> Seqh = Th - 1
1581                     -> check for a replayed packet with Seql
1582                 - integrity check: should fail
1583                 - post-crypto check: ...
1584         """
1585         pkts = [
1586             (
1587                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1588                 / p.scapy_tra_sa.encrypt(
1589                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1590                     seq_num=seq,
1591                 )
1592             )
1593             for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
1594         ]
1595
1596         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1597
1598         # out-of-window packets fail integrity check
1599         hash_failed_count += len(pkts)
1600         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1601
1602         """
1603             case g: Seql <= Tl < Bl
1604                 - pre-crypto check: algorithm predicts that the packet is the window
1605                     -> Seqh = Th
1606                     -> check for replayed packet
1607                 - integrity check: should fail
1608                 - post-crypto check: ...
1609         """
1610         pkts = [
1611             (
1612                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1613                 / p.scapy_tra_sa.encrypt(
1614                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1615                     seq_num=seq,
1616                 )
1617             )
1618             for seq in range(seq_num(Th, 10), seq_num(Th, 15))
1619         ]
1620
1621         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1622
1623         p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
1624         pkts = [
1625             (
1626                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1627                 / p.scapy_tra_sa.encrypt(
1628                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1629                     seq_num=seq,
1630                 )
1631             )
1632             for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
1633         ]
1634
1635         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1636
1637         # some packets are rejected by the pre-crypto check
1638         replay_count += 5
1639         self.assertEqual(self.get_replay_counts(p), replay_count)
1640
1641         # out-of-window packets fail integrity check
1642         hash_failed_count += len(pkts) - 5
1643         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1644
1645         """
1646             case h: Tl < Seql < Bl
1647                 - pre-crypto check: algorithm predicts that the packet will shift the window
1648                     -> Seqh = Th
1649                 - integrity check: should fail
1650                 - post-crypto check: ...
1651         """
1652         pkts = [
1653             (
1654                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1655                 / p.scapy_tra_sa.encrypt(
1656                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1657                     seq_num=seq,
1658                 )
1659             )
1660             for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
1661         ]
1662
1663         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1664
1665         # out-of-window packets fail integrity check
1666         hash_failed_count += len(pkts)
1667         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
1668
1669     def _verify_tra_anti_replay_algorithm_no_esn(self):
1670         def seq_num(seql):
1671             return seql & 0xFFFF_FFFF
1672
1673         p = self.params[socket.AF_INET]
1674         anti_replay_window_size = p.anti_replay_window_size
1675
1676         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
1677         replay_count = self.get_replay_counts(p)
1678         hash_failed_count = self.get_hash_failed_counts(p)
1679         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
1680
1681         if ESP == self.encryption_type:
1682             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
1683             undersize_count = self.statistics.get_err_counter(undersize_node_name)
1684
1685         # reset the TX SA to avoid conflict with left configuration
1686         self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
1687
1688         """
1689         RFC 4303 Appendix A2. Case A
1690
1691         a-c: possible seq num received
1692         +: Bl, Tl
1693
1694         |--a--+---b---+-c--|
1695               =========
1696               Bl      Tl
1697
1698         No ESN implies Th = 0
1699         Case A implies Tl >= W - 1
1700         """
1701
1702         Tl = anti_replay_window_size + 40
1703         Bl = Tl - anti_replay_window_size + 1
1704
1705         # move VPP's RX AR window to Case A
1706         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1707         p.scapy_tra_sa.seq_num = seq_num(Tl)
1708
1709         """
1710         case a: Seql < Bl
1711             - pre-crypto check: algorithm predicts that the packet is out of window
1712                 -> packet should be dropped
1713             - integrity check: ...
1714             - post-crypto check: ...
1715         """
1716         pkts = [
1717             (
1718                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1719                 / p.scapy_tra_sa.encrypt(
1720                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1721                     seq_num=seq,
1722                 )
1723             )
1724             for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
1725         ]
1726
1727         # out-of-window packets
1728         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1729         replay_count += len(pkts)
1730         self.assertEqual(self.get_replay_counts(p), replay_count)
1731
1732         """
1733             case b: Bl <= Seql <= Tl
1734                 - pre-crypto check: algorithm predicts that the packet is in the window
1735                     -> check for a replayed packet with Seql
1736                 - integrity check: should pass
1737                 - post-crypto check:
1738                     -> check for a replayed packet with Seql
1739         """
1740         pkts = [
1741             (
1742                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1743                 / p.scapy_tra_sa.encrypt(
1744                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1745                     seq_num=seq,
1746                 )
1747             )
1748             for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
1749         ]
1750         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1751
1752         pkts = [
1753             (
1754                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1755                 / p.scapy_tra_sa.encrypt(
1756                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1757                     seq_num=seq,
1758                 )
1759             )
1760             for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
1761         ]
1762
1763         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
1764
1765         # replayed packets
1766         replay_count += 5
1767         self.assertEqual(self.get_replay_counts(p), replay_count)
1768
1769         """
1770             case c: Seql > Tl
1771                 - pre-crypto check: algorithm predicts that the packet will shift the window
1772                 - integrity check: should pass
1773                 - post-crypto check: should pass
1774                     -> AR window is shifted
1775         """
1776         pkts = [
1777             (
1778                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1779                 / p.scapy_tra_sa.encrypt(
1780                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1781                     seq_num=seq,
1782                 )
1783             )
1784             for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
1785         ]
1786
1787         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1788
1789         """
1790             RFC 4303 Appendix A2. Case B
1791
1792             |-a-----+------b-----|
1793             =========
1794                     Tl
1795
1796             Case B implies Tl < W - 1
1797         """
1798
1799         # reset the VPP's RX AR window to Case B
1800         Tl = 30  # minimum window size of 64, we are sure to overlap
1801         Bl = seq_num(Tl - anti_replay_window_size + 1)
1802
1803         self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
1804
1805         """
1806         case a: Seql <= Tl < Bl
1807             - pre-crypto check: algorithm predicts that the packet is in the window
1808                 -> check for replayed packet
1809             - integrity check: should fail
1810             - post-crypto check: ...
1811         """
1812         pkts = [
1813             (
1814                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1815                 / p.scapy_tra_sa.encrypt(
1816                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1817                     seq_num=seq,
1818                 )
1819             )
1820             for seq in range(seq_num(5), seq_num(10))
1821         ]
1822
1823         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1824
1825         """
1826             case b: Tl < Seql < Bl
1827                 - pre-crypto check: algorithm predicts that the packet will shift the window
1828                 - integrity check: should pass
1829                 - post-crypto check: should pass
1830                     -> AR window is shifted
1831         """
1832         pkts = [
1833             (
1834                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1835                 / p.scapy_tra_sa.encrypt(
1836                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1837                     seq_num=seq,
1838                 )
1839             )
1840             for seq in range(seq_num(-50), seq_num(-20))
1841         ]
1842
1843         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1844
1845     def verify_tra_anti_replay_algorithm(self):
1846         if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
1847             self._verify_tra_anti_replay_algorithm_esn()
1848         else:
1849             self._verify_tra_anti_replay_algorithm_no_esn()
1850
1851
1852 class IpsecTra4Tests(IpsecTra4):
1853     """UT test methods for Transport v4"""
1854
1855     def test_tra_anti_replay(self):
1856         """ipsec v4 transport anti-replay test"""
1857         self.verify_tra_anti_replay()
1858
1859     def test_tra_anti_replay_algorithm(self):
1860         """ipsec v4 transport anti-replay algorithm test"""
1861         self.verify_tra_anti_replay_algorithm()
1862
1863     def test_tra_lost(self):
1864         """ipsec v4 transport lost packet test"""
1865         self.verify_tra_lost()
1866
1867     def test_tra_basic(self, count=1):
1868         """ipsec v4 transport basic test"""
1869         self.verify_tra_basic4(count=1)
1870
1871     def test_tra_burst(self):
1872         """ipsec v4 transport burst test"""
1873         self.verify_tra_basic4(count=257)
1874
1875
1876 class IpsecTra6(object):
1877     """verify methods for Transport v6"""
1878
1879     def verify_tra_basic6(self, count=1, payload_size=54):
1880         self.vapi.cli("clear errors")
1881         self.vapi.cli("clear ipsec sa")
1882         try:
1883             p = self.params[socket.AF_INET6]
1884             send_pkts = self.gen_encrypt_pkts6(
1885                 p,
1886                 p.scapy_tra_sa,
1887                 self.tra_if,
1888                 src=self.tra_if.remote_ip6,
1889                 dst=self.tra_if.local_ip6,
1890                 count=count,
1891                 payload_size=payload_size,
1892             )
1893             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1894             for rx in recv_pkts:
1895                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1896                 try:
1897                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
1898                     self.assert_packet_checksums_valid(decrypted)
1899                 except:
1900                     self.logger.debug(ppp("Unexpected packet:", rx))
1901                     raise
1902         finally:
1903             self.logger.info(self.vapi.ppcli("show error"))
1904             self.logger.info(self.vapi.ppcli("show ipsec all"))
1905
1906         pkts = p.tra_sa_in.get_stats()["packets"]
1907         self.assertEqual(
1908             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1909         )
1910         pkts = p.tra_sa_out.get_stats()["packets"]
1911         self.assertEqual(
1912             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1913         )
1914         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
1915         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
1916
1917     def gen_encrypt_pkts_ext_hdrs6(
1918         self, sa, sw_intf, src, dst, count=1, payload_size=54
1919     ):
1920         return [
1921             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1922             / sa.encrypt(
1923                 IPv6(src=src, dst=dst)
1924                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
1925             )
1926             for i in range(count)
1927         ]
1928
1929     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
1930         return [
1931             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1932             / IPv6(src=src, dst=dst)
1933             / IPv6ExtHdrHopByHop()
1934             / IPv6ExtHdrFragment(id=2, offset=200)
1935             / Raw(b"\xff" * 200)
1936             for i in range(count)
1937         ]
1938
1939     def verify_tra_encrypted6(self, p, sa, rxs):
1940         decrypted = []
1941         for rx in rxs:
1942             self.assert_packet_checksums_valid(rx)
1943             try:
1944                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
1945                 decrypted.append(decrypt_pkt)
1946                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
1947                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
1948             except:
1949                 self.logger.debug(ppp("Unexpected packet:", rx))
1950                 try:
1951                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1952                 except:
1953                     pass
1954                 raise
1955         return decrypted
1956
1957     def verify_tra_66_ext_hdrs(self, p):
1958         count = 63
1959
1960         #
1961         # check we can decrypt with options
1962         #
1963         tx = self.gen_encrypt_pkts_ext_hdrs6(
1964             p.scapy_tra_sa,
1965             self.tra_if,
1966             src=self.tra_if.remote_ip6,
1967             dst=self.tra_if.local_ip6,
1968             count=count,
1969         )
1970         self.send_and_expect(self.tra_if, tx, self.tra_if)
1971
1972         #
1973         # injecting a packet from ourselves to be routed of box is a hack
1974         # but it matches an outbout policy, alors je ne regrette rien
1975         #
1976
1977         # one extension before ESP
1978         tx = (
1979             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1980             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1981             / IPv6ExtHdrFragment(id=2, offset=200)
1982             / Raw(b"\xff" * 200)
1983         )
1984
1985         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1986         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1987
1988         for dc in dcs:
1989             # for reasons i'm not going to investigate scapy does not
1990             # created the correct headers after decrypt. but reparsing
1991             # the ipv6 packet fixes it
1992             dc = IPv6(raw(dc[IPv6]))
1993             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1994
1995         # two extensions before ESP
1996         tx = (
1997             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1998             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1999             / IPv6ExtHdrHopByHop()
2000             / IPv6ExtHdrFragment(id=2, offset=200)
2001             / Raw(b"\xff" * 200)
2002         )
2003
2004         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
2005         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
2006
2007         for dc in dcs:
2008             dc = IPv6(raw(dc[IPv6]))
2009             self.assertTrue(dc[IPv6ExtHdrHopByHop])
2010             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
2011
2012         # two extensions before ESP, one after
2013         tx = (
2014             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
2015             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
2016             / IPv6ExtHdrHopByHop()
2017             / IPv6ExtHdrFragment(id=2, offset=200)
2018             / IPv6ExtHdrDestOpt()
2019             / Raw(b"\xff" * 200)
2020         )
2021
2022         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
2023         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
2024
2025         for dc in dcs:
2026             dc = IPv6(raw(dc[IPv6]))
2027             self.assertTrue(dc[IPv6ExtHdrDestOpt])
2028             self.assertTrue(dc[IPv6ExtHdrHopByHop])
2029             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
2030
2031
2032 class IpsecTra6Tests(IpsecTra6):
2033     """UT test methods for Transport v6"""
2034
2035     def test_tra_basic6(self):
2036         """ipsec v6 transport basic test"""
2037         self.verify_tra_basic6(count=1)
2038
2039     def test_tra_burst6(self):
2040         """ipsec v6 transport burst test"""
2041         self.verify_tra_basic6(count=257)
2042
2043
2044 class IpsecTra6ExtTests(IpsecTra6):
2045     def test_tra_ext_hdrs_66(self):
2046         """ipsec 6o6 tra extension headers test"""
2047         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
2048
2049
2050 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
2051     """UT test methods for Transport v6 and v4"""
2052
2053     pass
2054
2055
2056 class IpsecTun4(object):
2057     """verify methods for Tunnel v4"""
2058
2059     def verify_counters4(self, p, count, n_frags=None, worker=None):
2060         if not n_frags:
2061             n_frags = count
2062         if hasattr(p, "spd_policy_in_any"):
2063             pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
2064             self.assertEqual(
2065                 pkts,
2066                 count,
2067                 "incorrect SPD any policy: expected %d != %d" % (count, pkts),
2068             )
2069
2070         if hasattr(p, "tun_sa_in"):
2071             pkts = p.tun_sa_in.get_stats(worker)["packets"]
2072             self.assertEqual(
2073                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
2074             )
2075             pkts = p.tun_sa_out.get_stats(worker)["packets"]
2076             self.assertEqual(
2077                 pkts,
2078                 n_frags,
2079                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
2080             )
2081
2082         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
2083         self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
2084
2085     def verify_decrypted(self, p, rxs):
2086         for rx in rxs:
2087             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2088             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2089             self.assert_packet_checksums_valid(rx)
2090
2091     def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
2092         align = sa.crypt_algo.block_size
2093         if align < 4:
2094             align = 4
2095         exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
2096         exp_len += sa.crypt_algo.iv_size
2097         exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
2098         self.assertEqual(exp_len, len(esp_payload))
2099
2100     def verify_encrypted(self, p, sa, rxs):
2101         decrypt_pkts = []
2102         for rx in rxs:
2103             if p.nat_header:
2104                 self.assertEqual(rx[UDP].dport, p.nat_header.dport)
2105             self.assert_packet_checksums_valid(rx)
2106             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
2107             try:
2108                 rx_ip = rx[IP]
2109                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
2110                 if not decrypt_pkt.haslayer(IP):
2111                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
2112                 if rx_ip.proto == socket.IPPROTO_ESP:
2113                     self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
2114                 decrypt_pkts.append(decrypt_pkt)
2115                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
2116                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
2117             except:
2118                 self.logger.debug(ppp("Unexpected packet:", rx))
2119                 try:
2120                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
2121                 except:
2122                     pass
2123                 raise
2124         pkts = reassemble4(decrypt_pkts)
2125         for pkt in pkts:
2126             self.assert_packet_checksums_valid(pkt)
2127
2128     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
2129         self.vapi.cli("clear errors")
2130         self.vapi.cli("clear ipsec counters")
2131         self.vapi.cli("clear ipsec sa")
2132         if not n_rx:
2133             n_rx = count
2134         try:
2135             send_pkts = self.gen_encrypt_pkts(
2136                 p,
2137                 p.scapy_tun_sa,
2138                 self.tun_if,
2139                 src=p.remote_tun_if_host,
2140                 dst=self.pg1.remote_ip4,
2141                 count=count,
2142                 payload_size=payload_size,
2143             )
2144             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2145             self.verify_decrypted(p, recv_pkts)
2146
2147             send_pkts = self.gen_pkts(
2148                 self.pg1,
2149                 src=self.pg1.remote_ip4,
2150                 dst=p.remote_tun_if_host,
2151                 count=count,
2152                 payload_size=payload_size,
2153             )
2154             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
2155             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
2156
2157             for rx in recv_pkts:
2158                 self.assertEqual(rx[IP].src, p.tun_src)
2159                 self.assertEqual(rx[IP].dst, p.tun_dst)
2160
2161         finally:
2162             self.logger.info(self.vapi.ppcli("show error"))
2163             self.logger.info(self.vapi.ppcli("show ipsec all"))
2164
2165         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
2166         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
2167         self.verify_counters4(p, count, n_rx)
2168
2169     def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
2170         self.vapi.cli("clear errors")
2171         if not n_rx:
2172             n_rx = count
2173         try:
2174             send_pkts = self.gen_encrypt_pkts(
2175                 p,
2176                 p.scapy_tun_sa,
2177                 self.tun_if,
2178                 src=p.remote_tun_if_host,
2179                 dst=self.pg1.remote_ip4,
2180                 count=count,
2181             )
2182             self.send_and_assert_no_replies(self.tun_if, send_pkts)
2183
2184             send_pkts = self.gen_pkts(
2185                 self.pg1,
2186                 src=self.pg1.remote_ip4,
2187                 dst=p.remote_tun_if_host,
2188                 count=count,
2189                 payload_size=payload_size,
2190             )
2191             self.send_and_assert_no_replies(self.pg1, send_pkts)
2192
2193         finally:
2194             self.logger.info(self.vapi.ppcli("show error"))
2195             self.logger.info(self.vapi.ppcli("show ipsec all"))
2196
2197     def verify_tun_reass_44(self, p):
2198         self.vapi.cli("clear errors")
2199         self.vapi.ip_reassembly_enable_disable(
2200             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
2201         )
2202
2203         try:
2204             send_pkts = self.gen_encrypt_pkts(
2205                 p,
2206                 p.scapy_tun_sa,
2207                 self.tun_if,
2208                 src=p.remote_tun_if_host,
2209                 dst=self.pg1.remote_ip4,
2210                 payload_size=1900,
2211                 count=1,
2212             )
2213             send_pkts = fragment_rfc791(send_pkts[0], 1400)
2214             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
2215             self.verify_decrypted(p, recv_pkts)
2216
2217             send_pkts = self.gen_pkts(
2218                 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
2219             )
2220             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2221             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
2222
2223         finally:
2224             self.logger.info(self.vapi.ppcli("show error"))
2225             self.logger.info(self.vapi.ppcli("show ipsec all"))
2226
2227         self.verify_counters4(p, 1, 1)
2228         self.vapi.ip_reassembly_enable_disable(
2229             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
2230         )
2231
2232     def verify_tun_64(self, p, count=1):
2233         self.vapi.cli("clear errors")
2234         self.vapi.cli("clear ipsec sa")
2235         try:
2236             send_pkts = self.gen_encrypt_pkts6(
2237                 p,
2238                 p.scapy_tun_sa,
2239                 self.tun_if,
2240                 src=p.remote_tun_if_host6,
2241                 dst=self.pg1.remote_ip6,
2242                 count=count,
2243             )
2244             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2245             for recv_pkt in recv_pkts:
2246                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
2247                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
2248                 self.assert_packet_checksums_valid(recv_pkt)
2249             send_pkts = self.gen_pkts6(
2250                 p,
2251                 self.pg1,
2252                 src=self.pg1.remote_ip6,
2253                 dst=p.remote_tun_if_host6,
2254                 count=count,
2255             )
2256             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2257             for recv_pkt in recv_pkts:
2258                 try:
2259                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
2260                     if not decrypt_pkt.haslayer(IPv6):
2261                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
2262                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
2263                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
2264                     self.assert_packet_checksums_valid(decrypt_pkt)
2265                 except:
2266                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
2267                     try:
2268                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
2269                     except:
2270                         pass
2271                     raise
2272         finally:
2273             self.logger.info(self.vapi.ppcli("show error"))
2274             self.logger.info(self.vapi.ppcli("show ipsec all"))
2275
2276         self.verify_counters4(p, count)
2277
2278     def verify_keepalive(self, p):
2279         # the sizeof Raw is calculated to pad to the minimum ehternet
2280         # frame size of 64 btyes
2281         pkt = (
2282             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2283             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2284             / UDP(sport=333, dport=4500)
2285             / Raw(b"\xff")
2286             / Padding(0 * 21)
2287         )
2288         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2289         self.assert_error_counter_equal(
2290             "/err/%s/nat_keepalive" % self.tun4_input_node, 31
2291         )
2292
2293         pkt = (
2294             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2295             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2296             / UDP(sport=333, dport=4500)
2297             / Raw(b"\xfe")
2298         )
2299         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2300         self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
2301
2302         pkt = (
2303             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2304             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
2305             / UDP(sport=333, dport=4500)
2306             / Raw(b"\xfe")
2307             / Padding(0 * 21)
2308         )
2309         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2310         self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
2311
2312
2313 class IpsecTun4Tests(IpsecTun4):
2314     """UT test methods for Tunnel v4"""
2315
2316     def test_tun_basic44(self):
2317         """ipsec 4o4 tunnel basic test"""
2318         self.verify_tun_44(self.params[socket.AF_INET], count=1)
2319         self.tun_if.admin_down()
2320         self.tun_if.resolve_arp()
2321         self.tun_if.admin_up()
2322         self.verify_tun_44(self.params[socket.AF_INET], count=1)
2323
2324     def test_tun_reass_basic44(self):
2325         """ipsec 4o4 tunnel basic reassembly test"""
2326         self.verify_tun_reass_44(self.params[socket.AF_INET])
2327
2328     def test_tun_burst44(self):
2329         """ipsec 4o4 tunnel burst test"""
2330         self.verify_tun_44(self.params[socket.AF_INET], count=127)
2331
2332
2333 class IpsecTun6(object):
2334     """verify methods for Tunnel v6"""
2335
2336     def verify_counters6(self, p_in, p_out, count, worker=None):
2337         if hasattr(p_in, "tun_sa_in"):
2338             pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
2339             self.assertEqual(
2340                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
2341             )
2342         if hasattr(p_out, "tun_sa_out"):
2343             pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
2344             self.assertEqual(
2345                 pkts,
2346                 count,
2347                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
2348             )
2349         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
2350         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
2351
2352     def verify_decrypted6(self, p, rxs):
2353         for rx in rxs:
2354             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2355             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2356             self.assert_packet_checksums_valid(rx)
2357
2358     def verify_encrypted6(self, p, sa, rxs):
2359         for rx in rxs:
2360             self.assert_packet_checksums_valid(rx)
2361             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
2362             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
2363             if p.outer_flow_label:
2364                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
2365             try:
2366                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
2367                 if not decrypt_pkt.haslayer(IPv6):
2368                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
2369                 self.assert_packet_checksums_valid(decrypt_pkt)
2370                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
2371                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
2372                 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
2373                 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
2374             except:
2375                 self.logger.debug(ppp("Unexpected packet:", rx))
2376                 try:
2377                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
2378                 except:
2379                     pass
2380                 raise
2381
2382     def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
2383         self.vapi.cli("clear errors")
2384         self.vapi.cli("clear ipsec sa")
2385
2386         send_pkts = self.gen_pkts6(
2387             p_in,
2388             self.pg1,
2389             src=self.pg1.remote_ip6,
2390             dst=p_in.remote_tun_if_host,
2391             count=count,
2392             payload_size=payload_size,
2393         )
2394         self.send_and_assert_no_replies(self.tun_if, send_pkts)
2395         self.logger.info(self.vapi.cli("sh punt stats"))
2396
2397     def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
2398         self.vapi.cli("clear errors")
2399         self.vapi.cli("clear ipsec sa")
2400
2401         send_pkts = self.gen_encrypt_pkts6(
2402             p_in,
2403             p_in.scapy_tun_sa,
2404             self.tun_if,
2405             src=p_in.remote_tun_if_host,
2406             dst=self.pg1.remote_ip6,
2407             count=count,
2408         )
2409         self.send_and_assert_no_replies(self.tun_if, send_pkts)
2410
2411     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
2412         self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
2413         self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
2414
2415     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
2416         self.vapi.cli("clear errors")
2417         self.vapi.cli("clear ipsec sa")
2418         if not p_out:
2419             p_out = p_in
2420         try:
2421             send_pkts = self.gen_encrypt_pkts6(
2422                 p_in,
2423                 p_in.scapy_tun_sa,
2424                 self.tun_if,
2425                 src=p_in.remote_tun_if_host,
2426                 dst=self.pg1.remote_ip6,
2427                 count=count,
2428                 payload_size=payload_size,
2429             )
2430             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2431             self.verify_decrypted6(p_in, recv_pkts)
2432
2433             send_pkts = self.gen_pkts6(
2434                 p_in,
2435                 self.pg1,
2436                 src=self.pg1.remote_ip6,
2437                 dst=p_out.remote_tun_if_host,
2438                 count=count,
2439                 payload_size=payload_size,
2440             )
2441             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2442             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
2443
2444             for rx in recv_pkts:
2445                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
2446                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
2447
2448         finally:
2449             self.logger.info(self.vapi.ppcli("show error"))
2450             self.logger.info(self.vapi.ppcli("show ipsec all"))
2451         self.verify_counters6(p_in, p_out, count)
2452
2453     def verify_tun_reass_66(self, p):
2454         self.vapi.cli("clear errors")
2455         self.vapi.ip_reassembly_enable_disable(
2456             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
2457         )
2458
2459         try:
2460             send_pkts = self.gen_encrypt_pkts6(
2461                 p,
2462                 p.scapy_tun_sa,
2463                 self.tun_if,
2464                 src=p.remote_tun_if_host,
2465                 dst=self.pg1.remote_ip6,
2466                 count=1,
2467                 payload_size=1850,
2468             )
2469             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
2470             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
2471             self.verify_decrypted6(p, recv_pkts)
2472
2473             send_pkts = self.gen_pkts6(
2474                 p,
2475                 self.pg1,
2476                 src=self.pg1.remote_ip6,
2477                 dst=p.remote_tun_if_host,
2478                 count=1,
2479                 payload_size=64,
2480             )
2481             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2482             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
2483         finally:
2484             self.logger.info(self.vapi.ppcli("show error"))
2485             self.logger.info(self.vapi.ppcli("show ipsec all"))
2486         self.verify_counters6(p, p, 1)
2487         self.vapi.ip_reassembly_enable_disable(
2488             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
2489         )
2490
2491     def verify_tun_46(self, p, count=1):
2492         """ipsec 4o6 tunnel basic test"""
2493         self.vapi.cli("clear errors")
2494         self.vapi.cli("clear ipsec sa")
2495         try:
2496             send_pkts = self.gen_encrypt_pkts(
2497                 p,
2498                 p.scapy_tun_sa,
2499                 self.tun_if,
2500                 src=p.remote_tun_if_host4,
2501                 dst=self.pg1.remote_ip4,
2502                 count=count,
2503             )
2504             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
2505             for recv_pkt in recv_pkts:
2506                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
2507                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
2508                 self.assert_packet_checksums_valid(recv_pkt)
2509             send_pkts = self.gen_pkts(
2510                 self.pg1,
2511                 src=self.pg1.remote_ip4,
2512                 dst=p.remote_tun_if_host4,
2513                 count=count,
2514             )
2515             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
2516             for recv_pkt in recv_pkts:
2517                 try:
2518                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
2519                     if not decrypt_pkt.haslayer(IP):
2520                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
2521                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
2522                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
2523                     self.assert_packet_checksums_valid(decrypt_pkt)
2524                 except:
2525                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
2526                     try:
2527                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
2528                     except:
2529                         pass
2530                     raise
2531         finally:
2532             self.logger.info(self.vapi.ppcli("show error"))
2533             self.logger.info(self.vapi.ppcli("show ipsec all"))
2534         self.verify_counters6(p, p, count)
2535
2536     def verify_keepalive(self, p):
2537         # the sizeof Raw is calculated to pad to the minimum ehternet
2538         # frame size of 64 btyes
2539         pkt = (
2540             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2541             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2542             / UDP(sport=333, dport=4500)
2543             / Raw(b"\xff")
2544             / Padding(0 * 1)
2545         )
2546         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2547         self.assert_error_counter_equal(
2548             "/err/%s/nat_keepalive" % self.tun6_input_node, 31
2549         )
2550
2551         pkt = (
2552             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2553             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2554             / UDP(sport=333, dport=4500)
2555             / Raw(b"\xfe")
2556         )
2557         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2558         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
2559
2560         pkt = (
2561             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
2562             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
2563             / UDP(sport=333, dport=4500)
2564             / Raw(b"\xfe")
2565             / Padding(0 * 21)
2566         )
2567         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
2568         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
2569
2570
2571 class IpsecTun6Tests(IpsecTun6):
2572     """UT test methods for Tunnel v6"""
2573
2574     def test_tun_basic66(self):
2575         """ipsec 6o6 tunnel basic test"""
2576         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
2577
2578     def test_tun_reass_basic66(self):
2579         """ipsec 6o6 tunnel basic reassembly test"""
2580         self.verify_tun_reass_66(self.params[socket.AF_INET6])
2581
2582     def test_tun_burst66(self):
2583         """ipsec 6o6 tunnel burst test"""
2584         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
2585
2586
2587 class IpsecTun6HandoffTests(IpsecTun6):
2588     """UT test methods for Tunnel v6 with multiple workers"""
2589
2590     vpp_worker_count = 2
2591
2592     def test_tun_handoff_66(self):
2593         """ipsec 6o6 tunnel worker hand-off test"""
2594         self.vapi.cli("clear errors")
2595         self.vapi.cli("clear ipsec sa")
2596
2597         N_PKTS = 15
2598         p = self.params[socket.AF_INET6]
2599
2600         # inject alternately on worker 0 and 1. all counts on the SA
2601         # should be against worker 0
2602         for worker in [0, 1, 0, 1]:
2603             send_pkts = self.gen_encrypt_pkts6(
2604                 p,
2605                 p.scapy_tun_sa,
2606                 self.tun_if,
2607                 src=p.remote_tun_if_host,
2608                 dst=self.pg1.remote_ip6,
2609                 count=N_PKTS,
2610             )
2611             recv_pkts = self.send_and_expect(
2612                 self.tun_if, send_pkts, self.pg1, worker=worker
2613             )
2614             self.verify_decrypted6(p, recv_pkts)
2615
2616             send_pkts = self.gen_pkts6(
2617                 p,
2618                 self.pg1,
2619                 src=self.pg1.remote_ip6,
2620                 dst=p.remote_tun_if_host,
2621                 count=N_PKTS,
2622             )
2623             recv_pkts = self.send_and_expect(
2624                 self.pg1, send_pkts, self.tun_if, worker=worker
2625             )
2626             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
2627
2628         # all counts against the first worker that was used
2629         self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
2630
2631
2632 class IpsecTun4HandoffTests(IpsecTun4):
2633     """UT test methods for Tunnel v4 with multiple workers"""
2634
2635     vpp_worker_count = 2
2636
2637     def test_tun_handooff_44(self):
2638         """ipsec 4o4 tunnel worker hand-off test"""
2639         self.vapi.cli("clear errors")
2640         self.vapi.cli("clear ipsec sa")
2641
2642         N_PKTS = 15
2643         p = self.params[socket.AF_INET]
2644
2645         # inject alternately on worker 0 and 1. all counts on the SA
2646         # should be against worker 0
2647         for worker in [0, 1, 0, 1]:
2648             send_pkts = self.gen_encrypt_pkts(
2649                 p,
2650                 p.scapy_tun_sa,
2651                 self.tun_if,
2652                 src=p.remote_tun_if_host,
2653                 dst=self.pg1.remote_ip4,
2654                 count=N_PKTS,
2655             )
2656             recv_pkts = self.send_and_expect(
2657                 self.tun_if, send_pkts, self.pg1, worker=worker
2658             )
2659             self.verify_decrypted(p, recv_pkts)
2660
2661             send_pkts = self.gen_pkts(
2662                 self.pg1,
2663                 src=self.pg1.remote_ip4,
2664                 dst=p.remote_tun_if_host,
2665                 count=N_PKTS,
2666             )
2667             recv_pkts = self.send_and_expect(
2668                 self.pg1, send_pkts, self.tun_if, worker=worker
2669             )
2670             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
2671
2672         # all counts against the first worker that was used
2673         self.verify_counters4(p, 4 * N_PKTS, worker=0)
2674
2675
2676 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
2677     """UT test methods for Tunnel v6 & v4"""
2678
2679     pass
2680
2681
2682 class IPSecIPv4Fwd(VppTestCase):
2683     """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
2684
2685     @classmethod
2686     def setUpConstants(cls):
2687         super(IPSecIPv4Fwd, cls).setUpConstants()
2688
2689     def setUp(self):
2690         super(IPSecIPv4Fwd, self).setUp()
2691         # store SPD objects so we can remove configs on tear down
2692         self.spd_objs = []
2693         self.spd_policies = []
2694
2695     def tearDown(self):
2696         # remove SPD policies
2697         for obj in self.spd_policies:
2698             obj.remove_vpp_config()
2699         self.spd_policies = []
2700         # remove SPD items (interface bindings first, then SPD)
2701         for obj in reversed(self.spd_objs):
2702             obj.remove_vpp_config()
2703         self.spd_objs = []
2704         # close down pg intfs
2705         for pg in self.pg_interfaces:
2706             pg.unconfig_ip4()
2707             pg.admin_down()
2708         super(IPSecIPv4Fwd, self).tearDown()
2709
2710     def create_interfaces(self, num_ifs=2):
2711         # create interfaces pg0 ... pg<num_ifs>
2712         self.create_pg_interfaces(range(num_ifs))
2713         for pg in self.pg_interfaces:
2714             # put the interface up
2715             pg.admin_up()
2716             # configure IPv4 address on the interface
2717             pg.config_ip4()
2718             # resolve ARP, so that we know VPP MAC
2719             pg.resolve_arp()
2720         self.logger.info(self.vapi.ppcli("show int addr"))
2721
2722     def spd_create_and_intf_add(self, spd_id, pg_list):
2723         spd = VppIpsecSpd(self, spd_id)
2724         spd.add_vpp_config()
2725         self.spd_objs.append(spd)
2726         for pg in pg_list:
2727             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2728             spdItf.add_vpp_config()
2729             self.spd_objs.append(spdItf)
2730
2731     def get_policy(self, policy_type):
2732         e = VppEnum.vl_api_ipsec_spd_action_t
2733         if policy_type == "protect":
2734             return e.IPSEC_API_SPD_ACTION_PROTECT
2735         elif policy_type == "bypass":
2736             return e.IPSEC_API_SPD_ACTION_BYPASS
2737         elif policy_type == "discard":
2738             return e.IPSEC_API_SPD_ACTION_DISCARD
2739         else:
2740             raise Exception("Invalid policy type: %s", policy_type)
2741
2742     def spd_add_rem_policy(
2743         self,
2744         spd_id,
2745         src_if,
2746         dst_if,
2747         proto,
2748         is_out,
2749         priority,
2750         policy_type,
2751         remove=False,
2752         all_ips=False,
2753         ip_range=False,
2754         local_ip_start=ip_address("0.0.0.0"),
2755         local_ip_stop=ip_address("255.255.255.255"),
2756         remote_ip_start=ip_address("0.0.0.0"),
2757         remote_ip_stop=ip_address("255.255.255.255"),
2758         remote_port_start=0,
2759         remote_port_stop=65535,
2760         local_port_start=0,
2761         local_port_stop=65535,
2762     ):
2763         spd = VppIpsecSpd(self, spd_id)
2764
2765         if all_ips:
2766             src_range_low = ip_address("0.0.0.0")
2767             src_range_high = ip_address("255.255.255.255")
2768             dst_range_low = ip_address("0.0.0.0")
2769             dst_range_high = ip_address("255.255.255.255")
2770
2771         elif ip_range:
2772             src_range_low = local_ip_start
2773             src_range_high = local_ip_stop
2774             dst_range_low = remote_ip_start
2775             dst_range_high = remote_ip_stop
2776
2777         else:
2778             src_range_low = src_if.remote_ip4
2779             src_range_high = src_if.remote_ip4
2780             dst_range_low = dst_if.remote_ip4
2781             dst_range_high = dst_if.remote_ip4
2782
2783         spdEntry = VppIpsecSpdEntry(
2784             self,
2785             spd,
2786             0,
2787             src_range_low,
2788             src_range_high,
2789             dst_range_low,
2790             dst_range_high,
2791             proto,
2792             priority=priority,
2793             policy=self.get_policy(policy_type),
2794             is_outbound=is_out,
2795             remote_port_start=remote_port_start,
2796             remote_port_stop=remote_port_stop,
2797             local_port_start=local_port_start,
2798             local_port_stop=local_port_stop,
2799         )
2800
2801         if remove is False:
2802             spdEntry.add_vpp_config()
2803             self.spd_policies.append(spdEntry)
2804         else:
2805             spdEntry.remove_vpp_config()
2806             self.spd_policies.remove(spdEntry)
2807         self.logger.info(self.vapi.ppcli("show ipsec all"))
2808         return spdEntry
2809
2810     def create_stream(
2811         self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
2812     ):
2813         packets = []
2814         # create SA
2815         sa = SecurityAssociation(
2816             ESP,
2817             spi=1000,
2818             crypt_algo="AES-CBC",
2819             crypt_key=b"JPjyOWBeVEQiMe7h",
2820             auth_algo="HMAC-SHA1-96",
2821             auth_key=b"C91KUR9GYMm5GfkEvNjX",
2822             tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
2823             nat_t_header=UDP(sport=src_prt, dport=dst_prt),
2824         )
2825         for i in range(pkt_count):
2826             # create packet info stored in the test case instance
2827             info = self.create_packet_info(src_if, dst_if)
2828             # convert the info into packet payload
2829             payload = self.info_to_payload(info)
2830             # create the packet itself
2831             p = []
2832             if proto == "UDP-ESP":
2833                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
2834                     IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2835                     / UDP(sport=src_prt, dport=dst_prt)
2836                     / Raw(payload)
2837                 )
2838             elif proto == "UDP":
2839                 p = (
2840                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2841                     / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2842                     / UDP(sport=src_prt, dport=dst_prt)
2843                     / Raw(payload)
2844                 )
2845             elif proto == "TCP":
2846                 p = (
2847                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2848                     / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
2849                     / TCP(sport=src_prt, dport=dst_prt)
2850                     / Raw(payload)
2851                 )
2852             # store a copy of the packet in the packet info
2853             info.data = p.copy()
2854             # append the packet to the list
2855             packets.append(p)
2856         # return the created packet list
2857         return packets
2858
2859     def verify_capture(self, src_if, dst_if, capture):
2860         packet_info = None
2861         for packet in capture:
2862             try:
2863                 ip = packet[IP]
2864                 udp = packet[UDP]
2865                 # convert the payload to packet info object
2866                 payload_info = self.payload_to_info(packet)
2867                 # make sure the indexes match
2868                 self.assert_equal(
2869                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
2870                 )
2871                 self.assert_equal(
2872                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2873                 )
2874                 packet_info = self.get_next_packet_info_for_interface2(
2875                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
2876                 )
2877                 # make sure we didn't run out of saved packets
2878                 self.assertIsNotNone(packet_info)
2879                 self.assert_equal(
2880                     payload_info.index, packet_info.index, "packet info index"
2881                 )
2882                 saved_packet = packet_info.data  # fetch the saved packet
2883                 # assert the values match
2884                 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
2885                 # ... more assertions here
2886                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2887             except Exception as e:
2888                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2889                 raise
2890         remaining_packet = self.get_next_packet_info_for_interface2(
2891             src_if.sw_if_index, dst_if.sw_if_index, packet_info
2892         )
2893         self.assertIsNone(
2894             remaining_packet,
2895             "Interface %s: Packet expected from interface "
2896             "%s didn't arrive" % (dst_if.name, src_if.name),
2897         )
2898
2899     def verify_policy_match(self, pkt_count, spdEntry):
2900         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
2901         matched_pkts = spdEntry.get_stats().get("packets")
2902         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
2903         self.assert_equal(pkt_count, matched_pkts)
2904
2905     # Method verify_l3_l4_capture() will verify network and transport layer
2906     # fields of the packet sa.encrypt() gives interface number garbadge.
2907     # thus interface validation get failed (scapy bug?). However our intent
2908     # is to verify IP layer and above and that is covered.
2909
2910     def verify_l3_l4_capture(
2911         self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
2912     ):
2913         for packet in capture:
2914             try:
2915                 self.assert_packet_checksums_valid(packet)
2916                 self.assert_equal(
2917                     packet[IP].src,
2918                     src_if.remote_ip4,
2919                     "decrypted packet source address",
2920                 )
2921                 self.assert_equal(
2922                     packet[IP].dst,
2923                     dst_if.remote_ip4,
2924                     "decrypted packet destination address",
2925                 )
2926                 if packet.haslayer(TCP):
2927                     self.assertFalse(
2928                         packet.haslayer(UDP),
2929                         "unexpected UDP header in decrypted packet",
2930                     )
2931                 elif packet.haslayer(UDP):
2932                     if packet[UDP].payload:
2933                         self.assertFalse(
2934                             packet[UDP][1].haslayer(UDP),
2935                             "unexpected UDP header in decrypted packet",
2936                         )
2937                 else:
2938                     self.assertFalse(
2939                         packet.haslayer(UDP),
2940                         "unexpected UDP header in decrypted packet",
2941                     )
2942                     self.assert_equal(
2943                         packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
2944                     )
2945             except Exception:
2946                 self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
2947                 raise
2948
2949
2950 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
2951     @classmethod
2952     def setUpConstants(cls):
2953         super(SpdFlowCacheTemplate, cls).setUpConstants()
2954         # Override this method with required cmdline parameters e.g.
2955         # cls.vpp_cmdline.extend(["ipsec", "{",
2956         #                         "ipv4-outbound-spd-flow-cache on",
2957         #                         "}"])
2958         # cls.logger.info("VPP modified cmdline is %s" % " "
2959         #                 .join(cls.vpp_cmdline))
2960
2961     def setUp(self):
2962         super(SpdFlowCacheTemplate, self).setUp()
2963
2964     def tearDown(self):
2965         super(SpdFlowCacheTemplate, self).tearDown()
2966
2967     def get_spd_flow_cache_entries(self, outbound):
2968         """'show ipsec spd' output:
2969         ipv4-inbound-spd-flow-cache-entries: 0
2970         ipv4-outbound-spd-flow-cache-entries: 0
2971         """
2972         show_ipsec_reply = self.vapi.cli("show ipsec spd")
2973         # match the relevant section of 'show ipsec spd' output
2974         if outbound:
2975             regex_match = re.search(
2976                 "ipv4-outbound-spd-flow-cache-entries: (.*)",
2977                 show_ipsec_reply,
2978                 re.DOTALL,
2979             )
2980         else:
2981             regex_match = re.search(
2982                 "ipv4-inbound-spd-flow-cache-entries: (.*)", show_ipsec_reply, re.DOTALL
2983             )
2984         if regex_match is None:
2985             raise Exception(
2986                 "Unable to find spd flow cache entries \
2987                 in 'show ipsec spd' CLI output - regex failed to match"
2988             )
2989         else:
2990             try:
2991                 num_entries = int(regex_match.group(1))
2992             except ValueError:
2993                 raise Exception(
2994                     "Unable to get spd flow cache entries \
2995                 from 'show ipsec spd' string: %s",
2996                     regex_match.group(0),
2997                 )
2998             self.logger.info("%s", regex_match.group(0))
2999         return num_entries
3000
3001     def verify_num_outbound_flow_cache_entries(self, expected_elements):
3002         self.assertEqual(
3003             self.get_spd_flow_cache_entries(outbound=True), expected_elements
3004         )
3005
3006     def verify_num_inbound_flow_cache_entries(self, expected_elements):
3007         self.assertEqual(
3008             self.get_spd_flow_cache_entries(outbound=False), expected_elements
3009         )
3010
3011     def crc32_supported(self):
3012         # lscpu is part of util-linux package, available on all Linux Distros
3013         stream = os.popen("lscpu")
3014         cpu_info = stream.read()
3015         # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
3016         # see vppinfra/crc32.h
3017         if "crc32" or "sse4_2" in cpu_info:
3018             self.logger.info("\ncrc32 supported:\n" + cpu_info)
3019             return True
3020         else:
3021             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
3022             return False
3023
3024     def create_stream(
3025         cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3026     ):
3027         packets = []
3028         packets = super(SpdFlowCacheTemplate, cls).create_stream(
3029             src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3030         )
3031         return packets
3032
3033     def verify_capture(
3034         self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3035     ):
3036         super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
3037             src_if, dst_if, capture, tcp_port_in, udp_port_in
3038         )
3039
3040
3041 class SpdFastPathTemplate(IPSecIPv4Fwd):
3042     @classmethod
3043     def setUpConstants(cls):
3044         super(SpdFastPathTemplate, cls).setUpConstants()
3045         # Override this method with required cmdline parameters e.g.
3046         # cls.vpp_cmdline.extend(["ipsec", "{",
3047         #                         "ipv4-outbound-spd-flow-cache on",
3048         #                         "}"])
3049         # cls.logger.info("VPP modified cmdline is %s" % " "
3050         #                 .join(cls.vpp_cmdline))
3051
3052     def setUp(self):
3053         super(SpdFastPathTemplate, self).setUp()
3054
3055     def tearDown(self):
3056         super(SpdFastPathTemplate, self).tearDown()
3057
3058     def create_stream(
3059         cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3060     ):
3061         packets = []
3062         packets = super(SpdFastPathTemplate, cls).create_stream(
3063             src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3064         )
3065         return packets
3066
3067     def verify_capture(
3068         self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3069     ):
3070         super(SpdFastPathTemplate, self).verify_l3_l4_capture(
3071             src_if, dst_if, capture, tcp_port_in, udp_port_in
3072         )
3073
3074
3075 class IpsecDefaultTemplate(IPSecIPv4Fwd):
3076     @classmethod
3077     def setUpConstants(cls):
3078         super(IpsecDefaultTemplate, cls).setUpConstants()
3079
3080     def setUp(self):
3081         super(IpsecDefaultTemplate, self).setUp()
3082
3083     def tearDown(self):
3084         super(IpsecDefaultTemplate, self).tearDown()
3085
3086     def create_stream(
3087         cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
3088     ):
3089         packets = []
3090         packets = super(IpsecDefaultTemplate, cls).create_stream(
3091             src_if, dst_if, pkt_count, src_prt, dst_prt, proto
3092         )
3093         return packets
3094
3095     def verify_capture(
3096         self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
3097     ):
3098         super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
3099             src_if, dst_if, capture, tcp_port_in, udp_port_in
3100         )
3101
3102
3103 class IPSecIPv6Fwd(VppTestCase):
3104     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
3105
3106     @classmethod
3107     def setUpConstants(cls):
3108         super(IPSecIPv6Fwd, cls).setUpConstants()
3109
3110     def setUp(self):
3111         super(IPSecIPv6Fwd, self).setUp()
3112         # store SPD objects so we can remove configs on tear down
3113         self.spd_objs = []
3114         self.spd_policies = []
3115
3116     def tearDown(self):
3117         # remove SPD policies
3118         for obj in self.spd_policies:
3119             obj.remove_vpp_config()
3120         self.spd_policies = []
3121         # remove SPD items (interface bindings first, then SPD)
3122         for obj in reversed(self.spd_objs):
3123             obj.remove_vpp_config()
3124         self.spd_objs = []
3125         # close down pg intfs
3126         for pg in self.pg_interfaces:
3127             pg.unconfig_ip6()
3128             pg.admin_down()
3129         super(IPSecIPv6Fwd, self).tearDown()
3130
3131     def create_interfaces(self, num_ifs=2):
3132         # create interfaces pg0 ... pg<num_ifs>
3133         self.create_pg_interfaces(range(num_ifs))
3134         for pg in self.pg_interfaces:
3135             # put the interface up
3136             pg.admin_up()
3137             # configure IPv6 address on the interface
3138             pg.config_ip6()
3139             pg.resolve_ndp()
3140         self.logger.info(self.vapi.ppcli("show int addr"))
3141
3142     def spd_create_and_intf_add(self, spd_id, pg_list):
3143         spd = VppIpsecSpd(self, spd_id)
3144         spd.add_vpp_config()
3145         self.spd_objs.append(spd)
3146         for pg in pg_list:
3147             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
3148             spdItf.add_vpp_config()
3149             self.spd_objs.append(spdItf)
3150
3151     def get_policy(self, policy_type):
3152         e = VppEnum.vl_api_ipsec_spd_action_t
3153         if policy_type == "protect":
3154             return e.IPSEC_API_SPD_ACTION_PROTECT
3155         elif policy_type == "bypass":
3156             return e.IPSEC_API_SPD_ACTION_BYPASS
3157         elif policy_type == "discard":
3158             return e.IPSEC_API_SPD_ACTION_DISCARD
3159         else:
3160             raise Exception("Invalid policy type: %s", policy_type)
3161
3162     def spd_add_rem_policy(
3163         self,
3164         spd_id,
3165         src_if,
3166         dst_if,
3167         proto,
3168         is_out,
3169         priority,
3170         policy_type,
3171         remove=False,
3172         all_ips=False,
3173         ip_range=False,
3174         local_ip_start=ip_address("0::0"),
3175         local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3176         remote_ip_start=ip_address("0::0"),
3177         remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
3178         remote_port_start=0,
3179         remote_port_stop=65535,
3180         local_port_start=0,
3181         local_port_stop=65535,
3182     ):
3183         spd = VppIpsecSpd(self, spd_id)
3184
3185         if all_ips:
3186             src_range_low = ip_address("0::0")
3187             src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3188             dst_range_low = ip_address("0::0")
3189             dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
3190
3191         elif ip_range:
3192             src_range_low = local_ip_start
3193             src_range_high = local_ip_stop
3194             dst_range_low = remote_ip_start
3195             dst_range_high = remote_ip_stop
3196
3197         else:
3198             src_range_low = src_if.remote_ip6
3199             src_range_high = src_if.remote_ip6
3200             dst_range_low = dst_if.remote_ip6
3201             dst_range_high = dst_if.remote_ip6
3202
3203         spdEntry = VppIpsecSpdEntry(
3204             self,
3205             spd,
3206             0,
3207             src_range_low,
3208             src_range_high,
3209             dst_range_low,
3210             dst_range_high,
3211             proto,
3212             priority=priority,
3213             policy=self.get_policy(policy_type),
3214             is_outbound=is_out,
3215             remote_port_start=remote_port_start,
3216             remote_port_stop=remote_port_stop,
3217             local_port_start=local_port_start,
3218             local_port_stop=local_port_stop,
3219         )
3220
3221         if remove is False:
3222             spdEntry.add_vpp_config()
3223             self.spd_policies.append(spdEntry)
3224         else:
3225             spdEntry.remove_vpp_config()
3226             self.spd_policies.remove(spdEntry)
3227         self.logger.info(self.vapi.ppcli("show ipsec all"))
3228         return spdEntry
3229
3230     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
3231         packets = []
3232         for i in range(pkt_count):
3233             # create packet info stored in the test case instance
3234             info = self.create_packet_info(src_if, dst_if)
3235             # convert the info into packet payload
3236             payload = self.info_to_payload(info)
3237             # create the packet itself
3238             p = (
3239                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
3240                 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
3241                 / UDP(sport=src_prt, dport=dst_prt)
3242                 / Raw(payload)
3243             )
3244             # store a copy of the packet in the packet info
3245             info.data = p.copy()
3246             # append the packet to the list
3247             packets.append(p)
3248         # return the created packet list
3249         return packets
3250
3251     def verify_capture(self, src_if, dst_if, capture):
3252         packet_info = None
3253         for packet in capture:
3254             try:
3255                 ip = packet[IPv6]
3256                 udp = packet[UDP]
3257                 # convert the payload to packet info object
3258                 payload_info = self.payload_to_info(packet)
3259                 # make sure the indexes match
3260                 self.assert_equal(
3261                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
3262                 )
3263                 self.assert_equal(
3264                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
3265                 )
3266                 packet_info = self.get_next_packet_info_for_interface2(
3267                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
3268                 )
3269                 # make sure we didn't run out of saved packets
3270                 self.assertIsNotNone(packet_info)
3271                 self.assert_equal(
3272                     payload_info.index, packet_info.index, "packet info index"
3273                 )
3274                 saved_packet = packet_info.data  # fetch the saved packet
3275                 # assert the values match
3276                 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
3277                 # ... more assertions here
3278                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
3279             except Exception as e:
3280                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3281                 raise
3282         remaining_packet = self.get_next_packet_info_for_interface2(
3283             src_if.sw_if_index, dst_if.sw_if_index, packet_info
3284         )
3285         self.assertIsNone(
3286             remaining_packet,
3287             "Interface %s: Packet expected from interface "
3288             "%s didn't arrive" % (dst_if.name, src_if.name),
3289         )
3290
3291     def verify_policy_match(self, pkt_count, spdEntry):
3292         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
3293         matched_pkts = spdEntry.get_stats().get("packets")
3294         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
3295         self.assert_equal(pkt_count, matched_pkts)
3296
3297
3298 if __name__ == "__main__":
3299     unittest.main(testRunner=VppTestRunner)