defd6f2bb3fcb54ced8bd0c85dc4d336dd467771
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
class IPsecIPv4Params:
    """Default parameter set used by the IPsec tests for IPv4.

    The class attributes describe the address family.  Every instance gets
    its own copy of the SA ids, SPIs, algorithms, keys and flags, so a test
    can override any of them without affecting other tests.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        self.remote_tun_if_host = "1.1.1.1"
        self.remote_tun_if_host6 = "1111::1"

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        # hop limits and flow labels for the outer and inner headers
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption algorithm: VPP API id, scapy name, key and salt
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
74
75
class IPsecIPv6Params:
    """Default parameter set used by the IPsec tests for IPv6.

    The class attributes describe the address family.  Every instance gets
    its own copy of the SA ids, SPIs, algorithms, keys and flags, so a test
    can override any of them without affecting other tests.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
        self.remote_tun_if_host4 = "1.1.1.1"

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # hop limits and flow labels for the outer and inner headers
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption algorithm: VPP API id, scapy name, key and salt
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
122
123
def mk_scapy_crypt_key(p):
    """Return the encryption key to hand to scapy for parameter set p.

    For AES-GCM, AES-CTR and AES-NULL-GMAC the key is p.crypt_key followed
    by p.salt packed as a 4-byte big-endian unsigned integer; for every
    other algorithm p.crypt_key is returned unchanged.
    """
    salted_algos = ("AES-GCM", "AES-CTR", "AES-NULL-GMAC")
    if p.crypt_algo not in salted_algos:
        return p.crypt_key
    packed_salt = struct.pack("!I", p.salt)
    return p.crypt_key + packed_salt
129
130
def config_tun_params(p, encryption_type, tun_if):
    """Attach the pair of scapy tunnel SAs to parameter set p.

    Stores tun_if's remote and local address for p.addr_type in p.tun_dst
    and p.tun_src, then builds two scapy SecurityAssociation objects that
    share algorithms, keys, NAT-T header and ESN setting and differ only in
    SPI and in the direction of the outer tunnel header:

    - p.scapy_tun_sa: SPI p.scapy_tun_spi, outer header tun_dst -> tun_src
    - p.vpp_tun_sa:   SPI p.vpp_tun_spi,   outer header tun_src -> tun_dst

    :param p: parameter object (e.g. IPsecIPv4Params); updated in place
    :param encryption_type: first argument given to SecurityAssociation
    :param tun_if: object exposing remote_addr / local_addr indexed by
        address family
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # everything the two directions have in common
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=use_esn,
    )
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(src=p.tun_dst, dst=p.tun_src),
        **shared
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(dst=p.tun_dst, src=p.tun_src),
        **shared
    )
161
162
def config_tra_params(p, encryption_type):
    """Attach the pair of scapy transport SAs to parameter set p.

    Builds p.scapy_tra_sa (SPI p.scapy_tra_spi) and p.vpp_tra_sa (SPI
    p.vpp_tra_spi).  Both use the same algorithms, keys, NAT-T header and
    ESN setting taken from p; no tunnel header is configured.

    :param p: parameter object (e.g. IPsecIPv4Params); updated in place
    :param encryption_type: first argument given to SecurityAssociation
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    use_esn = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    # everything the two SAs have in common
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=use_esn,
    )

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared
    )
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **shared
    )
188
189
class TemplateIpsec(VppTestCase):
    """
    Common base for the IPsec test cases.

    setUp() creates the per-address-family parameter objects, brings up
    three pg interfaces with IPv4 and IPv6 configured and then calls the
    ipsec_select_backend() hook; tearDown() unconfigures the interfaces.
    The gen_* helpers build lists of plain or SA-encrypted ICMP packets.

    TRANSPORT MODE::

         ------   encrypt   ---
        |tra_if| <-------> |VPP|
         ------   decrypt   ---

    TUNNEL MODE::

         ------   encrypt   ---   plain   ---
        |tun_if| <-------  |VPP| <------ |pg1|
         ------             ---           ---

         ------   decrypt   ---   plain   ---
        |tun_if| ------->  |VPP| ------> |pg1|
         ------             ---           ---
    """

    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """Hook called at the end of setUp(); does nothing by default and
        is meant to be overridden when a test needs a specific backend."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setup_params(self):
        """Create default IPv4/IPv6 parameter objects unless the test has
        already provided them, and index both by address family in
        self.params."""
        defaults = (
            ("ipv4_params", IPsecIPv4Params),
            ("ipv6_params", IPsecIPv6Params),
        )
        for attr_name, params_cls in defaults:
            if not hasattr(self, attr_name):
                setattr(self, attr_name, params_cls())
        self.params = {
            params.addr_type: params
            for params in (self.ipv4_params, self.ipv6_params)
        }

    def config_interfaces(self):
        """Create pg0..pg2, bring them up, and configure and resolve both
        IPv4 and IPv6 on each of them."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super().setUp()

        self.setup_params()

        ipsec_proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_proto.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Take every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super().tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        """Log the output of the "show hardware" CLI command."""
        hw_output = self.vapi.cli("show hardware")
        self.logger.info(hw_output)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return a list of count Ethernet frames, each carrying an IPv4
        ICMP packet with payload_size bytes of b"X" encrypted with sa.

        sa.encrypt() is called once per packet; p is not used here.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            pkts.append(l2 / sa.encrypt(clear))
        return pkts

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return a list of count Ethernet frames, each carrying an IPv6
        ICMPv6 echo request encrypted with sa.

        The inner header takes its hop limit and flow label from p;
        sa.encrypt() is called once per packet.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            ) / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            pkts.append(l2 / sa.encrypt(clear))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return a list of count unencrypted Ethernet/IPv4/ICMP frames with
        payload_size bytes of b"X" as payload."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(
                l2 / IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            )
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Return a list of count unencrypted Ethernet/IPv6 ICMPv6 echo
        requests whose hop limit and flow label come from p."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            l3 = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            )
            echo = ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            pkts.append(l2 / l3 / echo)
        return pkts
