tests: add fast path ipv6 python tests for outbound policy matching
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
28 class IPsecIPv4Params:
29
30     addr_type = socket.AF_INET
31     addr_any = "0.0.0.0"
32     addr_bcast = "255.255.255.255"
33     addr_len = 32
34     is_ipv6 = 0
35
36     def __init__(self):
37         self.remote_tun_if_host = "1.1.1.1"
38         self.remote_tun_if_host6 = "1111::1"
39
40         self.scapy_tun_sa_id = 100
41         self.scapy_tun_spi = 1000
42         self.vpp_tun_sa_id = 200
43         self.vpp_tun_spi = 2000
44
45         self.scapy_tra_sa_id = 300
46         self.scapy_tra_spi = 3000
47         self.vpp_tra_sa_id = 400
48         self.vpp_tra_spi = 4000
49
50         self.outer_hop_limit = 64
51         self.inner_hop_limit = 255
52         self.outer_flow_label = 0
53         self.inner_flow_label = 0x12345
54
55         self.auth_algo_vpp_id = (
56             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
57         )
58         self.auth_algo = "HMAC-SHA1-96"  # scapy name
59         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
60
61         self.crypt_algo_vpp_id = (
62             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
63         )
64         self.crypt_algo = "AES-CBC"  # scapy name
65         self.crypt_key = b"JPjyOWBeVEQiMe7h"
66         self.salt = 0
67         self.flags = 0
68         self.nat_header = None
69         self.tun_flags = (
70             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
71         )
72         self.dscp = 0
73         self.async_mode = False
74
75
76 class IPsecIPv6Params:
77
78     addr_type = socket.AF_INET6
79     addr_any = "0::0"
80     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
81     addr_len = 128
82     is_ipv6 = 1
83
84     def __init__(self):
85         self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
86         self.remote_tun_if_host4 = "1.1.1.1"
87
88         self.scapy_tun_sa_id = 500
89         self.scapy_tun_spi = 3001
90         self.vpp_tun_sa_id = 600
91         self.vpp_tun_spi = 3000
92
93         self.scapy_tra_sa_id = 700
94         self.scapy_tra_spi = 4001
95         self.vpp_tra_sa_id = 800
96         self.vpp_tra_spi = 4000
97
98         self.outer_hop_limit = 64
99         self.inner_hop_limit = 255
100         self.outer_flow_label = 0
101         self.inner_flow_label = 0x12345
102
103         self.auth_algo_vpp_id = (
104             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
105         )
106         self.auth_algo = "HMAC-SHA1-96"  # scapy name
107         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
108
109         self.crypt_algo_vpp_id = (
110             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
111         )
112         self.crypt_algo = "AES-CBC"  # scapy name
113         self.crypt_key = b"JPjyOWBeVEQiMe7h"
114         self.salt = 0
115         self.flags = 0
116         self.nat_header = None
117         self.tun_flags = (
118             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
119         )
120         self.dscp = 0
121         self.async_mode = False
122
123
124 def mk_scapy_crypt_key(p):
125     if p.crypt_algo in ("AES-GCM", "AES-CTR"):
126         return p.crypt_key + struct.pack("!I", p.salt)
127     else:
128         return p.crypt_key
129
130
131 def config_tun_params(p, encryption_type, tun_if):
132     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
133     esn_en = bool(
134         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
135     )
136     p.tun_dst = tun_if.remote_addr[p.addr_type]
137     p.tun_src = tun_if.local_addr[p.addr_type]
138     crypt_key = mk_scapy_crypt_key(p)
139     p.scapy_tun_sa = SecurityAssociation(
140         encryption_type,
141         spi=p.vpp_tun_spi,
142         crypt_algo=p.crypt_algo,
143         crypt_key=crypt_key,
144         auth_algo=p.auth_algo,
145         auth_key=p.auth_key,
146         tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
147         nat_t_header=p.nat_header,
148         esn_en=esn_en,
149     )
150     p.vpp_tun_sa = SecurityAssociation(
151         encryption_type,
152         spi=p.scapy_tun_spi,
153         crypt_algo=p.crypt_algo,
154         crypt_key=crypt_key,
155         auth_algo=p.auth_algo,
156         auth_key=p.auth_key,
157         tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
158         nat_t_header=p.nat_header,
159         esn_en=esn_en,
160     )
161
162
163 def config_tra_params(p, encryption_type):
164     esn_en = bool(
165         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
166     )
167     crypt_key = mk_scapy_crypt_key(p)
168     p.scapy_tra_sa = SecurityAssociation(
169         encryption_type,
170         spi=p.vpp_tra_spi,
171         crypt_algo=p.crypt_algo,
172         crypt_key=crypt_key,
173         auth_algo=p.auth_algo,
174         auth_key=p.auth_key,
175         nat_t_header=p.nat_header,
176         esn_en=esn_en,
177     )
178     p.vpp_tra_sa = SecurityAssociation(
179         encryption_type,
180         spi=p.scapy_tra_spi,
181         crypt_algo=p.crypt_algo,
182         crypt_key=crypt_key,
183         auth_algo=p.auth_algo,
184         auth_key=p.auth_key,
185         nat_t_header=p.nat_header,
186         esn_en=esn_en,
187     )
188
189
190 class TemplateIpsec(VppTestCase):
191     """
192     TRANSPORT MODE::
193
194          ------   encrypt   ---
195         |tra_if| <-------> |VPP|
196          ------   decrypt   ---
197
198     TUNNEL MODE::
199
200          ------   encrypt   ---   plain   ---
201         |tun_if| <-------  |VPP| <------ |pg1|
202          ------             ---           ---
203
204          ------   decrypt   ---   plain   ---
205         |tun_if| ------->  |VPP| ------> |pg1|
206          ------             ---           ---
207     """
208
209     tun_spd_id = 1
210     tra_spd_id = 2
211
212     def ipsec_select_backend(self):
213         """empty method to be overloaded when necessary"""
214         pass
215
216     @classmethod
217     def setUpClass(cls):
218         super(TemplateIpsec, cls).setUpClass()
219
220     @classmethod
221     def tearDownClass(cls):
222         super(TemplateIpsec, cls).tearDownClass()
223
224     def setup_params(self):
225         if not hasattr(self, "ipv4_params"):
226             self.ipv4_params = IPsecIPv4Params()
227         if not hasattr(self, "ipv6_params"):
228             self.ipv6_params = IPsecIPv6Params()
229         self.params = {
230             self.ipv4_params.addr_type: self.ipv4_params,
231             self.ipv6_params.addr_type: self.ipv6_params,
232         }
233
234     def config_interfaces(self):
235         self.create_pg_interfaces(range(3))
236         self.interfaces = list(self.pg_interfaces)
237         for i in self.interfaces:
238             i.admin_up()
239             i.config_ip4()
240             i.resolve_arp()
241             i.config_ip6()
242             i.resolve_ndp()
243
244     def setUp(self):
245         super(TemplateIpsec, self).setUp()
246
247         self.setup_params()
248
249         self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
250         self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
251
252         self.config_interfaces()
253
254         self.ipsec_select_backend()
255
256     def unconfig_interfaces(self):
257         for i in self.interfaces:
258             i.admin_down()
259             i.unconfig_ip4()
260             i.unconfig_ip6()
261
262     def tearDown(self):
263         super(TemplateIpsec, self).tearDown()
264
265         self.unconfig_interfaces()
266
267     def show_commands_at_teardown(self):
268         self.logger.info(self.vapi.cli("show hardware"))
269
270     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
271         return [
272             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
273             / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
274             for i in range(count)
275         ]
276
277     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
278         return [
279             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
280             / sa.encrypt(
281                 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
282                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
283             )
284             for i in range(count)
285         ]
286
287     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
288         return [
289             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
290             / IP(src=src, dst=dst)
291             / ICMP()
292             / Raw(b"X" * payload_size)
293             for i in range(count)
294         ]
295
296     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
297         return [
298             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
299             / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
300             / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
301             for i in range(count)
302         ]
303
304
305 class IpsecTcp(object):
306     def verify_tcp_checksum(self):
307         # start http cli server listener on http://0.0.0.0:80
308         self.vapi.cli("http cli server")
309         p = self.params[socket.AF_INET]
310         send = Ether(
311             src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
312         ) / p.scapy_tun_sa.encrypt(
313             IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
314             / TCP(flags="S", dport=80)
315         )
316         self.logger.debug(ppp("Sending packet:", send))
317         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
318         recv = recv[0]
319         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
320         self.assert_packet_checksums_valid(decrypted)
321
322
323 class IpsecTcpTests(IpsecTcp):
324     def test_tcp_checksum(self):
325         """verify checksum correctness for vpp generated packets"""
326         self.verify_tcp_checksum()
327
328
329 class IpsecTra4(object):
330     """verify methods for Transport v4"""
331
332     def get_replay_counts(self, p):
333         replay_node_name = "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[0]
334         count = self.statistics.get_err_counter(replay_node_name)
335
336         if p.async_mode:
337             replay_post_node_name = (
338                 "/err/%s/SA replayed packet" % self.tra4_decrypt_node_name[p.async_mode]
339             )
340             count += self.statistics.get_err_counter(replay_post_node_name)
341
342         return count
343
344     def get_hash_failed_counts(self, p):
345         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
346             hash_failed_node_name = (
347                 "/err/%s/ESP decryption failed"
348                 % self.tra4_decrypt_node_name[p.async_mode]
349             )
350         else:
351             hash_failed_node_name = (
352                 "/err/%s/Integrity check failed"
353                 % self.tra4_decrypt_node_name[p.async_mode]
354             )
355         count = self.statistics.get_err_counter(hash_failed_node_name)
356
357         if p.async_mode:
358             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
359
360         return count
361
362     def verify_hi_seq_num(self):
363         p = self.params[socket.AF_INET]
364         saf = VppEnum.vl_api_ipsec_sad_flags_t
365         esn_on = p.vpp_tra_sa.esn_en
366         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
367
368         seq_cycle_node_name = (
369             "/err/%s/sequence number cycled (packet dropped)"
370             % self.tra4_encrypt_node_name
371         )
372         replay_count = self.get_replay_counts(p)
373         hash_failed_count = self.get_hash_failed_counts(p)
374         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
375
376         # a few packets so we get the rx seq number above the window size and
377         # thus can simulate a wrap with an out of window packet
378         pkts = [
379             (
380                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
381                 / p.scapy_tra_sa.encrypt(
382                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
383                     seq_num=seq,
384                 )
385             )
386             for seq in range(63, 80)
387         ]
388         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
389
390         # these 4 packets will all choose seq-num 0 to decrpyt since none
391         # are out of window when first checked. however, once #200 has
392         # decrypted it will move the window to 200 and has #81 is out of
393         # window. this packet should be dropped.
394         pkts = [
395             (
396                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
397                 / p.scapy_tra_sa.encrypt(
398                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
399                     seq_num=200,
400                 )
401             ),
402             (
403                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
404                 / p.scapy_tra_sa.encrypt(
405                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
406                     seq_num=81,
407                 )
408             ),
409             (
410                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
411                 / p.scapy_tra_sa.encrypt(
412                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
413                     seq_num=201,
414                 )
415             ),
416             (
417                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
418                 / p.scapy_tra_sa.encrypt(
419                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
420                     seq_num=202,
421                 )
422             ),
423         ]
424
425         # if anti-replay is off then we won't drop #81
426         n_rx = 3 if ar_on else 4
427         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
428         # this packet is one before the wrap
429         pkts = [
430             (
431                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
432                 / p.scapy_tra_sa.encrypt(
433                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
434                     seq_num=203,
435                 )
436             )
437         ]
438         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
439
440         # move the window over half way to a wrap
441         pkts = [
442             (
443                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
444                 / p.scapy_tra_sa.encrypt(
445                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
446                     seq_num=0x80000001,
447                 )
448             )
449         ]
450         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
451
452         # anti-replay will drop old packets, no anti-replay will not
453         pkts = [
454             (
455                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
456                 / p.scapy_tra_sa.encrypt(
457                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
458                     seq_num=0x44000001,
459                 )
460             )
461         ]
462
463         if ar_on:
464             self.send_and_assert_no_replies(self.tra_if, pkts)
465         else:
466             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
467
468         if esn_on:
469             #
470             # validate wrapping the ESN
471             #
472
473             # wrap scapy's TX SA SN
474             p.scapy_tra_sa.seq_num = 0x100000005
475
476             # send a packet that wraps the window for both AR and no AR
477             pkts = [
478                 (
479                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
480                     / p.scapy_tra_sa.encrypt(
481                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
482                         / ICMP(),
483                         seq_num=0x100000005,
484                     )
485                 )
486             ]
487
488             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
489             for rx in rxs:
490                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
491
492             # move the window forward to half way to the next wrap
493             pkts = [
494                 (
495                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
496                     / p.scapy_tra_sa.encrypt(
497                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
498                         / ICMP(),
499                         seq_num=0x180000005,
500                     )
501                 )
502             ]
503
504             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
505
506             # a packet less than 2^30 from the current position is:
507             #  - AR: out of window and dropped
508             #  - non-AR: accepted
509             pkts = [
510                 (
511                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
512                     / p.scapy_tra_sa.encrypt(
513                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
514                         / ICMP(),
515                         seq_num=0x170000005,
516                     )
517                 )
518             ]
519
520             if ar_on:
521                 self.send_and_assert_no_replies(self.tra_if, pkts)
522             else:
523                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
524
525             # a packet more than 2^30 from the current position is:
526             #  - AR: out of window and dropped
527             #  - non-AR: considered a wrap, but since it's not a wrap
528             #    it won't decrpyt and so will be dropped
529             pkts = [
530                 (
531                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
532                     / p.scapy_tra_sa.encrypt(
533                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
534                         / ICMP(),
535                         seq_num=0x130000005,
536                     )
537                 )
538             ]
539
540             self.send_and_assert_no_replies(self.tra_if, pkts)
541
542             # a packet less than 2^30 from the current position and is a
543             # wrap; (the seq is currently at 0x180000005).
544             #  - AR: out of window so considered a wrap, so accepted
545             #  - non-AR: not considered a wrap, so won't decrypt
546             p.scapy_tra_sa.seq_num = 0x260000005
547             pkts = [
548                 (
549                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
550                     / p.scapy_tra_sa.encrypt(
551                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
552                         / ICMP(),
553                         seq_num=0x260000005,
554                     )
555                 )
556             ]
557             if ar_on:
558                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
559             else:
560                 self.send_and_assert_no_replies(self.tra_if, pkts)
561
562             #
563             # window positions are different now for AR/non-AR
564             #  move non-AR forward
565             #
566             if not ar_on:
567                 # a packet more than 2^30 from the current position and is a
568                 # wrap; (the seq is currently at 0x180000005).
569                 #  - AR: accepted
570                 #  - non-AR: not considered a wrap, so won't decrypt
571
572                 pkts = [
573                     (
574                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
575                         / p.scapy_tra_sa.encrypt(
576                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
577                             / ICMP(),
578                             seq_num=0x200000005,
579                         )
580                     ),
581                     (
582                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
583                         / p.scapy_tra_sa.encrypt(
584                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
585                             / ICMP(),
586                             seq_num=0x200000006,
587                         )
588                     ),
589                 ]
590                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
591
592                 pkts = [
593                     (
594                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
595                         / p.scapy_tra_sa.encrypt(
596                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
597                             / ICMP(),
598                             seq_num=0x260000005,
599                         )
600                     )
601                 ]
602                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
603
604     def verify_tra_anti_replay(self):
605         p = self.params[socket.AF_INET]
606         esn_en = p.vpp_tra_sa.esn_en
607
608         seq_cycle_node_name = (
609             "/err/%s/sequence number cycled (packet dropped)"
610             % self.tra4_encrypt_node_name
611         )
612         replay_count = self.get_replay_counts(p)
613         hash_failed_count = self.get_hash_failed_counts(p)
614         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
615
616         if ESP == self.encryption_type:
617             undersize_node_name = (
618                 "/err/%s/undersized packet" % self.tra4_decrypt_node_name[0]
619             )
620             undersize_count = self.statistics.get_err_counter(undersize_node_name)
621
622         #
623         # send packets with seq numbers 1->34
624         # this means the window size is still in Case B (see RFC4303
625         # Appendix A)
626         #
627         # for reasons i haven't investigated Scapy won't create a packet with
628         # seq_num=0
629         #
630         pkts = [
631             (
632                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
633                 / p.scapy_tra_sa.encrypt(
634                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
635                     seq_num=seq,
636                 )
637             )
638             for seq in range(1, 34)
639         ]
640         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
641
642         # replayed packets are dropped
643         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
644         replay_count += len(pkts)
645         self.assertEqual(self.get_replay_counts(p), replay_count)
646
647         #
648         # now send a batch of packets all with the same sequence number
649         # the first packet in the batch is legitimate, the rest bogus
650         #
651         self.vapi.cli("clear error")
652         self.vapi.cli("clear node counters")
653         pkts = Ether(
654             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
655         ) / p.scapy_tra_sa.encrypt(
656             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
657             seq_num=35,
658         )
659         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
660         replay_count += 7
661         self.assertEqual(self.get_replay_counts(p), replay_count)
662
663         #
664         # now move the window over to 257 (more than one byte) and into Case A
665         #
666         self.vapi.cli("clear error")
667         pkt = Ether(
668             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
669         ) / p.scapy_tra_sa.encrypt(
670             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
671             seq_num=257,
672         )
673         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
674
675         # replayed packets are dropped
676         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
677         replay_count += 3
678         self.assertEqual(self.get_replay_counts(p), replay_count)
679
680         # the window size is 64 packets
681         # in window are still accepted
682         pkt = Ether(
683             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
684         ) / p.scapy_tra_sa.encrypt(
685             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
686             seq_num=200,
687         )
688         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
689
690         # a packet that does not decrypt does not move the window forward
691         bogus_sa = SecurityAssociation(
692             self.encryption_type,
693             p.vpp_tra_spi,
694             crypt_algo=p.crypt_algo,
695             crypt_key=mk_scapy_crypt_key(p)[::-1],
696             auth_algo=p.auth_algo,
697             auth_key=p.auth_key[::-1],
698         )
699         pkt = Ether(
700             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
701         ) / bogus_sa.encrypt(
702             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
703             seq_num=350,
704         )
705         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
706
707         hash_failed_count += 17
708         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
709
710         # a malformed 'runt' packet
711         #  created by a mis-constructed SA
712         if ESP == self.encryption_type and p.crypt_algo != "NULL":
713             bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi)
714             pkt = Ether(
715                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
716             ) / bogus_sa.encrypt(
717                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
718                 seq_num=350,
719             )
720             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
721
722             undersize_count += 17
723             self.assert_error_counter_equal(undersize_node_name, undersize_count)
724
725         # which we can determine since this packet is still in the window
726         pkt = Ether(
727             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
728         ) / p.scapy_tra_sa.encrypt(
729             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
730             seq_num=234,
731         )
732         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
733
734         #
735         # out of window are dropped
736         #  this is Case B. So VPP will consider this to be a high seq num wrap
737         #  and so the decrypt attempt will fail
738         #
739         pkt = Ether(
740             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
741         ) / p.scapy_tra_sa.encrypt(
742             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
743             seq_num=17,
744         )
745         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
746
747         if esn_en:
748             # an out of window error with ESN looks like a high sequence
749             # wrap. but since it isn't then the verify will fail.
750             hash_failed_count += 17
751             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
752
753         else:
754             replay_count += 17
755             self.assertEqual(self.get_replay_counts(p), replay_count)
756
757         # valid packet moves the window over to 258
758         pkt = Ether(
759             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
760         ) / p.scapy_tra_sa.encrypt(
761             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
762             seq_num=258,
763         )
764         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
765         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
766
767         #
768         # move VPP's SA TX seq-num to just before the seq-number wrap.
769         # then fire in a packet that VPP should drop on TX because it
770         # causes the TX seq number to wrap; unless we're using extened sequence
771         # numbers.
772         #
773         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
774         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
775         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
776
777         pkts = [
778             (
779                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
780                 / p.scapy_tra_sa.encrypt(
781                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
782                     seq_num=seq,
783                 )
784             )
785             for seq in range(259, 280)
786         ]
787
788         if esn_en:
789             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
790
791             #
792             # in order for scapy to decrypt its SA's high order number needs
793             # to wrap
794             #
795             p.vpp_tra_sa.seq_num = 0x100000000
796             for rx in rxs:
797                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
798
799             #
800             # wrap scapy's TX high sequence number. VPP is in case B, so it
801             # will consider this a high seq wrap also.
802             # The low seq num we set it to will place VPP's RX window in Case A
803             #
804             p.scapy_tra_sa.seq_num = 0x100000005
805             pkt = Ether(
806                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
807             ) / p.scapy_tra_sa.encrypt(
808                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
809                 seq_num=0x100000005,
810             )
811             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
812
813             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
814
815             #
816             # A packet that has seq num between (2^32-64) and 5 is within
817             # the window
818             #
819             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
820             pkt = Ether(
821                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
822             ) / p.scapy_tra_sa.encrypt(
823                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
824                 seq_num=0xFFFFFFFD,
825             )
826             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
827             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
828
829             #
830             # While in case A we cannot wrap the high sequence number again
831             # because VPP will consider this packet to be one that moves the
832             # window forward
833             #
834             pkt = Ether(
835                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
836             ) / p.scapy_tra_sa.encrypt(
837                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
838                 seq_num=0x200000999,
839             )
840             self.send_and_assert_no_replies(
841                 self.tra_if, [pkt], self.tra_if, timeout=0.2
842             )
843
844             hash_failed_count += 1
845             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
846
847             #
848             # but if we move the window forward to case B, then we can wrap
849             # again
850             #
851             p.scapy_tra_sa.seq_num = 0x100000555
852             pkt = Ether(
853                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
854             ) / p.scapy_tra_sa.encrypt(
855                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
856                 seq_num=0x100000555,
857             )
858             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
859             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
860
861             p.scapy_tra_sa.seq_num = 0x200000444
862             pkt = Ether(
863                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
864             ) / p.scapy_tra_sa.encrypt(
865                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
866                 seq_num=0x200000444,
867             )
868             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
869             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
870
871         else:
872             #
873             # without ESN TX sequence numbers can't wrap and packets are
874             # dropped from here on out.
875             #
876             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
877             seq_cycle_count += len(pkts)
878             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
879
880         # move the security-associations seq number on to the last we used
881         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
882         p.scapy_tra_sa.seq_num = 351
883         p.vpp_tra_sa.seq_num = 351
884
885     def verify_tra_lost(self):
886         p = self.params[socket.AF_INET]
887         esn_en = p.vpp_tra_sa.esn_en
888
889         #
890         # send packets with seq numbers 1->34
891         # this means the window size is still in Case B (see RFC4303
892         # Appendix A)
893         #
894         # for reasons i haven't investigated Scapy won't create a packet with
895         # seq_num=0
896         #
897         pkts = [
898             (
899                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
900                 / p.scapy_tra_sa.encrypt(
901                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
902                     seq_num=seq,
903                 )
904             )
905             for seq in range(1, 3)
906         ]
907         self.send_and_expect(self.tra_if, pkts, self.tra_if)
908
909         self.assertEqual(p.tra_sa_out.get_lost(), 0)
910
911         # skip a sequence number
912         pkts = [
913             (
914                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
915                 / p.scapy_tra_sa.encrypt(
916                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
917                     seq_num=seq,
918                 )
919             )
920             for seq in range(4, 6)
921         ]
922         self.send_and_expect(self.tra_if, pkts, self.tra_if)
923
924         self.assertEqual(p.tra_sa_out.get_lost(), 0)
925
926         # the lost packet are counted untill we get up past the first
927         # sizeof(replay_window) packets
928         pkts = [
929             (
930                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
931                 / p.scapy_tra_sa.encrypt(
932                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
933                     seq_num=seq,
934                 )
935             )
936             for seq in range(6, 100)
937         ]
938         self.send_and_expect(self.tra_if, pkts, self.tra_if)
939
940         self.assertEqual(p.tra_sa_out.get_lost(), 1)
941
942         # lost of holes in the sequence
943         pkts = [
944             (
945                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
946                 / p.scapy_tra_sa.encrypt(
947                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
948                     seq_num=seq,
949                 )
950             )
951             for seq in range(100, 200, 2)
952         ]
953         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
954
955         pkts = [
956             (
957                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
958                 / p.scapy_tra_sa.encrypt(
959                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
960                     seq_num=seq,
961                 )
962             )
963             for seq in range(200, 300)
964         ]
965         self.send_and_expect(self.tra_if, pkts, self.tra_if)
966
967         self.assertEqual(p.tra_sa_out.get_lost(), 51)
968
969         # a big hole in the seq number space
970         pkts = [
971             (
972                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
973                 / p.scapy_tra_sa.encrypt(
974                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
975                     seq_num=seq,
976                 )
977             )
978             for seq in range(400, 500)
979         ]
980         self.send_and_expect(self.tra_if, pkts, self.tra_if)
981
982         self.assertEqual(p.tra_sa_out.get_lost(), 151)
983
984     def verify_tra_basic4(self, count=1, payload_size=54):
985         """ipsec v4 transport basic test"""
986         self.vapi.cli("clear errors")
987         self.vapi.cli("clear ipsec sa")
988         try:
989             p = self.params[socket.AF_INET]
990             send_pkts = self.gen_encrypt_pkts(
991                 p,
992                 p.scapy_tra_sa,
993                 self.tra_if,
994                 src=self.tra_if.remote_ip4,
995                 dst=self.tra_if.local_ip4,
996                 count=count,
997                 payload_size=payload_size,
998             )
999             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1000             for rx in recv_pkts:
1001                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1002                 self.assert_packet_checksums_valid(rx)
1003                 try:
1004                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1005                     self.assert_packet_checksums_valid(decrypted)
1006                 except:
1007                     self.logger.debug(ppp("Unexpected packet:", rx))
1008                     raise
1009         finally:
1010             self.logger.info(self.vapi.ppcli("show error"))
1011             self.logger.info(self.vapi.ppcli("show ipsec all"))
1012
1013         pkts = p.tra_sa_in.get_stats()["packets"]
1014         self.assertEqual(
1015             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1016         )
1017         pkts = p.tra_sa_out.get_stats()["packets"]
1018         self.assertEqual(
1019             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1020         )
1021         self.assertEqual(p.tra_sa_out.get_lost(), 0)
1022         self.assertEqual(p.tra_sa_in.get_lost(), 0)
1023
1024         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1025         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1026
1027
1028 class IpsecTra4Tests(IpsecTra4):
1029     """UT test methods for Transport v4"""
1030
1031     def test_tra_anti_replay(self):
1032         """ipsec v4 transport anti-replay test"""
1033         self.verify_tra_anti_replay()
1034
1035     def test_tra_lost(self):
1036         """ipsec v4 transport lost packet test"""
1037         self.verify_tra_lost()
1038
1039     def test_tra_basic(self, count=1):
1040         """ipsec v4 transport basic test"""
1041         self.verify_tra_basic4(count=1)
1042
1043     def test_tra_burst(self):
1044         """ipsec v4 transport burst test"""
1045         self.verify_tra_basic4(count=257)
1046
1047
1048 class IpsecTra6(object):
1049     """verify methods for Transport v6"""
1050
1051     def verify_tra_basic6(self, count=1, payload_size=54):
1052         self.vapi.cli("clear errors")
1053         self.vapi.cli("clear ipsec sa")
1054         try:
1055             p = self.params[socket.AF_INET6]
1056             send_pkts = self.gen_encrypt_pkts6(
1057                 p,
1058                 p.scapy_tra_sa,
1059                 self.tra_if,
1060                 src=self.tra_if.remote_ip6,
1061                 dst=self.tra_if.local_ip6,
1062                 count=count,
1063                 payload_size=payload_size,
1064             )
1065             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1066             for rx in recv_pkts:
1067                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1068                 try:
1069                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
1070                     self.assert_packet_checksums_valid(decrypted)
1071                 except:
1072                     self.logger.debug(ppp("Unexpected packet:", rx))
1073                     raise
1074         finally:
1075             self.logger.info(self.vapi.ppcli("show error"))
1076             self.logger.info(self.vapi.ppcli("show ipsec all"))
1077
1078         pkts = p.tra_sa_in.get_stats()["packets"]
1079         self.assertEqual(
1080             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1081         )
1082         pkts = p.tra_sa_out.get_stats()["packets"]
1083         self.assertEqual(
1084             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1085         )
1086         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
1087         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
1088
1089     def gen_encrypt_pkts_ext_hdrs6(
1090         self, sa, sw_intf, src, dst, count=1, payload_size=54
1091     ):
1092         return [
1093             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1094             / sa.encrypt(
1095                 IPv6(src=src, dst=dst)
1096                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
1097             )
1098             for i in range(count)
1099         ]
1100
1101     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
1102         return [
1103             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1104             / IPv6(src=src, dst=dst)
1105             / IPv6ExtHdrHopByHop()
1106             / IPv6ExtHdrFragment(id=2, offset=200)
1107             / Raw(b"\xff" * 200)
1108             for i in range(count)
1109         ]
1110
1111     def verify_tra_encrypted6(self, p, sa, rxs):
1112         decrypted = []
1113         for rx in rxs:
1114             self.assert_packet_checksums_valid(rx)
1115             try:
1116                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
1117                 decrypted.append(decrypt_pkt)
1118                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
1119                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
1120             except:
1121                 self.logger.debug(ppp("Unexpected packet:", rx))
1122                 try:
1123                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1124                 except:
1125                     pass
1126                 raise
1127         return decrypted
1128
1129     def verify_tra_66_ext_hdrs(self, p):
1130         count = 63
1131
1132         #
1133         # check we can decrypt with options
1134         #
1135         tx = self.gen_encrypt_pkts_ext_hdrs6(
1136             p.scapy_tra_sa,
1137             self.tra_if,
1138             src=self.tra_if.remote_ip6,
1139             dst=self.tra_if.local_ip6,
1140             count=count,
1141         )
1142         self.send_and_expect(self.tra_if, tx, self.tra_if)
1143
1144         #
1145         # injecting a packet from ourselves to be routed of box is a hack
1146         # but it matches an outbout policy, alors je ne regrette rien
1147         #
1148
1149         # one extension before ESP
1150         tx = (
1151             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1152             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1153             / IPv6ExtHdrFragment(id=2, offset=200)
1154             / Raw(b"\xff" * 200)
1155         )
1156
1157         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1158         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1159
1160         for dc in dcs:
1161             # for reasons i'm not going to investigate scapy does not
1162             # created the correct headers after decrypt. but reparsing
1163             # the ipv6 packet fixes it
1164             dc = IPv6(raw(dc[IPv6]))
1165             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1166
1167         # two extensions before ESP
1168         tx = (
1169             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1170             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1171             / IPv6ExtHdrHopByHop()
1172             / IPv6ExtHdrFragment(id=2, offset=200)
1173             / Raw(b"\xff" * 200)
1174         )
1175
1176         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1177         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1178
1179         for dc in dcs:
1180             dc = IPv6(raw(dc[IPv6]))
1181             self.assertTrue(dc[IPv6ExtHdrHopByHop])
1182             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1183
1184         # two extensions before ESP, one after
1185         tx = (
1186             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1187             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1188             / IPv6ExtHdrHopByHop()
1189             / IPv6ExtHdrFragment(id=2, offset=200)
1190             / IPv6ExtHdrDestOpt()
1191             / Raw(b"\xff" * 200)
1192         )
1193
1194         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1195         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1196
1197         for dc in dcs:
1198             dc = IPv6(raw(dc[IPv6]))
1199             self.assertTrue(dc[IPv6ExtHdrDestOpt])
1200             self.assertTrue(dc[IPv6ExtHdrHopByHop])
1201             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1202
1203
1204 class IpsecTra6Tests(IpsecTra6):
1205     """UT test methods for Transport v6"""
1206
1207     def test_tra_basic6(self):
1208         """ipsec v6 transport basic test"""
1209         self.verify_tra_basic6(count=1)
1210
1211     def test_tra_burst6(self):
1212         """ipsec v6 transport burst test"""
1213         self.verify_tra_basic6(count=257)
1214
1215
1216 class IpsecTra6ExtTests(IpsecTra6):
1217     def test_tra_ext_hdrs_66(self):
1218         """ipsec 6o6 tra extension headers test"""
1219         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
1220
1221
1222 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
1223     """UT test methods for Transport v6 and v4"""
1224
1225     pass
1226
1227
1228 class IpsecTun4(object):
1229     """verify methods for Tunnel v4"""
1230
1231     def verify_counters4(self, p, count, n_frags=None, worker=None):
1232         if not n_frags:
1233             n_frags = count
1234         if hasattr(p, "spd_policy_in_any"):
1235             pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
1236             self.assertEqual(
1237                 pkts,
1238                 count,
1239                 "incorrect SPD any policy: expected %d != %d" % (count, pkts),
1240             )
1241
1242         if hasattr(p, "tun_sa_in"):
1243             pkts = p.tun_sa_in.get_stats(worker)["packets"]
1244             self.assertEqual(
1245                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1246             )
1247             pkts = p.tun_sa_out.get_stats(worker)["packets"]
1248             self.assertEqual(
1249                 pkts,
1250                 n_frags,
1251                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1252             )
1253
1254         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
1255         self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
1256
1257     def verify_decrypted(self, p, rxs):
1258         for rx in rxs:
1259             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1260             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1261             self.assert_packet_checksums_valid(rx)
1262
1263     def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
1264         align = sa.crypt_algo.block_size
1265         if align < 4:
1266             align = 4
1267         exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
1268         exp_len += sa.crypt_algo.iv_size
1269         exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
1270         self.assertEqual(exp_len, len(esp_payload))
1271
1272     def verify_encrypted(self, p, sa, rxs):
1273         decrypt_pkts = []
1274         for rx in rxs:
1275             if p.nat_header:
1276                 self.assertEqual(rx[UDP].dport, 4500)
1277             self.assert_packet_checksums_valid(rx)
1278             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1279             try:
1280                 rx_ip = rx[IP]
1281                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
1282                 if not decrypt_pkt.haslayer(IP):
1283                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
1284                 if rx_ip.proto == socket.IPPROTO_ESP:
1285                     self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
1286                 decrypt_pkts.append(decrypt_pkt)
1287                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1288                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1289             except:
1290                 self.logger.debug(ppp("Unexpected packet:", rx))
1291                 try:
1292                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1293                 except:
1294                     pass
1295                 raise
1296         pkts = reassemble4(decrypt_pkts)
1297         for pkt in pkts:
1298             self.assert_packet_checksums_valid(pkt)
1299
1300     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
1301         self.vapi.cli("clear errors")
1302         self.vapi.cli("clear ipsec counters")
1303         self.vapi.cli("clear ipsec sa")
1304         if not n_rx:
1305             n_rx = count
1306         try:
1307             send_pkts = self.gen_encrypt_pkts(
1308                 p,
1309                 p.scapy_tun_sa,
1310                 self.tun_if,
1311                 src=p.remote_tun_if_host,
1312                 dst=self.pg1.remote_ip4,
1313                 count=count,
1314                 payload_size=payload_size,
1315             )
1316             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1317             self.verify_decrypted(p, recv_pkts)
1318
1319             send_pkts = self.gen_pkts(
1320                 self.pg1,
1321                 src=self.pg1.remote_ip4,
1322                 dst=p.remote_tun_if_host,
1323                 count=count,
1324                 payload_size=payload_size,
1325             )
1326             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
1327             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1328
1329             for rx in recv_pkts:
1330                 self.assertEqual(rx[IP].src, p.tun_src)
1331                 self.assertEqual(rx[IP].dst, p.tun_dst)
1332
1333         finally:
1334             self.logger.info(self.vapi.ppcli("show error"))
1335             self.logger.info(self.vapi.ppcli("show ipsec all"))
1336
1337         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
1338         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
1339         self.verify_counters4(p, count, n_rx)
1340
1341     def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
1342         self.vapi.cli("clear errors")
1343         if not n_rx:
1344             n_rx = count
1345         try:
1346             send_pkts = self.gen_encrypt_pkts(
1347                 p,
1348                 p.scapy_tun_sa,
1349                 self.tun_if,
1350                 src=p.remote_tun_if_host,
1351                 dst=self.pg1.remote_ip4,
1352                 count=count,
1353             )
1354             self.send_and_assert_no_replies(self.tun_if, send_pkts)
1355
1356             send_pkts = self.gen_pkts(
1357                 self.pg1,
1358                 src=self.pg1.remote_ip4,
1359                 dst=p.remote_tun_if_host,
1360                 count=count,
1361                 payload_size=payload_size,
1362             )
1363             self.send_and_assert_no_replies(self.pg1, send_pkts)
1364
1365         finally:
1366             self.logger.info(self.vapi.ppcli("show error"))
1367             self.logger.info(self.vapi.ppcli("show ipsec all"))
1368
1369     def verify_tun_reass_44(self, p):
1370         self.vapi.cli("clear errors")
1371         self.vapi.ip_reassembly_enable_disable(
1372             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
1373         )
1374
1375         try:
1376             send_pkts = self.gen_encrypt_pkts(
1377                 p,
1378                 p.scapy_tun_sa,
1379                 self.tun_if,
1380                 src=p.remote_tun_if_host,
1381                 dst=self.pg1.remote_ip4,
1382                 payload_size=1900,
1383                 count=1,
1384             )
1385             send_pkts = fragment_rfc791(send_pkts[0], 1400)
1386             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1387             self.verify_decrypted(p, recv_pkts)
1388
1389             send_pkts = self.gen_pkts(
1390                 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
1391             )
1392             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1393             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1394
1395         finally:
1396             self.logger.info(self.vapi.ppcli("show error"))
1397             self.logger.info(self.vapi.ppcli("show ipsec all"))
1398
1399         self.verify_counters4(p, 1, 1)
1400         self.vapi.ip_reassembly_enable_disable(
1401             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
1402         )
1403
1404     def verify_tun_64(self, p, count=1):
1405         self.vapi.cli("clear errors")
1406         self.vapi.cli("clear ipsec sa")
1407         try:
1408             send_pkts = self.gen_encrypt_pkts6(
1409                 p,
1410                 p.scapy_tun_sa,
1411                 self.tun_if,
1412                 src=p.remote_tun_if_host6,
1413                 dst=self.pg1.remote_ip6,
1414                 count=count,
1415             )
1416             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1417             for recv_pkt in recv_pkts:
1418                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
1419                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
1420                 self.assert_packet_checksums_valid(recv_pkt)
1421             send_pkts = self.gen_pkts6(
1422                 p,
1423                 self.pg1,
1424                 src=self.pg1.remote_ip6,
1425                 dst=p.remote_tun_if_host6,
1426                 count=count,
1427             )
1428             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1429             for recv_pkt in recv_pkts:
1430                 try:
1431                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
1432                     if not decrypt_pkt.haslayer(IPv6):
1433                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1434                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1435                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
1436                     self.assert_packet_checksums_valid(decrypt_pkt)
1437                 except:
1438                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
1439                     try:
1440                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1441                     except:
1442                         pass
1443                     raise
1444         finally:
1445             self.logger.info(self.vapi.ppcli("show error"))
1446             self.logger.info(self.vapi.ppcli("show ipsec all"))
1447
1448         self.verify_counters4(p, count)
1449
1450     def verify_keepalive(self, p):
1451         # the sizeof Raw is calculated to pad to the minimum ehternet
1452         # frame size of 64 btyes
1453         pkt = (
1454             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1455             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1456             / UDP(sport=333, dport=4500)
1457             / Raw(b"\xff")
1458             / Padding(0 * 21)
1459         )
1460         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1461         self.assert_error_counter_equal(
1462             "/err/%s/NAT Keepalive" % self.tun4_input_node, 31
1463         )
1464
1465         pkt = (
1466             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1467             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1468             / UDP(sport=333, dport=4500)
1469             / Raw(b"\xfe")
1470         )
1471         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1472         self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 31)
1473
1474         pkt = (
1475             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1476             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1477             / UDP(sport=333, dport=4500)
1478             / Raw(b"\xfe")
1479             / Padding(0 * 21)
1480         )
1481         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1482         self.assert_error_counter_equal("/err/%s/Too Short" % self.tun4_input_node, 62)
1483
1484
1485 class IpsecTun4Tests(IpsecTun4):
1486     """UT test methods for Tunnel v4"""
1487
1488     def test_tun_basic44(self):
1489         """ipsec 4o4 tunnel basic test"""
1490         self.verify_tun_44(self.params[socket.AF_INET], count=1)
1491         self.tun_if.admin_down()
1492         self.tun_if.resolve_arp()
1493         self.tun_if.admin_up()
1494         self.verify_tun_44(self.params[socket.AF_INET], count=1)
1495
1496     def test_tun_reass_basic44(self):
1497         """ipsec 4o4 tunnel basic reassembly test"""
1498         self.verify_tun_reass_44(self.params[socket.AF_INET])
1499
1500     def test_tun_burst44(self):
1501         """ipsec 4o4 tunnel burst test"""
1502         self.verify_tun_44(self.params[socket.AF_INET], count=127)
1503
1504
1505 class IpsecTun6(object):
1506     """verify methods for Tunnel v6"""
1507
1508     def verify_counters6(self, p_in, p_out, count, worker=None):
1509         if hasattr(p_in, "tun_sa_in"):
1510             pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
1511             self.assertEqual(
1512                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1513             )
1514         if hasattr(p_out, "tun_sa_out"):
1515             pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
1516             self.assertEqual(
1517                 pkts,
1518                 count,
1519                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1520             )
1521         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1522         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1523
1524     def verify_decrypted6(self, p, rxs):
1525         for rx in rxs:
1526             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1527             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1528             self.assert_packet_checksums_valid(rx)
1529
1530     def verify_encrypted6(self, p, sa, rxs):
1531         for rx in rxs:
1532             self.assert_packet_checksums_valid(rx)
1533             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1534             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1535             if p.outer_flow_label:
1536                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1537             try:
1538                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1539                 if not decrypt_pkt.haslayer(IPv6):
1540                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1541                 self.assert_packet_checksums_valid(decrypt_pkt)
1542                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1543                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1544                 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1545                 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1546             except:
1547                 self.logger.debug(ppp("Unexpected packet:", rx))
1548                 try:
1549                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1550                 except:
1551                     pass
1552                 raise
1553
1554     def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
1555         self.vapi.cli("clear errors")
1556         self.vapi.cli("clear ipsec sa")
1557
1558         send_pkts = self.gen_pkts6(
1559             p_in,
1560             self.pg1,
1561             src=self.pg1.remote_ip6,
1562             dst=p_in.remote_tun_if_host,
1563             count=count,
1564             payload_size=payload_size,
1565         )
1566         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1567         self.logger.info(self.vapi.cli("sh punt stats"))
1568
1569     def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
1570         self.vapi.cli("clear errors")
1571         self.vapi.cli("clear ipsec sa")
1572
1573         send_pkts = self.gen_encrypt_pkts6(
1574             p_in,
1575             p_in.scapy_tun_sa,
1576             self.tun_if,
1577             src=p_in.remote_tun_if_host,
1578             dst=self.pg1.remote_ip6,
1579             count=count,
1580         )
1581         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1582
1583     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1584         self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
1585         self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
1586
1587     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1588         self.vapi.cli("clear errors")
1589         self.vapi.cli("clear ipsec sa")
1590         if not p_out:
1591             p_out = p_in
1592         try:
1593             send_pkts = self.gen_encrypt_pkts6(
1594                 p_in,
1595                 p_in.scapy_tun_sa,
1596                 self.tun_if,
1597                 src=p_in.remote_tun_if_host,
1598                 dst=self.pg1.remote_ip6,
1599                 count=count,
1600                 payload_size=payload_size,
1601             )
1602             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1603             self.verify_decrypted6(p_in, recv_pkts)
1604
1605             send_pkts = self.gen_pkts6(
1606                 p_in,
1607                 self.pg1,
1608                 src=self.pg1.remote_ip6,
1609                 dst=p_out.remote_tun_if_host,
1610                 count=count,
1611                 payload_size=payload_size,
1612             )
1613             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1614             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1615
1616             for rx in recv_pkts:
1617                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1618                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1619
1620         finally:
1621             self.logger.info(self.vapi.ppcli("show error"))
1622             self.logger.info(self.vapi.ppcli("show ipsec all"))
1623         self.verify_counters6(p_in, p_out, count)
1624
1625     def verify_tun_reass_66(self, p):
1626         self.vapi.cli("clear errors")
1627         self.vapi.ip_reassembly_enable_disable(
1628             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
1629         )
1630
1631         try:
1632             send_pkts = self.gen_encrypt_pkts6(
1633                 p,
1634                 p.scapy_tun_sa,
1635                 self.tun_if,
1636                 src=p.remote_tun_if_host,
1637                 dst=self.pg1.remote_ip6,
1638                 count=1,
1639                 payload_size=1850,
1640             )
1641             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1642             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1643             self.verify_decrypted6(p, recv_pkts)
1644
1645             send_pkts = self.gen_pkts6(
1646                 p,
1647                 self.pg1,
1648                 src=self.pg1.remote_ip6,
1649                 dst=p.remote_tun_if_host,
1650                 count=1,
1651                 payload_size=64,
1652             )
1653             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1654             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1655         finally:
1656             self.logger.info(self.vapi.ppcli("show error"))
1657             self.logger.info(self.vapi.ppcli("show ipsec all"))
1658         self.verify_counters6(p, p, 1)
1659         self.vapi.ip_reassembly_enable_disable(
1660             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
1661         )
1662
1663     def verify_tun_46(self, p, count=1):
1664         """ipsec 4o6 tunnel basic test"""
1665         self.vapi.cli("clear errors")
1666         self.vapi.cli("clear ipsec sa")
1667         try:
1668             send_pkts = self.gen_encrypt_pkts(
1669                 p,
1670                 p.scapy_tun_sa,
1671                 self.tun_if,
1672                 src=p.remote_tun_if_host4,
1673                 dst=self.pg1.remote_ip4,
1674                 count=count,
1675             )
1676             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1677             for recv_pkt in recv_pkts:
1678                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1679                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1680                 self.assert_packet_checksums_valid(recv_pkt)
1681             send_pkts = self.gen_pkts(
1682                 self.pg1,
1683                 src=self.pg1.remote_ip4,
1684                 dst=p.remote_tun_if_host4,
1685                 count=count,
1686             )
1687             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1688             for recv_pkt in recv_pkts:
1689                 try:
1690                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1691                     if not decrypt_pkt.haslayer(IP):
1692                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1693                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1694                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1695                     self.assert_packet_checksums_valid(decrypt_pkt)
1696                 except:
1697                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1698                     try:
1699                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1700                     except:
1701                         pass
1702                     raise
1703         finally:
1704             self.logger.info(self.vapi.ppcli("show error"))
1705             self.logger.info(self.vapi.ppcli("show ipsec all"))
1706         self.verify_counters6(p, p, count)
1707
1708
1709 class IpsecTun6Tests(IpsecTun6):
1710     """UT test methods for Tunnel v6"""
1711
1712     def test_tun_basic66(self):
1713         """ipsec 6o6 tunnel basic test"""
1714         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1715
1716     def test_tun_reass_basic66(self):
1717         """ipsec 6o6 tunnel basic reassembly test"""
1718         self.verify_tun_reass_66(self.params[socket.AF_INET6])
1719
1720     def test_tun_burst66(self):
1721         """ipsec 6o6 tunnel burst test"""
1722         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1723
1724
1725 class IpsecTun6HandoffTests(IpsecTun6):
1726     """UT test methods for Tunnel v6 with multiple workers"""
1727
1728     vpp_worker_count = 2
1729
1730     def test_tun_handoff_66(self):
1731         """ipsec 6o6 tunnel worker hand-off test"""
1732         self.vapi.cli("clear errors")
1733         self.vapi.cli("clear ipsec sa")
1734
1735         N_PKTS = 15
1736         p = self.params[socket.AF_INET6]
1737
1738         # inject alternately on worker 0 and 1. all counts on the SA
1739         # should be against worker 0
1740         for worker in [0, 1, 0, 1]:
1741             send_pkts = self.gen_encrypt_pkts6(
1742                 p,
1743                 p.scapy_tun_sa,
1744                 self.tun_if,
1745                 src=p.remote_tun_if_host,
1746                 dst=self.pg1.remote_ip6,
1747                 count=N_PKTS,
1748             )
1749             recv_pkts = self.send_and_expect(
1750                 self.tun_if, send_pkts, self.pg1, worker=worker
1751             )
1752             self.verify_decrypted6(p, recv_pkts)
1753
1754             send_pkts = self.gen_pkts6(
1755                 p,
1756                 self.pg1,
1757                 src=self.pg1.remote_ip6,
1758                 dst=p.remote_tun_if_host,
1759                 count=N_PKTS,
1760             )
1761             recv_pkts = self.send_and_expect(
1762                 self.pg1, send_pkts, self.tun_if, worker=worker
1763             )
1764             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1765
1766         # all counts against the first worker that was used
1767         self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
1768
1769
1770 class IpsecTun4HandoffTests(IpsecTun4):
1771     """UT test methods for Tunnel v4 with multiple workers"""
1772
1773     vpp_worker_count = 2
1774
1775     def test_tun_handooff_44(self):
1776         """ipsec 4o4 tunnel worker hand-off test"""
1777         self.vapi.cli("clear errors")
1778         self.vapi.cli("clear ipsec sa")
1779
1780         N_PKTS = 15
1781         p = self.params[socket.AF_INET]
1782
1783         # inject alternately on worker 0 and 1. all counts on the SA
1784         # should be against worker 0
1785         for worker in [0, 1, 0, 1]:
1786             send_pkts = self.gen_encrypt_pkts(
1787                 p,
1788                 p.scapy_tun_sa,
1789                 self.tun_if,
1790                 src=p.remote_tun_if_host,
1791                 dst=self.pg1.remote_ip4,
1792                 count=N_PKTS,
1793             )
1794             recv_pkts = self.send_and_expect(
1795                 self.tun_if, send_pkts, self.pg1, worker=worker
1796             )
1797             self.verify_decrypted(p, recv_pkts)
1798
1799             send_pkts = self.gen_pkts(
1800                 self.pg1,
1801                 src=self.pg1.remote_ip4,
1802                 dst=p.remote_tun_if_host,
1803                 count=N_PKTS,
1804             )
1805             recv_pkts = self.send_and_expect(
1806                 self.pg1, send_pkts, self.tun_if, worker=worker
1807             )
1808             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1809
1810         # all counts against the first worker that was used
1811         self.verify_counters4(p, 4 * N_PKTS, worker=0)
1812
1813
1814 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1815     """UT test methods for Tunnel v6 & v4"""
1816
1817     pass
1818
1819
1820 class IPSecIPv4Fwd(VppTestCase):
1821     """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
1822
1823     @classmethod
1824     def setUpConstants(cls):
1825         super(IPSecIPv4Fwd, cls).setUpConstants()
1826
1827     def setUp(self):
1828         super(IPSecIPv4Fwd, self).setUp()
1829         # store SPD objects so we can remove configs on tear down
1830         self.spd_objs = []
1831         self.spd_policies = []
1832
1833     def tearDown(self):
1834         # remove SPD policies
1835         for obj in self.spd_policies:
1836             obj.remove_vpp_config()
1837         self.spd_policies = []
1838         # remove SPD items (interface bindings first, then SPD)
1839         for obj in reversed(self.spd_objs):
1840             obj.remove_vpp_config()
1841         self.spd_objs = []
1842         # close down pg intfs
1843         for pg in self.pg_interfaces:
1844             pg.unconfig_ip4()
1845             pg.admin_down()
1846         super(IPSecIPv4Fwd, self).tearDown()
1847
1848     def create_interfaces(self, num_ifs=2):
1849         # create interfaces pg0 ... pg<num_ifs>
1850         self.create_pg_interfaces(range(num_ifs))
1851         for pg in self.pg_interfaces:
1852             # put the interface up
1853             pg.admin_up()
1854             # configure IPv4 address on the interface
1855             pg.config_ip4()
1856             # resolve ARP, so that we know VPP MAC
1857             pg.resolve_arp()
1858         self.logger.info(self.vapi.ppcli("show int addr"))
1859
1860     def spd_create_and_intf_add(self, spd_id, pg_list):
1861         spd = VppIpsecSpd(self, spd_id)
1862         spd.add_vpp_config()
1863         self.spd_objs.append(spd)
1864         for pg in pg_list:
1865             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
1866             spdItf.add_vpp_config()
1867             self.spd_objs.append(spdItf)
1868
1869     def get_policy(self, policy_type):
1870         e = VppEnum.vl_api_ipsec_spd_action_t
1871         if policy_type == "protect":
1872             return e.IPSEC_API_SPD_ACTION_PROTECT
1873         elif policy_type == "bypass":
1874             return e.IPSEC_API_SPD_ACTION_BYPASS
1875         elif policy_type == "discard":
1876             return e.IPSEC_API_SPD_ACTION_DISCARD
1877         else:
1878             raise Exception("Invalid policy type: %s", policy_type)
1879
1880     def spd_add_rem_policy(
1881         self,
1882         spd_id,
1883         src_if,
1884         dst_if,
1885         proto,
1886         is_out,
1887         priority,
1888         policy_type,
1889         remove=False,
1890         all_ips=False,
1891         ip_range=False,
1892         local_ip_start=ip_address("0.0.0.0"),
1893         local_ip_stop=ip_address("255.255.255.255"),
1894         remote_ip_start=ip_address("0.0.0.0"),
1895         remote_ip_stop=ip_address("255.255.255.255"),
1896         remote_port_start=0,
1897         remote_port_stop=65535,
1898         local_port_start=0,
1899         local_port_stop=65535,
1900     ):
1901         spd = VppIpsecSpd(self, spd_id)
1902
1903         if all_ips:
1904             src_range_low = ip_address("0.0.0.0")
1905             src_range_high = ip_address("255.255.255.255")
1906             dst_range_low = ip_address("0.0.0.0")
1907             dst_range_high = ip_address("255.255.255.255")
1908
1909         elif ip_range:
1910             src_range_low = local_ip_start
1911             src_range_high = local_ip_stop
1912             dst_range_low = remote_ip_start
1913             dst_range_high = remote_ip_stop
1914
1915         else:
1916             src_range_low = src_if.remote_ip4
1917             src_range_high = src_if.remote_ip4
1918             dst_range_low = dst_if.remote_ip4
1919             dst_range_high = dst_if.remote_ip4
1920
1921         spdEntry = VppIpsecSpdEntry(
1922             self,
1923             spd,
1924             0,
1925             src_range_low,
1926             src_range_high,
1927             dst_range_low,
1928             dst_range_high,
1929             proto,
1930             priority=priority,
1931             policy=self.get_policy(policy_type),
1932             is_outbound=is_out,
1933             remote_port_start=remote_port_start,
1934             remote_port_stop=remote_port_stop,
1935             local_port_start=local_port_start,
1936             local_port_stop=local_port_stop,
1937         )
1938
1939         if remove is False:
1940             spdEntry.add_vpp_config()
1941             self.spd_policies.append(spdEntry)
1942         else:
1943             spdEntry.remove_vpp_config()
1944             self.spd_policies.remove(spdEntry)
1945         self.logger.info(self.vapi.ppcli("show ipsec all"))
1946         return spdEntry
1947
1948     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
1949         packets = []
1950         for i in range(pkt_count):
1951             # create packet info stored in the test case instance
1952             info = self.create_packet_info(src_if, dst_if)
1953             # convert the info into packet payload
1954             payload = self.info_to_payload(info)
1955             # create the packet itself
1956             p = (
1957                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
1958                 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
1959                 / UDP(sport=src_prt, dport=dst_prt)
1960                 / Raw(payload)
1961             )
1962             # store a copy of the packet in the packet info
1963             info.data = p.copy()
1964             # append the packet to the list
1965             packets.append(p)
1966         # return the created packet list
1967         return packets
1968
1969     def verify_capture(self, src_if, dst_if, capture):
1970         packet_info = None
1971         for packet in capture:
1972             try:
1973                 ip = packet[IP]
1974                 udp = packet[UDP]
1975                 # convert the payload to packet info object
1976                 payload_info = self.payload_to_info(packet)
1977                 # make sure the indexes match
1978                 self.assert_equal(
1979                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
1980                 )
1981                 self.assert_equal(
1982                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
1983                 )
1984                 packet_info = self.get_next_packet_info_for_interface2(
1985                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
1986                 )
1987                 # make sure we didn't run out of saved packets
1988                 self.assertIsNotNone(packet_info)
1989                 self.assert_equal(
1990                     payload_info.index, packet_info.index, "packet info index"
1991                 )
1992                 saved_packet = packet_info.data  # fetch the saved packet
1993                 # assert the values match
1994                 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
1995                 # ... more assertions here
1996                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
1997             except Exception as e:
1998                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1999                 raise
2000         remaining_packet = self.get_next_packet_info_for_interface2(
2001             src_if.sw_if_index, dst_if.sw_if_index, packet_info
2002         )
2003         self.assertIsNone(
2004             remaining_packet,
2005             "Interface %s: Packet expected from interface "
2006             "%s didn't arrive" % (dst_if.name, src_if.name),
2007         )
2008
2009     def verify_policy_match(self, pkt_count, spdEntry):
2010         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
2011         matched_pkts = spdEntry.get_stats().get("packets")
2012         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
2013         self.assert_equal(pkt_count, matched_pkts)
2014
2015
2016 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
2017     @classmethod
2018     def setUpConstants(cls):
2019         super(SpdFlowCacheTemplate, cls).setUpConstants()
2020         # Override this method with required cmdline parameters e.g.
2021         # cls.vpp_cmdline.extend(["ipsec", "{",
2022         #                         "ipv4-outbound-spd-flow-cache on",
2023         #                         "}"])
2024         # cls.logger.info("VPP modified cmdline is %s" % " "
2025         #                 .join(cls.vpp_cmdline))
2026
2027     def setUp(self):
2028         super(SpdFlowCacheTemplate, self).setUp()
2029
2030     def tearDown(self):
2031         super(SpdFlowCacheTemplate, self).tearDown()
2032
2033     def get_spd_flow_cache_entries(self, outbound):
2034         """'show ipsec spd' output:
2035         ipv4-inbound-spd-flow-cache-entries: 0
2036         ipv4-outbound-spd-flow-cache-entries: 0
2037         """
2038         show_ipsec_reply = self.vapi.cli("show ipsec spd")
2039         # match the relevant section of 'show ipsec spd' output
2040         if outbound:
2041             regex_match = re.search(
2042                 "ipv4-outbound-spd-flow-cache-entries: (.*)",
2043                 show_ipsec_reply,
2044                 re.DOTALL,
2045             )
2046         else:
2047             regex_match = re.search(
2048                 "ipv4-inbound-spd-flow-cache-entries: (.*)", show_ipsec_reply, re.DOTALL
2049             )
2050         if regex_match is None:
2051             raise Exception(
2052                 "Unable to find spd flow cache entries \
2053                 in 'show ipsec spd' CLI output - regex failed to match"
2054             )
2055         else:
2056             try:
2057                 num_entries = int(regex_match.group(1))
2058             except ValueError:
2059                 raise Exception(
2060                     "Unable to get spd flow cache entries \
2061                 from 'show ipsec spd' string: %s",
2062                     regex_match.group(0),
2063                 )
2064             self.logger.info("%s", regex_match.group(0))
2065         return num_entries
2066
2067     def verify_num_outbound_flow_cache_entries(self, expected_elements):
2068         self.assertEqual(
2069             self.get_spd_flow_cache_entries(outbound=True), expected_elements
2070         )
2071
2072     def verify_num_inbound_flow_cache_entries(self, expected_elements):
2073         self.assertEqual(
2074             self.get_spd_flow_cache_entries(outbound=False), expected_elements
2075         )
2076
2077     def crc32_supported(self):
2078         # lscpu is part of util-linux package, available on all Linux Distros
2079         stream = os.popen("lscpu")
2080         cpu_info = stream.read()
2081         # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
2082         # see vppinfra/crc32.h
2083         if "crc32" or "sse4_2" in cpu_info:
2084             self.logger.info("\ncrc32 supported:\n" + cpu_info)
2085             return True
2086         else:
2087             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
2088             return False
2089
2090
2091 class IPSecIPv6Fwd(VppTestCase):
2092     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
2093
2094     @classmethod
2095     def setUpConstants(cls):
2096         super(IPSecIPv6Fwd, cls).setUpConstants()
2097
2098     def setUp(self):
2099         super(IPSecIPv6Fwd, self).setUp()
2100         # store SPD objects so we can remove configs on tear down
2101         self.spd_objs = []
2102         self.spd_policies = []
2103
2104     def tearDown(self):
2105         # remove SPD policies
2106         for obj in self.spd_policies:
2107             obj.remove_vpp_config()
2108         self.spd_policies = []
2109         # remove SPD items (interface bindings first, then SPD)
2110         for obj in reversed(self.spd_objs):
2111             obj.remove_vpp_config()
2112         self.spd_objs = []
2113         # close down pg intfs
2114         for pg in self.pg_interfaces:
2115             pg.unconfig_ip6()
2116             pg.admin_down()
2117         super(IPSecIPv6Fwd, self).tearDown()
2118
2119     def create_interfaces(self, num_ifs=2):
2120         # create interfaces pg0 ... pg<num_ifs>
2121         self.create_pg_interfaces(range(num_ifs))
2122         for pg in self.pg_interfaces:
2123             # put the interface up
2124             pg.admin_up()
2125             # configure IPv6 address on the interface
2126             pg.config_ip6()
2127             pg.resolve_ndp()
2128         self.logger.info(self.vapi.ppcli("show int addr"))
2129
2130     def spd_create_and_intf_add(self, spd_id, pg_list):
2131         spd = VppIpsecSpd(self, spd_id)
2132         spd.add_vpp_config()
2133         self.spd_objs.append(spd)
2134         for pg in pg_list:
2135             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2136             spdItf.add_vpp_config()
2137             self.spd_objs.append(spdItf)
2138
2139     def get_policy(self, policy_type):
2140         e = VppEnum.vl_api_ipsec_spd_action_t
2141         if policy_type == "protect":
2142             return e.IPSEC_API_SPD_ACTION_PROTECT
2143         elif policy_type == "bypass":
2144             return e.IPSEC_API_SPD_ACTION_BYPASS
2145         elif policy_type == "discard":
2146             return e.IPSEC_API_SPD_ACTION_DISCARD
2147         else:
2148             raise Exception("Invalid policy type: %s", policy_type)
2149
2150     def spd_add_rem_policy(
2151         self,
2152         spd_id,
2153         src_if,
2154         dst_if,
2155         proto,
2156         is_out,
2157         priority,
2158         policy_type,
2159         remove=False,
2160         all_ips=False,
2161         ip_range=False,
2162         local_ip_start=ip_address("0::0"),
2163         local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2164         remote_ip_start=ip_address("0::0"),
2165         remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2166         remote_port_start=0,
2167         remote_port_stop=65535,
2168         local_port_start=0,
2169         local_port_stop=65535,
2170     ):
2171         spd = VppIpsecSpd(self, spd_id)
2172
2173         if all_ips:
2174             src_range_low = ip_address("0::0")
2175             src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2176             dst_range_low = ip_address("0::0")
2177             dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2178
2179         elif ip_range:
2180             src_range_low = local_ip_start
2181             src_range_high = local_ip_stop
2182             dst_range_low = remote_ip_start
2183             dst_range_high = remote_ip_stop
2184
2185         else:
2186             src_range_low = src_if.remote_ip6
2187             src_range_high = src_if.remote_ip6
2188             dst_range_low = dst_if.remote_ip6
2189             dst_range_high = dst_if.remote_ip6
2190
2191         spdEntry = VppIpsecSpdEntry(
2192             self,
2193             spd,
2194             0,
2195             src_range_low,
2196             src_range_high,
2197             dst_range_low,
2198             dst_range_high,
2199             proto,
2200             priority=priority,
2201             policy=self.get_policy(policy_type),
2202             is_outbound=is_out,
2203             remote_port_start=remote_port_start,
2204             remote_port_stop=remote_port_stop,
2205             local_port_start=local_port_start,
2206             local_port_stop=local_port_stop,
2207         )
2208
2209         if remove is False:
2210             spdEntry.add_vpp_config()
2211             self.spd_policies.append(spdEntry)
2212         else:
2213             spdEntry.remove_vpp_config()
2214             self.spd_policies.remove(spdEntry)
2215         self.logger.info(self.vapi.ppcli("show ipsec all"))
2216         return spdEntry
2217
2218     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
2219         packets = []
2220         for i in range(pkt_count):
2221             # create packet info stored in the test case instance
2222             info = self.create_packet_info(src_if, dst_if)
2223             # convert the info into packet payload
2224             payload = self.info_to_payload(info)
2225             # create the packet itself
2226             p = (
2227                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2228                 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
2229                 / UDP(sport=src_prt, dport=dst_prt)
2230                 / Raw(payload)
2231             )
2232             # store a copy of the packet in the packet info
2233             info.data = p.copy()
2234             # append the packet to the list
2235             packets.append(p)
2236         # return the created packet list
2237         return packets
2238
2239     def verify_capture(self, src_if, dst_if, capture):
2240         packet_info = None
2241         for packet in capture:
2242             try:
2243                 ip = packet[IPv6]
2244                 udp = packet[UDP]
2245                 # convert the payload to packet info object
2246                 payload_info = self.payload_to_info(packet)
2247                 # make sure the indexes match
2248                 self.assert_equal(
2249                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
2250                 )
2251                 self.assert_equal(
2252                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2253                 )
2254                 packet_info = self.get_next_packet_info_for_interface2(
2255                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
2256                 )
2257                 # make sure we didn't run out of saved packets
2258                 self.assertIsNotNone(packet_info)
2259                 self.assert_equal(
2260                     payload_info.index, packet_info.index, "packet info index"
2261                 )
2262                 saved_packet = packet_info.data  # fetch the saved packet
2263                 # assert the values match
2264                 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
2265                 # ... more assertions here
2266                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2267             except Exception as e:
2268                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2269                 raise
2270         remaining_packet = self.get_next_packet_info_for_interface2(
2271             src_if.sw_if_index, dst_if.sw_if_index, packet_info
2272         )
2273         self.assertIsNone(
2274             remaining_packet,
2275             "Interface %s: Packet expected from interface "
2276             "%s didn't arrive" % (dst_if.name, src_if.name),
2277         )
2278
2279     def verify_policy_match(self, pkt_count, spdEntry):
2280         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
2281         matched_pkts = spdEntry.get_stats().get("packets")
2282         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
2283         self.assert_equal(pkt_count, matched_pkts)
2284
2285
2286 if __name__ == "__main__":
2287     unittest.main(testRunner=VppTestRunner)