303
304
class IpsecTcp(object):
    """Mixin providing a TCP checksum check over the IPv4 tunnel SA.

    Relies on the host test class for self.vapi, self.params, self.tun_if,
    self.logger, send_and_expect() and assert_packet_checksums_valid().
    """

    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 of tun_if's local IPv4
        address and check the checksums of the decrypted reply."""
        # start http cli server listener on http://0.0.0.0:80
        self.vapi.cli("http cli server")
        p = self.params[socket.AF_INET]

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / TCP(
            flags="S", dport=80
        )
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
321
322
class IpsecTcpTests(IpsecTcp):
    # Exposes IpsecTcp.verify_tcp_checksum() under a test_* method name so a
    # test class that inherits from this one gets it as a test.
    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        # all of the work is done by the inherited verify method
        self.verify_tcp_checksum()
327
328
329 class IpsecTra4(object):
330     """verify methods for Transport v4"""
331
332     def get_replay_counts(self, p):
333         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
334         count = self.statistics.get_err_counter(replay_node_name)
335
336         if p.async_mode:
337             replay_post_node_name = (
338                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
339             )
340             count += self.statistics.get_err_counter(replay_post_node_name)
341
342         return count
343
344     def get_hash_failed_counts(self, p):
345         if ESP == self.encryption_type and p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
346             hash_failed_node_name = (
347                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
348             )
349         else:
350             hash_failed_node_name = (
351                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
352             )
353         count = self.statistics.get_err_counter(hash_failed_node_name)
354
355         if p.async_mode:
356             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
357
358         return count
359
360     def verify_hi_seq_num(self):
361         p = self.params[socket.AF_INET]
362         saf = VppEnum.vl_api_ipsec_sad_flags_t
363         esn_on = p.vpp_tra_sa.esn_en
364         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
365
366         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
367         replay_count = self.get_replay_counts(p)
368         hash_failed_count = self.get_hash_failed_counts(p)
369         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
370
371         # a few packets so we get the rx seq number above the window size and
372         # thus can simulate a wrap with an out of window packet
373         pkts = [
374             (
375                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
376                 / p.scapy_tra_sa.encrypt(
377                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
378                     seq_num=seq,
379                 )
380             )
381             for seq in range(63, 80)
382         ]
383         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
384
385         # these 4 packets will all choose seq-num 0 to decrpyt since none
386         # are out of window when first checked. however, once #200 has
387         # decrypted it will move the window to 200 and has #81 is out of
388         # window. this packet should be dropped.
389         pkts = [
390             (
391                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
392                 / p.scapy_tra_sa.encrypt(
393                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
394                     seq_num=200,
395                 )
396             ),
397             (
398                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
399                 / p.scapy_tra_sa.encrypt(
400                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
401                     seq_num=81,
402                 )
403             ),
404             (
405                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
406                 / p.scapy_tra_sa.encrypt(
407                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
408                     seq_num=201,
409                 )
410             ),
411             (
412                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
413                 / p.scapy_tra_sa.encrypt(
414                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
415                     seq_num=202,
416                 )
417             ),
418         ]
419
420         # if anti-replay is off then we won't drop #81
421         n_rx = 3 if ar_on else 4
422         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
423         # this packet is one before the wrap
424         pkts = [
425             (
426                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
427                 / p.scapy_tra_sa.encrypt(
428                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
429                     seq_num=203,
430                 )
431             )
432         ]
433         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
434
435         # a replayed packet, then an out of window, then a legit
436         # tests that a early failure on the batch doesn't affect subsequent packets.
437         pkts = [
438             (
439                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
440                 / p.scapy_tra_sa.encrypt(
441                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
442                     seq_num=203,
443                 )
444             ),
445             (
446                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
447                 / p.scapy_tra_sa.encrypt(
448                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
449                     seq_num=81,
450                 )
451             ),
452             (
453                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
454                 / p.scapy_tra_sa.encrypt(
455                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
456                     seq_num=204,
457                 )
458             ),
459         ]
460         n_rx = 1 if ar_on else 3
461         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
462
463         # move the window over half way to a wrap
464         pkts = [
465             (
466                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
467                 / p.scapy_tra_sa.encrypt(
468                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
469                     seq_num=0x80000001,
470                 )
471             )
472         ]
473         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
474
475         # anti-replay will drop old packets, no anti-replay will not
476         pkts = [
477             (
478                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
479                 / p.scapy_tra_sa.encrypt(
480                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
481                     seq_num=0x44000001,
482                 )
483             )
484         ]
485
486         if ar_on:
487             self.send_and_assert_no_replies(self.tra_if, pkts)
488         else:
489             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
490
491         if esn_on:
492             #
493             # validate wrapping the ESN
494             #
495
496             # wrap scapy's TX SA SN
497             p.scapy_tra_sa.seq_num = 0x100000005
498
499             # send a packet that wraps the window for both AR and no AR
500             pkts = [
501                 (
502                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
503                     / p.scapy_tra_sa.encrypt(
504                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
505                         / ICMP(),
506                         seq_num=0x100000005,
507                     )
508                 )
509             ]
510
511             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
512             for rx in rxs:
513                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
514
515             # move the window forward to half way to the next wrap
516             pkts = [
517                 (
518                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
519                     / p.scapy_tra_sa.encrypt(
520                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
521                         / ICMP(),
522                         seq_num=0x180000005,
523                     )
524                 )
525             ]
526
527             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
528
529             # a packet less than 2^30 from the current position is:
530             #  - AR: out of window and dropped
531             #  - non-AR: accepted
532             pkts = [
533                 (
534                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
535                     / p.scapy_tra_sa.encrypt(
536                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
537                         / ICMP(),
538                         seq_num=0x170000005,
539                     )
540                 )
541             ]
542
543             if ar_on:
544                 self.send_and_assert_no_replies(self.tra_if, pkts)
545             else:
546                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
547
548             # a packet more than 2^30 from the current position is:
549             #  - AR: out of window and dropped
550             #  - non-AR: considered a wrap, but since it's not a wrap
551             #    it won't decrpyt and so will be dropped
552             pkts = [
553                 (
554                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
555                     / p.scapy_tra_sa.encrypt(
556                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
557                         / ICMP(),
558                         seq_num=0x130000005,
559                     )
560                 )
561             ]
562
563             self.send_and_assert_no_replies(self.tra_if, pkts)
564
565             # a packet less than 2^30 from the current position and is a
566             # wrap; (the seq is currently at 0x180000005).
567             #  - AR: out of window so considered a wrap, so accepted
568             #  - non-AR: not considered a wrap, so won't decrypt
569             p.scapy_tra_sa.seq_num = 0x260000005
570             pkts = [
571                 (
572                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
573                     / p.scapy_tra_sa.encrypt(
574                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
575                         / ICMP(),
576                         seq_num=0x260000005,
577                     )
578                 )
579             ]
580             if ar_on:
581                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
582             else:
583                 self.send_and_assert_no_replies(self.tra_if, pkts)
584
585             #
586             # window positions are different now for AR/non-AR
587             #  move non-AR forward
588             #
589             if not ar_on:
590                 # a packet more than 2^30 from the current position and is a
591                 # wrap; (the seq is currently at 0x180000005).
592                 #  - AR: accepted
593                 #  - non-AR: not considered a wrap, so won't decrypt
594
595                 pkts = [
596                     (
597                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
598                         / p.scapy_tra_sa.encrypt(
599                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
600                             / ICMP(),
601                             seq_num=0x200000005,
602                         )
603                     ),
604                     (
605                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
606                         / p.scapy_tra_sa.encrypt(
607                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
608                             / ICMP(),
609                             seq_num=0x200000006,
610                         )
611                     ),
612                 ]
613                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
614
615                 pkts = [
616                     (
617                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
618                         / p.scapy_tra_sa.encrypt(
619                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
620                             / ICMP(),
621                             seq_num=0x260000005,
622                         )
623                     )
624                 ]
625                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
626
627     def verify_tra_anti_replay(self):
628         p = self.params[socket.AF_INET]
629         esn_en = p.vpp_tra_sa.esn_en
630
631         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
632         replay_count = self.get_replay_counts(p)
633         hash_failed_count = self.get_hash_failed_counts(p)
634         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
635         hash_err = "integ_error"
636
637         if ESP == self.encryption_type:
638             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
639             undersize_count = self.statistics.get_err_counter(undersize_node_name)
640             # For AES-GCM an error in the hash is reported as a decryption failure
641             if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
642                 hash_err = "decryption_failed"
643         # In async mode, we don't report errors in the hash.
644         if p.async_mode:
645             hash_err = ""
646
647         #
648         # send packets with seq numbers 1->34
649         # this means the window size is still in Case B (see RFC4303
650         # Appendix A)
651         #
652         # for reasons i haven't investigated Scapy won't create a packet with
653         # seq_num=0
654         #
655         pkts = [
656             (
657                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
658                 / p.scapy_tra_sa.encrypt(
659                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
660                     seq_num=seq,
661                 )
662             )
663             for seq in range(1, 34)
664         ]
665         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
666
667         # replayed packets are dropped
668         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
669         replay_count += len(pkts)
670         self.assertEqual(self.get_replay_counts(p), replay_count)
671         err = p.tra_sa_in.get_err("replay")
672         self.assertEqual(err, replay_count)
673
674         #
675         # now send a batch of packets all with the same sequence number
676         # the first packet in the batch is legitimate, the rest bogus
677         #
678         self.vapi.cli("clear error")
679         self.vapi.cli("clear node counters")
680         pkts = Ether(
681             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
682         ) / p.scapy_tra_sa.encrypt(
683             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
684             seq_num=35,
685         )
686         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
687         replay_count += 7
688         self.assertEqual(self.get_replay_counts(p), replay_count)
689         err = p.tra_sa_in.get_err("replay")
690         self.assertEqual(err, replay_count)
691
692         #
693         # now move the window over to 257 (more than one byte) and into Case A
694         #
695         self.vapi.cli("clear error")
696         pkt = Ether(
697             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
698         ) / p.scapy_tra_sa.encrypt(
699             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
700             seq_num=257,
701         )
702         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
703
704         # replayed packets are dropped
705         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
706         replay_count += 3
707         self.assertEqual(self.get_replay_counts(p), replay_count)
708         err = p.tra_sa_in.get_err("replay")
709         self.assertEqual(err, replay_count)
710
711         # the window size is 64 packets
712         # in window are still accepted
713         pkt = Ether(
714             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
715         ) / p.scapy_tra_sa.encrypt(
716             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
717             seq_num=200,
718         )
719         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
720
721         # a packet that does not decrypt does not move the window forward
722         bogus_sa = SecurityAssociation(
723             self.encryption_type,
724             p.scapy_tra_spi,
725             crypt_algo=p.crypt_algo,
726             crypt_key=mk_scapy_crypt_key(p)[::-1],
727             auth_algo=p.auth_algo,
728             auth_key=p.auth_key[::-1],
729         )
730         pkt = Ether(
731             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
732         ) / bogus_sa.encrypt(
733             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
734             seq_num=350,
735         )
736         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
737
738         hash_failed_count += 17
739         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
740         if hash_err != "":
741             err = p.tra_sa_in.get_err(hash_err)
742             self.assertEqual(err, hash_failed_count)
743
744         # a malformed 'runt' packet
745         #  created by a mis-constructed SA
746         if ESP == self.encryption_type and p.crypt_algo != "NULL":
747             bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
748             pkt = Ether(
749                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
750             ) / bogus_sa.encrypt(
751                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
752                 seq_num=350,
753             )
754             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
755
756             undersize_count += 17
757             self.assert_error_counter_equal(undersize_node_name, undersize_count)
758             err = p.tra_sa_in.get_err("runt")
759             self.assertEqual(err, undersize_count)
760
761         # which we can determine since this packet is still in the window
762         pkt = Ether(
763             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
764         ) / p.scapy_tra_sa.encrypt(
765             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
766             seq_num=234,
767         )
768         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
769
770         #
771         # out of window are dropped
772         #  this is Case B. So VPP will consider this to be a high seq num wrap
773         #  and so the decrypt attempt will fail
774         #
775         pkt = Ether(
776             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
777         ) / p.scapy_tra_sa.encrypt(
778             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
779             seq_num=17,
780         )
781         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
782
783         if esn_en:
784             # an out of window error with ESN looks like a high sequence
785             # wrap. but since it isn't then the verify will fail.
786             hash_failed_count += 17
787             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
788             if hash_err != "":
789                 err = p.tra_sa_in.get_err(hash_err)
790                 self.assertEqual(err, hash_failed_count)
791
792         else:
793             replay_count += 17
794             self.assertEqual(self.get_replay_counts(p), replay_count)
795             err = p.tra_sa_in.get_err("replay")
796             self.assertEqual(err, replay_count)
797
798         # valid packet moves the window over to 258
799         pkt = Ether(
800             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
801         ) / p.scapy_tra_sa.encrypt(
802             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
803             seq_num=258,
804         )
805         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
806         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
807
808         #
809         # move VPP's SA TX seq-num to just before the seq-number wrap.
810         # then fire in a packet that VPP should drop on TX because it
811         # causes the TX seq number to wrap; unless we're using extened sequence
812         # numbers.
813         #
814         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
815         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
816         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
817
818         pkts = [
819             (
820                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
821                 / p.scapy_tra_sa.encrypt(
822                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
823                     seq_num=seq,
824                 )
825             )
826             for seq in range(259, 280)
827         ]
828
829         if esn_en:
830             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
831
832             #
833             # in order for scapy to decrypt its SA's high order number needs
834             # to wrap
835             #
836             p.vpp_tra_sa.seq_num = 0x100000000
837             for rx in rxs:
838                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
839
840             #
841             # wrap scapy's TX high sequence number. VPP is in case B, so it
842             # will consider this a high seq wrap also.
843             # The low seq num we set it to will place VPP's RX window in Case A
844             #
845             p.scapy_tra_sa.seq_num = 0x100000005
846             pkt = Ether(
847                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
848             ) / p.scapy_tra_sa.encrypt(
849                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
850                 seq_num=0x100000005,
851             )
852             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
853
854             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
855
856             #
857             # A packet that has seq num between (2^32-64) and 5 is within
858             # the window
859             #
860             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
861             pkt = Ether(
862                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
863             ) / p.scapy_tra_sa.encrypt(
864                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
865                 seq_num=0xFFFFFFFD,
866             )
867             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
868             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
869
870             #
871             # While in case A we cannot wrap the high sequence number again
872             # because VPP will consider this packet to be one that moves the
873             # window forward
874             #
875             pkt = Ether(
876                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
877             ) / p.scapy_tra_sa.encrypt(
878                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
879                 seq_num=0x200000999,
880             )
881             self.send_and_assert_no_replies(
882                 self.tra_if, [pkt], self.tra_if, timeout=0.2
883             )
884
885             hash_failed_count += 1
886             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
887             if hash_err != "":
888                 err = p.tra_sa_in.get_err(hash_err)
889                 self.assertEqual(err, hash_failed_count)
890
891             #
892             # but if we move the window forward to case B, then we can wrap
893             # again
894             #
895             p.scapy_tra_sa.seq_num = 0x100000555
896             pkt = Ether(
897                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
898             ) / p.scapy_tra_sa.encrypt(
899                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
900                 seq_num=0x100000555,
901             )
902             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
903             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
904
905             p.scapy_tra_sa.seq_num = 0x200000444
906             pkt = Ether(
907                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
908             ) / p.scapy_tra_sa.encrypt(
909                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
910                 seq_num=0x200000444,
911             )
912             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
913             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
914
915         else:
916             #
917             # without ESN TX sequence numbers can't wrap and packets are
918             # dropped from here on out.
919             #
920             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
921             seq_cycle_count += len(pkts)
922             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
923             err = p.tra_sa_out.get_err("seq_cycled")
924             self.assertEqual(err, seq_cycle_count)
925
926         # move the security-associations seq number on to the last we used
927         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
928         p.scapy_tra_sa.seq_num = 351
929         p.vpp_tra_sa.seq_num = 351
930
931     def verify_tra_lost(self):
932         p = self.params[socket.AF_INET]
933         esn_en = p.vpp_tra_sa.esn_en
934
935         #
936         # send packets with seq numbers 1->34
937         # this means the window size is still in Case B (see RFC4303
938         # Appendix A)
939         #
940         # for reasons i haven't investigated Scapy won't create a packet with
941         # seq_num=0
942         #
943         pkts = [
944             (
945                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
946                 / p.scapy_tra_sa.encrypt(
947                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
948                     seq_num=seq,
949                 )
950             )
951             for seq in range(1, 3)
952         ]
953         self.send_and_expect(self.tra_if, pkts, self.tra_if)
954
955         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
956
957         # skip a sequence number
958         pkts = [
959             (
960                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
961                 / p.scapy_tra_sa.encrypt(
962                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
963                     seq_num=seq,
964                 )
965             )
966             for seq in range(4, 6)
967         ]
968         self.send_and_expect(self.tra_if, pkts, self.tra_if)
969
970         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
971
972         # the lost packet are counted untill we get up past the first
973         # sizeof(replay_window) packets
974         pkts = [
975             (
976                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
977                 / p.scapy_tra_sa.encrypt(
978                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
979                     seq_num=seq,
980                 )
981             )
982             for seq in range(6, 100)
983         ]
984         self.send_and_expect(self.tra_if, pkts, self.tra_if)
985
986         self.assertEqual(p.tra_sa_in.get_err("lost"), 1)
987
988         # lost of holes in the sequence
989         pkts = [
990             (
991                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
992                 / p.scapy_tra_sa.encrypt(
993                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
994                     seq_num=seq,
995                 )
996             )
997             for seq in range(100, 200, 2)
998         ]
999         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
1000
1001         pkts = [
1002             (
1003                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1004                 / p.scapy_tra_sa.encrypt(
1005                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1006                     seq_num=seq,
1007                 )
1008             )
1009             for seq in range(200, 300)
1010         ]
1011         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1012
1013         self.assertEqual(p.tra_sa_in.get_err("lost"), 51)
1014
1015         # a big hole in the seq number space
1016         pkts = [
1017             (
1018                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
1019                 / p.scapy_tra_sa.encrypt(
1020                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
1021                     seq_num=seq,
1022                 )
1023             )
1024             for seq in range(400, 500)
1025         ]
1026         self.send_and_expect(self.tra_if, pkts, self.tra_if)
1027
1028         self.assertEqual(p.tra_sa_in.get_err("lost"), 151)
1029
1030     def verify_tra_basic4(self, count=1, payload_size=54):
1031         """ipsec v4 transport basic test"""
1032         self.vapi.cli("clear errors")
1033         self.vapi.cli("clear ipsec sa")
1034         try:
1035             p = self.params[socket.AF_INET]
1036             send_pkts = self.gen_encrypt_pkts(
1037                 p,
1038                 p.scapy_tra_sa,
1039                 self.tra_if,
1040                 src=self.tra_if.remote_ip4,
1041                 dst=self.tra_if.local_ip4,
1042                 count=count,
1043                 payload_size=payload_size,
1044             )
1045             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1046             for rx in recv_pkts:
1047                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1048                 self.assert_packet_checksums_valid(rx)
1049                 try:
1050                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1051                     self.assert_packet_checksums_valid(decrypted)
1052                 except:
1053                     self.logger.debug(ppp("Unexpected packet:", rx))
1054                     raise
1055         finally:
1056             self.logger.info(self.vapi.ppcli("show error"))
1057             self.logger.info(self.vapi.ppcli("show ipsec all"))
1058
1059         pkts = p.tra_sa_in.get_stats()["packets"]
1060         self.assertEqual(
1061             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1062         )
1063         pkts = p.tra_sa_out.get_stats()["packets"]
1064         self.assertEqual(
1065             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1066         )
1067         self.assertEqual(p.tra_sa_out.get_err("lost"), 0)
1068         self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
1069
1070         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1071         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1072
1073
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    # Each test delegates to the matching verify_* method inherited from
    # IpsecTra4.

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # forward the caller's count instead of a hard-coded 1; when invoked
        # with no argument the default keeps this at one packet
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        self.verify_tra_basic4(count=257)
1092
1093
class IpsecTra6(object):
    """verify methods for Transport v6"""

    def verify_tra_basic6(self, count=1, payload_size=54):
        """Round-trip ``count`` ESP packets over the IPv6 transport SA pair.

        The error and SA counters are cleared, ``count`` packets encrypted
        with ``p.scapy_tra_sa`` are sent into ``self.tra_if`` and, for every
        reply, the IPv6 payload length is checked and the packet is decrypted
        with ``p.vpp_tra_sa`` and checksum-validated.  Afterwards both SA
        packet counters and the packet counters named by
        ``tra6_encrypt_node_name`` / ``tra6_decrypt_node_name[0]`` must equal
        ``count``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tra_sa,
                self.tra_if,
                src=self.tra_if.remote_ip6,
                dst=self.tra_if.local_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
            for rx in recv_pkts:
                # plen counts everything after the fixed IPv6 header
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # dump the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always record VPP's view of the run, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
        )
        pkts = p.tra_sa_out.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
        )
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)

    def gen_encrypt_pkts_ext_hdrs6(
        self, sa, sw_intf, src, dst, count=1, payload_size=54
    ):
        """Return ``count`` Ethernet frames carrying an ``sa``-encrypted echo.

        Each frame is addressed from ``sw_intf.remote_mac`` to
        ``sw_intf.local_mac`` and wraps an IPv6 ICMPv6 echo request whose data
        is ``payload_size`` "X" characters.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=src, dst=dst)
                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` clear-text IPv6 frames with extension headers.

        Every frame carries a hop-by-hop header and a fragment header in
        front of a 200 byte payload.
        """
        # NOTE(review): payload_size is accepted but never used; the payload
        # is always 200 bytes
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
            for _ in range(count)
        ]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt packets received from VPP and check their addresses.

        Each packet in ``rxs`` is checksum-validated and decrypted with
        ``p.vpp_tra_sa``; the result must go from ``self.tra_if.local_ip6`` to
        ``self.tra_if.remote_ip6``.  ``sa`` is not used by this method.

        Returns the list of decrypted packets in receive order.
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # reset per packet so a failure can never log the plaintext of
            # the previous iteration as if it belonged to this packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # dumping is best effort and must not mask the
                        # original failure re-raised below
                        self.logger.debug("unable to dump decrypted packet")
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """Exercise transport mode with IPv6 extension headers.

        First checks that encrypted packets sent to VPP are answered, then
        injects clear-text packets with one, two and three extension headers
        on ``self.pg2`` and verifies that the headers are still present after
        decrypting what VPP sends out of ``self.tra_if``.
        """
        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(
            p.scapy_tra_sa,
            self.tra_if,
            src=self.tra_if.remote_ip6,
            dst=self.tra_if.local_ip6,
            count=count,
        )
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, alors je ne regrette rien
        #

        # one extension before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it
            dc = IPv6(raw(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # reparse, see the note in the first loop of this method
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / IPv6ExtHdrDestOpt()
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # reparse, see the note in the first loop of this method
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1248
1249
class IpsecTra6Tests(IpsecTra6):
    """UT test methods for Transport v6"""

    # Both tests run the same IPv6 transport round trip inherited from
    # IpsecTra6 and differ only in the number of packets sent.

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        n_pkts = 1
        self.verify_tra_basic6(count=n_pkts)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        n_pkts = 257
        self.verify_tra_basic6(count=n_pkts)
1260
1261
class IpsecTra6ExtTests(IpsecTra6):
    # Runs the extension-header checks inherited from IpsecTra6 against the
    # IPv6 parameter set.
    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(v6_params)
1266
1267
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4"""

    # Adds nothing of its own: every test comes from the two base classes.
1272
1273
class IpsecTun4(object):
    """verify methods for Tunnel v4"""

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Assert the SPD, SA and node packet counters after a tunnel run.

        ``count`` is the expected number of inbound packets; ``n_frags`` is
        the expected number of outbound packets and falls back to ``count``
        when it is falsy.  SPD and SA counters are only checked when ``p``
        carries the corresponding objects; ``worker`` is handed to their
        ``get_stats``.
        """
        if not n_frags:
            n_frags = count
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SPD any policy: expected %d != %d" % (count, pkts),
            )

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
            pkts = p.tun_sa_out.get_stats(worker)["packets"]
            # report the value actually compared against (n_frags, not count)
            self.assertEqual(
                pkts,
                n_frags,
                "incorrect SA out counts: expected %d != %d" % (n_frags, pkts),
            )

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """Check that decrypted packets go from the remote host to pg1."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """Assert that ``esp_payload`` has exactly the expected length.

        The expected length is the decrypted packet plus two bytes, rounded
        up to the cipher block size (at least 4), plus the IV and the ICV of
        ``sa``.
        """
        align = max(sa.crypt_algo.block_size, 4)
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        # use the cipher's own ICV size when it has one, else the auth ICV
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt packets VPP sent into the tunnel and validate them.

        Every packet in ``rxs`` is checked for the NAT UDP port (when
        ``p.nat_header`` is set), valid checksums and a consistent IP length,
        then decrypted with ``p.vpp_tun_sa``.  ESP packets also get their
        padding checked against ``sa``.  The decrypted packets are finally
        reassembled and checksum-validated.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, p.nat_header.dport)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so a failure can never log the plaintext of
            # the previous iteration as if it belonged to this packet
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # dumping is best effort and must not mask the
                        # original failure re-raised below
                        self.logger.debug("unable to dump decrypted packet")
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Send ``count`` packets through the 4o4 tunnel in both directions.

        Encrypted packets go in on ``self.tun_if`` and must come out decrypted
        on ``self.pg1``; clear-text packets go in on ``self.pg1`` and ``n_rx``
        of them (``count`` when ``n_rx`` is falsy) must come out encrypted on
        ``self.tun_if`` with the tunnel endpoint addresses of ``p``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            # always record VPP's view of the run, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Check that traffic is dropped in both directions of the tunnel.

        Neither the encrypted packets sent into ``self.tun_if`` nor the
        clear-text packets sent into ``self.pg1`` may produce a reply.
        ``n_rx`` is not used by this method.
        """
        self.vapi.cli("clear errors")
        try:
            # NOTE(review): payload_size is not forwarded to this call,
            # unlike the clear-text batch below
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Check that a fragmented encrypted packet is reassembled.

        IPv4 reassembly is enabled on the tunnel interface for the duration
        of the check and is switched off again afterwards, whether or not the
        verification succeeds.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
        )

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    payload_size=1900,
                    count=1,
                )
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                # all fragments together must yield one decrypted packet
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, n_rx=1
                )
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
                )
                recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # undo the interface change even when a check above failed
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
            )

    def verify_tun_64(self, p, count=1):
        """Send ``count`` IPv6 packets through the IPv4 tunnel, both ways.

        Encrypted packets go in on ``self.tun_if`` and must come out on
        ``self.pg1`` as IPv6; clear-text IPv6 packets go in on ``self.pg1``
        and what comes out of ``self.tun_if`` must decrypt to the original
        addresses.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host6,
                dst=self.pg1.remote_ip6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure can never log the plaintext
                # of the previous iteration as if it belonged to this packet
                decrypt_pkt = None
                try:
                    # the outer header of a 6o4 tunnel packet is IPv4
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # dumping is best effort and must not mask the
                            # original failure re-raised below
                            self.logger.debug("unable to dump decrypted packet")
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Check how UDP port 4500 keepalive-style packets are counted.

        Three batches of 31 packets are sent to the tunnel interface and none
        may be answered: the 0xff payload must raise the ``nat_keepalive``
        error counter, the two 0xfe batches the ``too_short`` counter.
        """
        # Padding is not among this module's top-level scapy imports, so
        # bring it into scope here
        from scapy.packet import Padding

        # the sizeof Raw is calculated to pad to the minimum ethernet
        # frame size of 64 bytes
        # NOTE(review): 0 * 21 is the integer 0, not 21 zero bytes, so
        # presumably no padding is really appended (b"\x00" * 21 looks like
        # the intent).  Left unchanged because the counters asserted below
        # depend on the packets as currently built - TODO confirm
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xff")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
        )

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        # the counter is cumulative: 31 from the previous batch plus 31 here
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
1529
1530
class IpsecTun4Tests(IpsecTun4):
    """UT test methods for Tunnel v4"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)
        # take the tunnel interface down and up again, then repeat the check
        tun = self.tun_if
        tun.admin_down()
        tun.resolve_arp()
        tun.admin_up()
        self.verify_tun_44(v4_params, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=127)
1549
1550
class IpsecTun6(object):
    """verify methods for Tunnel v6

    Mixin with the send/verify helpers shared by the IPv6 tunnel tests.
    Everything reached through ``self`` has to be supplied by the class it
    is mixed into: the ``pg1`` and ``tun_if`` interfaces, ``vapi``,
    ``logger``, the ``gen_*`` packet generators, the ``send_and_*`` and
    ``assert_*`` helpers and the ``tun6_*`` node name attributes.
    """

    def verify_counters6(self, p_in, p_out, count, worker=None):
        """Assert that the SA and node packet counters all equal ``count``.

        :param p_in: inbound parameters; ``p_in.tun_sa_in`` statistics are
            checked only when that attribute exists.
        :param p_out: outbound parameters; ``p_out.tun_sa_out`` statistics
            are checked only when that attribute exists.
        :param count: expected number of packets.
        :param worker: handed through to ``get_stats()``.
        """
        if hasattr(p_in, "tun_sa_in"):
            pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
        if hasattr(p_out, "tun_sa_out"):
            pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SA out counts: expected %d != %d" % (count, pkts),
            )
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of the decrypted packets in ``rxs``.

        Each packet must come from ``p.remote_tun_if_host`` and be destined
        to ``pg1``'s remote IPv6 address.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Check the encrypted packets in ``rxs`` and the IPv6 packet inside.

        :param p: tunnel parameters; decryption uses ``p.vpp_tun_sa``.
        :param sa: not used by this method; kept for the callers.
        :param rxs: packets captured on the encrypted side.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
            if p.outer_flow_label:
                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
            # reset for every packet so that a failure can never log the
            # decrypted packet of a previous iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
        """Send plain IPv6 packets towards the tunnel and expect no replies."""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_pkts6(
            p_in,
            self.pg1,
            src=self.pg1.remote_ip6,
            dst=p_in.remote_tun_if_host,
            count=count,
            payload_size=payload_size,
        )
        # NOTE(review): the packets are generated for pg1 but injected on
        # tun_if, whereas verify_tun_66 injects the same kind of stream on
        # pg1 - confirm this is intended.
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
        """Send encrypted IPv6 packets on the tunnel and expect no replies.

        ``payload_size`` is forwarded to the packet generator, in the same
        way ``verify_drop_tun_tx_66`` and ``verify_tun_66`` forward it.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(
            p_in,
            p_in.scapy_tun_sa,
            self.tun_if,
            src=p_in.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=count,
            payload_size=payload_size,
        )
        self.send_and_assert_no_replies(self.tun_if, send_pkts)

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Run the tx drop check followed by the rx drop check."""
        self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
        self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Pass ``count`` IPv6 packets through the tunnel in both directions.

        :param p_in: parameters used for the decrypt direction.
        :param p_out: parameters used for the encrypt direction; ``p_in``
            is used when it is not given.
        :param count: number of packets per direction.
        :param payload_size: handed to the packet generators.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            # tunnel -> pg1: encrypted by scapy, decrypted by VPP
            send_pkts = self.gen_encrypt_pkts6(
                p_in,
                p_in.scapy_tun_sa,
                self.tun_if,
                src=p_in.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            # pg1 -> tunnel: plain packets in, encrypted packets out
            send_pkts = self.gen_pkts6(
                p_in,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p_out.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            # the outer header must carry the tunnel endpoints
            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Send one fragmented encrypted packet and expect it reassembled.

        IPv6 reassembly is enabled on the tunnel interface for the duration
        of the check and disabled again at the end.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
        )

        try:
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=1,
                payload_size=1850,
            )
            # fragment the single encrypted packet; exactly one packet is
            # expected on pg1 (n_rx=1)
            send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
            self.verify_decrypted6(p, recv_pkts)

            send_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host,
                count=1,
                payload_size=64,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, 1)
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
        )

    def verify_tun_46(self, p, count=1):
        """ipsec 4o6 tunnel basic test

        IPv4 packets are passed in both directions; the encrypted packets
        captured on the tunnel side are decrypted from their IPv6 layer.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host4,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host4,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure can never log
                # the decrypted packet of a previous iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)

    def verify_keepalive(self, p):
        """Send UDP port 4500 probes on the tunnel and check the error counters.

        Three bursts of 31 packets are injected; none may trigger a reply:

        * payload ``0xff``: ``nat_keepalive`` is expected to read 31
        * payload ``0xfe``: ``too_short`` is expected to read 31
        * payload ``0xfe`` plus Padding: ``too_short`` is expected to read
          62, as the counter is not cleared between the bursts
        """
        # the module header only imports raw and Raw from scapy.packet
        from scapy.packet import Padding

        def udp_4500_hdr():
            # fresh Ether / IPv6 / UDP headers for every probe
            return (
                Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
                / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
                / UDP(sport=333, dport=4500)
            )

        # the size of Raw is calculated to pad to the minimum ethernet
        # frame size of 64 bytes
        # NOTE(review): `0 * 1` and `0 * 21` below both evaluate to 0, so
        # Padding is constructed from the integer 0 rather than from that
        # many bytes - confirm the intended padding.
        pkt = udp_4500_hdr() / Raw(b"\xff") / Padding(0 * 1)
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun6_input_node, 31
        )

        pkt = udp_4500_hdr() / Raw(b"\xfe")
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)

        pkt = udp_4500_hdr() / Raw(b"\xfe") / Padding(0 * 21)
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
1787
1788
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(v6_params)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
1803
1804
class IpsecTun6HandoffTests(IpsecTun6):
    """UT test methods for Tunnel v6 with multiple workers"""

    # two workers, so that traffic can be injected on worker 0 and worker 1
    vpp_worker_count = 2

    def test_tun_handoff_66(self):
        """ipsec 6o6 tunnel worker hand-off test"""
        for reset_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(reset_cmd)

        burst = 15
        params6 = self.params[socket.AF_INET6]

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        worker_order = (0, 1, 0, 1)

        for wk in worker_order:
            # decrypt direction: tunnel interface -> pg1
            encrypted = self.gen_encrypt_pkts6(
                params6,
                params6.scapy_tun_sa,
                self.tun_if,
                src=params6.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=burst,
            )
            decrypted_rx = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=wk
            )
            self.verify_decrypted6(params6, decrypted_rx)

            # encrypt direction: pg1 -> tunnel interface
            plain = self.gen_pkts6(
                params6,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=params6.remote_tun_if_host,
                count=burst,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, plain, self.tun_if, worker=wk
            )
            self.verify_encrypted6(params6, params6.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        expected_total = len(worker_order) * burst
        self.verify_counters6(params6, params6, expected_total, worker=0)
1848
1849
class IpsecTun4HandoffTests(IpsecTun4):
    """UT test methods for Tunnel v4 with multiple workers"""

    # two workers, so that traffic can be injected on worker 0 and worker 1
    vpp_worker_count = 2

    def test_tun_handooff_44(self):
        """ipsec 4o4 tunnel worker hand-off test"""
        for reset_cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(reset_cmd)

        burst = 15
        params4 = self.params[socket.AF_INET]

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        worker_order = (0, 1, 0, 1)

        for wk in worker_order:
            # decrypt direction: tunnel interface -> pg1
            encrypted = self.gen_encrypt_pkts(
                params4,
                params4.scapy_tun_sa,
                self.tun_if,
                src=params4.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=burst,
            )
            decrypted_rx = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=wk
            )
            self.verify_decrypted(params4, decrypted_rx)

            # encrypt direction: pg1 -> tunnel interface
            plain = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=params4.remote_tun_if_host,
                count=burst,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, plain, self.tun_if, worker=wk
            )
            self.verify_encrypted(params4, params4.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        expected_total = len(worker_order) * burst
        self.verify_counters4(params4, expected_total, worker=0)
1892
1893
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """UT test methods for Tunnel v6 & v4"""

    # No members of its own: the class only combines the test methods
    # inherited from IpsecTun4Tests and IpsecTun6Tests.
1898
1899
class IPSecIPv4Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv4 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        """Defer to the parent class; nothing is added here."""
        super(IPSecIPv4Fwd, cls).setUpConstants()

    def setUp(self):
        """Start every test with empty SPD bookkeeping lists."""
        super(IPSecIPv4Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        """Remove the recorded SPD config and shut the pg interfaces down."""
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip4()
            pg.admin_down()
        super(IPSecIPv4Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create ``num_ifs`` pg interfaces, bring them up and configure IPv4."""
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv4 address on the interface
            pg.config_ip4()
            # resolve ARP, so that we know VPP MAC
            pg.resolve_arp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create SPD ``spd_id`` and bind every interface in ``pg_list`` to it.

        The SPD and the bindings are appended to ``self.spd_objs`` in
        creation order, which ``tearDown`` walks in reverse.
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Translate a policy name into the matching SPD action enum value.

        :param policy_type: "protect", "bypass" or "discard".
        :raises Exception: for any other value; the message names the
            rejected value.
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        if policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        if policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        # interpolate the value: handing it to Exception() as a second
        # argument would leave the "%s" placeholder unformatted
        raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0.0.0.0"),
        local_ip_stop=ip_address("255.255.255.255"),
        remote_ip_start=ip_address("0.0.0.0"),
        remote_ip_stop=ip_address("255.255.255.255"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add an SPD policy entry to VPP, or remove it again.

        The address ranges of the entry are selected as follows:

        * ``all_ips``: the whole IPv4 address space for source and
          destination
        * ``ip_range`` (and not ``all_ips``): the ``local_ip_*`` /
          ``remote_ip_*`` arguments
        * otherwise: exactly the remote IPv4 addresses of ``src_if`` and
          ``dst_if``

        :param policy_type: name understood by ``get_policy``.
        :param remove: only the literal ``False`` adds the entry; any other
            value removes it.
        :returns: the ``VppIpsecSpdEntry`` object built for the request.
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0.0.0.0")
            src_range_high = ip_address("255.255.255.255")
            dst_range_low = ip_address("0.0.0.0")
            dst_range_high = ip_address("255.255.255.255")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip4
            src_range_high = src_if.remote_ip4
            dst_range_low = dst_if.remote_ip4
            dst_range_high = dst_if.remote_ip4

        # NOTE(review): the third positional argument (0) is presumably
        # the SA id - confirm against VppIpsecSpdEntry
        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build ``pkt_count`` IPv4/UDP packets from ``src_if`` to ``dst_if``.

        A copy of every packet is stored in its packet info object.

        :returns: list of the created packets.
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Match every captured packet against the saved packet infos.

        Fails when a packet does not match the next saved packet for the
        ``src_if``/``dst_if`` pair, or when saved packets are left over
        after the whole capture has been walked.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that ``spdEntry`` reports ``pkt_count`` matched packets."""
        # read the stats once so the log lines and the check see one value
        stats = spdEntry.get_stats()
        self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
        matched_pkts = stats.get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2094
2095
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
    """Helpers for reading the SPD flow cache state from the VPP CLI."""

    @classmethod
    def setUpConstants(cls):
        super(SpdFlowCacheTemplate, cls).setUpConstants()
        # Override this method with required cmdline parameters e.g.
        # cls.vpp_cmdline.extend(["ipsec", "{",
        #                         "ipv4-outbound-spd-flow-cache on",
        #                         "}"])
        # cls.logger.info("VPP modified cmdline is %s" % " "
        #                 .join(cls.vpp_cmdline))

    def setUp(self):
        super(SpdFlowCacheTemplate, self).setUp()

    def tearDown(self):
        super(SpdFlowCacheTemplate, self).tearDown()

    def get_spd_flow_cache_entries(self, outbound):
        """Return the flow cache entry count printed by 'show ipsec spd'.

        'show ipsec spd' output:
        ipv4-inbound-spd-flow-cache-entries: 0
        ipv4-outbound-spd-flow-cache-entries: 0

        :param outbound: truthy to read the outbound counter, falsy for
            the inbound one.
        :returns: the number of entries as an int.
        :raises Exception: when the counter line is missing from the CLI
            output or its value is not an integer.
        """
        show_ipsec_reply = self.vapi.cli("show ipsec spd")
        # match the relevant section of 'show ipsec spd' output
        if outbound:
            pattern = "ipv4-outbound-spd-flow-cache-entries: (.*)"
        else:
            pattern = "ipv4-inbound-spd-flow-cache-entries: (.*)"
        # `search` is the name the module header imports from re. No
        # DOTALL flag is passed, so the group ends with the counter's own
        # line and lines printed after it cannot break the int()
        # conversion below.
        regex_match = search(pattern, show_ipsec_reply)
        if regex_match is None:
            raise Exception(
                "Unable to find spd flow cache entries "
                "in 'show ipsec spd' CLI output - regex failed to match"
            )
        try:
            num_entries = int(regex_match.group(1))
        except ValueError:
            raise Exception(
                "Unable to get spd flow cache entries "
                "from 'show ipsec spd' string: %s" % regex_match.group(0)
            )
        self.logger.info("%s", regex_match.group(0))
        return num_entries

    def verify_num_outbound_flow_cache_entries(self, expected_elements):
        """Assert the outbound flow cache holds ``expected_elements`` entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=True), expected_elements
        )

    def verify_num_inbound_flow_cache_entries(self, expected_elements):
        """Assert the inbound flow cache holds ``expected_elements`` entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=False), expected_elements
        )

    def crc32_supported(self):
        """Report whether the ``lscpu`` output lists a crc32 capable CPU flag.

        :returns: True when "crc32" or "sse4_2" occurs in the ``lscpu``
            output, False otherwise.
        """
        # lscpu is part of util-linux package, available on all Linux Distros
        # `popen` is the name the module header imports from os; the with
        # block closes the pipe once the output has been read
        with popen("lscpu") as stream:
            cpu_info = stream.read()
        # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
        # see vppinfra/crc32.h
        # each flag needs its own membership test: a bare non-empty string
        # on the left of `or` is always true
        if "crc32" in cpu_info or "sse4_2" in cpu_info:
            self.logger.info("\ncrc32 supported:\n" + cpu_info)
            return True
        else:
            self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
            return False
2169
2170
class IPSecIPv6Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv6 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        """Defer to the parent class; nothing is added here."""
        super(IPSecIPv6Fwd, cls).setUpConstants()

    def setUp(self):
        """Start every test with empty SPD bookkeeping lists."""
        super(IPSecIPv6Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        """Remove the recorded SPD config and shut the pg interfaces down."""
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()
        super(IPSecIPv6Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create ``num_ifs`` pg interfaces, bring them up and configure IPv6."""
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv6 address on the interface
            pg.config_ip6()
            pg.resolve_ndp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create SPD ``spd_id`` and bind every interface in ``pg_list`` to it.

        The SPD and the bindings are appended to ``self.spd_objs`` in
        creation order, which ``tearDown`` walks in reverse.
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Translate a policy name into the matching SPD action enum value.

        :param policy_type: "protect", "bypass" or "discard".
        :raises Exception: for any other value; the message names the
            rejected value.
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        if policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        if policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        # interpolate the value: handing it to Exception() as a second
        # argument would leave the "%s" placeholder unformatted
        raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0::0"),
        local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_ip_start=ip_address("0::0"),
        remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add an SPD policy entry to VPP, or remove it again.

        The address ranges of the entry are selected as follows:

        * ``all_ips``: the whole IPv6 address space for source and
          destination
        * ``ip_range`` (and not ``all_ips``): the ``local_ip_*`` /
          ``remote_ip_*`` arguments
        * otherwise: exactly the remote IPv6 addresses of ``src_if`` and
          ``dst_if``

        :param policy_type: name understood by ``get_policy``.
        :param remove: only the literal ``False`` adds the entry; any other
            value removes it.
        :returns: the ``VppIpsecSpdEntry`` object built for the request.
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0::0")
            src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
            dst_range_low = ip_address("0::0")
            dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip6
            src_range_high = src_if.remote_ip6
            dst_range_low = dst_if.remote_ip6
            dst_range_high = dst_if.remote_ip6

        # NOTE(review): the third positional argument (0) is presumably
        # the SA id - confirm against VppIpsecSpdEntry
        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build ``pkt_count`` IPv6/UDP packets from ``src_if`` to ``dst_if``.

        A copy of every packet is stored in its packet info object.

        :returns: list of the created packets.
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Match every captured packet against the saved packet infos.

        Fails when a packet does not match the next saved packet for the
        ``src_if``/``dst_if`` pair, or when saved packets are left over
        after the whole capture has been walked.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that ``spdEntry`` reports ``pkt_count`` matched packets."""
        # read the stats once so the log lines and the check see one value
        stats = spdEntry.get_stats()
        self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
        matched_pkts = stats.get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2364
2365
if __name__ == "__main__":
    # executed as a script: run the tests with VppTestRunner as the runner
    unittest.main(testRunner=VppTestRunner)