ipsec: enable UDP encap for IPv6 ESP tun protect
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
class IPsecIPv4Params:
    """Default parameter set used by the IPsec tests for the IPv4 family.

    The class attributes describe the address family; the instance
    attributes set in ``__init__`` carry SA ids, SPIs, algorithms and keys
    and may be overridden per test after construction.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_alg_t = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_alg_t = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags_t = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # host addresses behind the remote tunnel end-point (v4 and v6)
        self.remote_tun_if_host = "1.1.1.1"
        self.remote_tun_if_host6 = "1111::1"

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        # hop-limit and flow-label values for outer and inner headers
        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity algorithm: VPP enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # crypto algorithm: VPP enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        # SA flags, optional NAT-T header, tunnel flags and misc knobs
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
74
75
class IPsecIPv6Params:
    """Default parameter set used by the IPsec tests for the IPv6 family.

    The class attributes describe the address family; the instance
    attributes set in ``__init__`` carry SA ids, SPIs, algorithms and keys
    and may be overridden per test after construction.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_alg_t = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_alg_t = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags_t = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # host addresses behind the remote tunnel end-point (v6 and v4)
        self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
        self.remote_tun_if_host4 = "1.1.1.1"

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # hop-limit and flow-label values for outer and inner headers
        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity algorithm: VPP enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # crypto algorithm: VPP enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        # SA flags, optional NAT-T header, tunnel flags and misc knobs
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
122
123
def mk_scapy_crypt_key(p):
    """Return the crypto key in the form handed to scapy for parameters ``p``.

    For "AES-GCM" and "AES-CTR" the 32-bit salt (network byte order) is
    appended to ``p.crypt_key``; for every other algorithm the key is
    returned unchanged.
    """
    key = p.crypt_key
    if p.crypt_algo in ("AES-GCM", "AES-CTR"):
        key = key + struct.pack("!I", p.salt)
    return key
129
130
def config_tun_params(p, encryption_type, tun_if):
    """Attach the tunnel-mode scapy SAs and tunnel end-points to ``p``.

    Sets ``p.tun_dst`` / ``p.tun_src`` from the remote / local address of
    ``tun_if`` for the address family of ``p``, then builds:

    * ``p.scapy_tun_sa`` - keyed with ``p.vpp_tun_spi``, outer header from
      ``tun_dst`` to ``tun_src``
    * ``p.vpp_tun_sa`` - keyed with ``p.scapy_tun_spi``, outer header from
      ``tun_src`` to ``tun_dst``

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: protocol class passed through to scapy's
        SecurityAssociation
    :param tun_if: interface providing ``remote_addr`` / ``local_addr``
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # everything the two SAs have in common
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=esn_en,
    )

    # outer IP header class matching the address family of p
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared,
    )
161
162
def config_tra_params(p, encryption_type):
    """Attach the transport-mode scapy SAs to ``p``.

    Builds ``p.scapy_tra_sa`` (keyed with ``p.vpp_tra_spi``) and
    ``p.vpp_tra_sa`` (keyed with ``p.scapy_tra_spi``); both share the
    algorithms, keys, NAT-T header and ESN setting taken from ``p``.

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: protocol class passed through to scapy's
        SecurityAssociation
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    # everything the two SAs have in common
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=esn_en,
    )

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **shared
    )
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared
    )
188
189
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE::

         ------   encrypt   ---
        |tra_if| <-------> |VPP|
         ------   decrypt   ---

    TUNNEL MODE::

         ------   encrypt   ---   plain   ---
        |tun_if| <-------  |VPP| <------ |pg1|
         ------             ---           ---

         ------   decrypt   ---   plain   ---
        |tun_if| ------->  |VPP| ------> |pg1|
         ------             ---           ---

    Common base for the IPsec test cases: creates three pg interfaces with
    IPv4 and IPv6 configured, installs the per-family parameter objects and
    provides generators for plain and encrypted ICMP test packets.
    """

    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """empty method to be overloaded when necessary"""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setup_params(self):
        """Install default parameter objects unless they are already set.

        ``self.params`` maps each address family to its parameter object.
        """
        if not hasattr(self, "ipv4_params"):
            self.ipv4_params = IPsecIPv4Params()
        if not hasattr(self, "ipv6_params"):
            self.ipv6_params = IPsecIPv6Params()
        self.params = {
            family.addr_type: family
            for family in (self.ipv4_params, self.ipv6_params)
        }

    def config_interfaces(self):
        """Create three pg interfaces, bring them up and address them."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super().setUp()

        self.setup_params()

        ipsec_proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_proto.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Take the interfaces down and remove their addresses."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super().tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` IPv4 ICMP packets, each encrypted with ``sa``."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            pkts.append(l2 / sa.encrypt(inner))
        return pkts

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` IPv6 echo requests, each encrypted with ``sa``.

        Hop limit and flow label of the inner header come from ``p``.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            ) / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            pkts.append(l2 / sa.encrypt(inner))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plain IPv4 ICMP packets."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(
                l2 / IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            )
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plain IPv6 echo requests.

        Hop limit and flow label come from ``p``.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            l3 = IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
            pkts.append(
                l2 / l3 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            )
        return pkts
303
304
class IpsecTcp:
    # Helper methods for checking TCP packets generated by VPP over the
    # IPv4 tunnel SAs; expects the attributes of a TemplateIpsec-style test
    # (vapi, params, tun_if, logger) on self.

    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 and validate the checksums of
        the decrypted packet that comes back on the tunnel interface."""
        # start http cli server listener on http://0.0.0.0:80
        self.vapi.cli("http cli server")
        p = self.params[socket.AF_INET]

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / TCP(
            flags="S", dport=80
        )
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
321
322
# Adds a test_* entry point around IpsecTcp.verify_tcp_checksum().
class IpsecTcpTests(IpsecTcp):
    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        self.verify_tcp_checksum()
327
328
329 class IpsecTra4(object):
330     """verify methods for Transport v4"""
331
332     def get_replay_counts(self, p):
333         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
334         count = self.statistics.get_err_counter(replay_node_name)
335
336         if p.async_mode:
337             replay_post_node_name = (
338                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
339             )
340             count += self.statistics.get_err_counter(replay_post_node_name)
341
342         return count
343
344     def get_hash_failed_counts(self, p):
345         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
346             hash_failed_node_name = (
347                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
348             )
349         else:
350             hash_failed_node_name = (
351                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
352             )
353         count = self.statistics.get_err_counter(hash_failed_node_name)
354
355         if p.async_mode:
356             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
357
358         return count
359
360     def verify_hi_seq_num(self):
361         p = self.params[socket.AF_INET]
362         saf = VppEnum.vl_api_ipsec_sad_flags_t
363         esn_on = p.vpp_tra_sa.esn_en
364         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
365
366         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
367         replay_count = self.get_replay_counts(p)
368         hash_failed_count = self.get_hash_failed_counts(p)
369         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
370
371         # a few packets so we get the rx seq number above the window size and
372         # thus can simulate a wrap with an out of window packet
373         pkts = [
374             (
375                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
376                 / p.scapy_tra_sa.encrypt(
377                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
378                     seq_num=seq,
379                 )
380             )
381             for seq in range(63, 80)
382         ]
383         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
384
385         # these 4 packets will all choose seq-num 0 to decrpyt since none
386         # are out of window when first checked. however, once #200 has
387         # decrypted it will move the window to 200 and has #81 is out of
388         # window. this packet should be dropped.
389         pkts = [
390             (
391                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
392                 / p.scapy_tra_sa.encrypt(
393                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
394                     seq_num=200,
395                 )
396             ),
397             (
398                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
399                 / p.scapy_tra_sa.encrypt(
400                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
401                     seq_num=81,
402                 )
403             ),
404             (
405                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
406                 / p.scapy_tra_sa.encrypt(
407                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
408                     seq_num=201,
409                 )
410             ),
411             (
412                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
413                 / p.scapy_tra_sa.encrypt(
414                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
415                     seq_num=202,
416                 )
417             ),
418         ]
419
420         # if anti-replay is off then we won't drop #81
421         n_rx = 3 if ar_on else 4
422         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
423         # this packet is one before the wrap
424         pkts = [
425             (
426                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
427                 / p.scapy_tra_sa.encrypt(
428                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
429                     seq_num=203,
430                 )
431             )
432         ]
433         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
434
435         # move the window over half way to a wrap
436         pkts = [
437             (
438                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
439                 / p.scapy_tra_sa.encrypt(
440                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
441                     seq_num=0x80000001,
442                 )
443             )
444         ]
445         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
446
447         # anti-replay will drop old packets, no anti-replay will not
448         pkts = [
449             (
450                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
451                 / p.scapy_tra_sa.encrypt(
452                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
453                     seq_num=0x44000001,
454                 )
455             )
456         ]
457
458         if ar_on:
459             self.send_and_assert_no_replies(self.tra_if, pkts)
460         else:
461             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
462
463         if esn_on:
464             #
465             # validate wrapping the ESN
466             #
467
468             # wrap scapy's TX SA SN
469             p.scapy_tra_sa.seq_num = 0x100000005
470
471             # send a packet that wraps the window for both AR and no AR
472             pkts = [
473                 (
474                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
475                     / p.scapy_tra_sa.encrypt(
476                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
477                         / ICMP(),
478                         seq_num=0x100000005,
479                     )
480                 )
481             ]
482
483             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
484             for rx in rxs:
485                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
486
487             # move the window forward to half way to the next wrap
488             pkts = [
489                 (
490                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
491                     / p.scapy_tra_sa.encrypt(
492                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
493                         / ICMP(),
494                         seq_num=0x180000005,
495                     )
496                 )
497             ]
498
499             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
500
501             # a packet less than 2^30 from the current position is:
502             #  - AR: out of window and dropped
503             #  - non-AR: accepted
504             pkts = [
505                 (
506                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
507                     / p.scapy_tra_sa.encrypt(
508                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
509                         / ICMP(),
510                         seq_num=0x170000005,
511                     )
512                 )
513             ]
514
515             if ar_on:
516                 self.send_and_assert_no_replies(self.tra_if, pkts)
517             else:
518                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
519
520             # a packet more than 2^30 from the current position is:
521             #  - AR: out of window and dropped
522             #  - non-AR: considered a wrap, but since it's not a wrap
523             #    it won't decrpyt and so will be dropped
524             pkts = [
525                 (
526                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
527                     / p.scapy_tra_sa.encrypt(
528                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
529                         / ICMP(),
530                         seq_num=0x130000005,
531                     )
532                 )
533             ]
534
535             self.send_and_assert_no_replies(self.tra_if, pkts)
536
537             # a packet less than 2^30 from the current position and is a
538             # wrap; (the seq is currently at 0x180000005).
539             #  - AR: out of window so considered a wrap, so accepted
540             #  - non-AR: not considered a wrap, so won't decrypt
541             p.scapy_tra_sa.seq_num = 0x260000005
542             pkts = [
543                 (
544                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
545                     / p.scapy_tra_sa.encrypt(
546                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
547                         / ICMP(),
548                         seq_num=0x260000005,
549                     )
550                 )
551             ]
552             if ar_on:
553                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
554             else:
555                 self.send_and_assert_no_replies(self.tra_if, pkts)
556
557             #
558             # window positions are different now for AR/non-AR
559             #  move non-AR forward
560             #
561             if not ar_on:
562                 # a packet more than 2^30 from the current position and is a
563                 # wrap; (the seq is currently at 0x180000005).
564                 #  - AR: accepted
565                 #  - non-AR: not considered a wrap, so won't decrypt
566
567                 pkts = [
568                     (
569                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
570                         / p.scapy_tra_sa.encrypt(
571                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
572                             / ICMP(),
573                             seq_num=0x200000005,
574                         )
575                     ),
576                     (
577                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
578                         / p.scapy_tra_sa.encrypt(
579                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
580                             / ICMP(),
581                             seq_num=0x200000006,
582                         )
583                     ),
584                 ]
585                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
586
587                 pkts = [
588                     (
589                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
590                         / p.scapy_tra_sa.encrypt(
591                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
592                             / ICMP(),
593                             seq_num=0x260000005,
594                         )
595                     )
596                 ]
597                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
598
599     def verify_tra_anti_replay(self):
600         p = self.params[socket.AF_INET]
601         esn_en = p.vpp_tra_sa.esn_en
602
603         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
604         replay_count = self.get_replay_counts(p)
605         hash_failed_count = self.get_hash_failed_counts(p)
606         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
607
608         if ESP == self.encryption_type:
609             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
610             undersize_count = self.statistics.get_err_counter(undersize_node_name)
611
612         #
613         # send packets with seq numbers 1->34
614         # this means the window size is still in Case B (see RFC4303
615         # Appendix A)
616         #
617         # for reasons i haven't investigated Scapy won't create a packet with
618         # seq_num=0
619         #
620         pkts = [
621             (
622                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
623                 / p.scapy_tra_sa.encrypt(
624                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
625                     seq_num=seq,
626                 )
627             )
628             for seq in range(1, 34)
629         ]
630         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
631
632         # replayed packets are dropped
633         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
634         replay_count += len(pkts)
635         self.assertEqual(self.get_replay_counts(p), replay_count)
636
637         #
638         # now send a batch of packets all with the same sequence number
639         # the first packet in the batch is legitimate, the rest bogus
640         #
641         self.vapi.cli("clear error")
642         self.vapi.cli("clear node counters")
643         pkts = Ether(
644             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
645         ) / p.scapy_tra_sa.encrypt(
646             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
647             seq_num=35,
648         )
649         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
650         replay_count += 7
651         self.assertEqual(self.get_replay_counts(p), replay_count)
652
653         #
654         # now move the window over to 257 (more than one byte) and into Case A
655         #
656         self.vapi.cli("clear error")
657         pkt = Ether(
658             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
659         ) / p.scapy_tra_sa.encrypt(
660             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
661             seq_num=257,
662         )
663         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
664
665         # replayed packets are dropped
666         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
667         replay_count += 3
668         self.assertEqual(self.get_replay_counts(p), replay_count)
669
670         # the window size is 64 packets
671         # in window are still accepted
672         pkt = Ether(
673             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
674         ) / p.scapy_tra_sa.encrypt(
675             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
676             seq_num=200,
677         )
678         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
679
680         # a packet that does not decrypt does not move the window forward
681         bogus_sa = SecurityAssociation(
682             self.encryption_type,
683             p.vpp_tra_spi,
684             crypt_algo=p.crypt_algo,
685             crypt_key=mk_scapy_crypt_key(p)[::-1],
686             auth_algo=p.auth_algo,
687             auth_key=p.auth_key[::-1],
688         )
689         pkt = Ether(
690             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
691         ) / bogus_sa.encrypt(
692             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
693             seq_num=350,
694         )
695         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
696
697         hash_failed_count += 17
698         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
699
700         # a malformed 'runt' packet
701         #  created by a mis-constructed SA
702         if ESP == self.encryption_type and p.crypt_algo != "NULL":
703             bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi)
704             pkt = Ether(
705                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
706             ) / bogus_sa.encrypt(
707                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
708                 seq_num=350,
709             )
710             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
711
712             undersize_count += 17
713             self.assert_error_counter_equal(undersize_node_name, undersize_count)
714
715         # which we can determine since this packet is still in the window
716         pkt = Ether(
717             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
718         ) / p.scapy_tra_sa.encrypt(
719             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
720             seq_num=234,
721         )
722         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
723
724         #
725         # out of window are dropped
726         #  this is Case B. So VPP will consider this to be a high seq num wrap
727         #  and so the decrypt attempt will fail
728         #
729         pkt = Ether(
730             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
731         ) / p.scapy_tra_sa.encrypt(
732             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
733             seq_num=17,
734         )
735         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
736
737         if esn_en:
738             # an out of window error with ESN looks like a high sequence
739             # wrap. but since it isn't then the verify will fail.
740             hash_failed_count += 17
741             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
742
743         else:
744             replay_count += 17
745             self.assertEqual(self.get_replay_counts(p), replay_count)
746
747         # valid packet moves the window over to 258
748         pkt = Ether(
749             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
750         ) / p.scapy_tra_sa.encrypt(
751             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
752             seq_num=258,
753         )
754         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
755         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
756
757         #
758         # move VPP's SA TX seq-num to just before the seq-number wrap.
759         # then fire in a packet that VPP should drop on TX because it
760         # causes the TX seq number to wrap; unless we're using extened sequence
761         # numbers.
762         #
763         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
764         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
765         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
766
767         pkts = [
768             (
769                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
770                 / p.scapy_tra_sa.encrypt(
771                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
772                     seq_num=seq,
773                 )
774             )
775             for seq in range(259, 280)
776         ]
777
778         if esn_en:
779             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
780
781             #
782             # in order for scapy to decrypt its SA's high order number needs
783             # to wrap
784             #
785             p.vpp_tra_sa.seq_num = 0x100000000
786             for rx in rxs:
787                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
788
789             #
790             # wrap scapy's TX high sequence number. VPP is in case B, so it
791             # will consider this a high seq wrap also.
792             # The low seq num we set it to will place VPP's RX window in Case A
793             #
794             p.scapy_tra_sa.seq_num = 0x100000005
795             pkt = Ether(
796                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
797             ) / p.scapy_tra_sa.encrypt(
798                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
799                 seq_num=0x100000005,
800             )
801             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
802
803             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
804
805             #
806             # A packet that has seq num between (2^32-64) and 5 is within
807             # the window
808             #
809             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
810             pkt = Ether(
811                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
812             ) / p.scapy_tra_sa.encrypt(
813                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
814                 seq_num=0xFFFFFFFD,
815             )
816             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
817             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
818
819             #
820             # While in case A we cannot wrap the high sequence number again
821             # because VPP will consider this packet to be one that moves the
822             # window forward
823             #
824             pkt = Ether(
825                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
826             ) / p.scapy_tra_sa.encrypt(
827                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
828                 seq_num=0x200000999,
829             )
830             self.send_and_assert_no_replies(
831                 self.tra_if, [pkt], self.tra_if, timeout=0.2
832             )
833
834             hash_failed_count += 1
835             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
836
837             #
838             # but if we move the window forward to case B, then we can wrap
839             # again
840             #
841             p.scapy_tra_sa.seq_num = 0x100000555
842             pkt = Ether(
843                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
844             ) / p.scapy_tra_sa.encrypt(
845                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
846                 seq_num=0x100000555,
847             )
848             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
849             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
850
851             p.scapy_tra_sa.seq_num = 0x200000444
852             pkt = Ether(
853                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
854             ) / p.scapy_tra_sa.encrypt(
855                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
856                 seq_num=0x200000444,
857             )
858             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
859             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
860
861         else:
862             #
863             # without ESN TX sequence numbers can't wrap and packets are
864             # dropped from here on out.
865             #
866             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
867             seq_cycle_count += len(pkts)
868             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
869
870         # move the security-associations seq number on to the last we used
871         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
872         p.scapy_tra_sa.seq_num = 351
873         p.vpp_tra_sa.seq_num = 351
874
875     def verify_tra_lost(self):
876         p = self.params[socket.AF_INET]
877         esn_en = p.vpp_tra_sa.esn_en
878
879         #
880         # send packets with seq numbers 1->34
881         # this means the window size is still in Case B (see RFC4303
882         # Appendix A)
883         #
884         # for reasons i haven't investigated Scapy won't create a packet with
885         # seq_num=0
886         #
887         pkts = [
888             (
889                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
890                 / p.scapy_tra_sa.encrypt(
891                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
892                     seq_num=seq,
893                 )
894             )
895             for seq in range(1, 3)
896         ]
897         self.send_and_expect(self.tra_if, pkts, self.tra_if)
898
899         self.assertEqual(p.tra_sa_out.get_lost(), 0)
900
901         # skip a sequence number
902         pkts = [
903             (
904                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
905                 / p.scapy_tra_sa.encrypt(
906                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
907                     seq_num=seq,
908                 )
909             )
910             for seq in range(4, 6)
911         ]
912         self.send_and_expect(self.tra_if, pkts, self.tra_if)
913
914         self.assertEqual(p.tra_sa_out.get_lost(), 0)
915
916         # the lost packet are counted untill we get up past the first
917         # sizeof(replay_window) packets
918         pkts = [
919             (
920                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
921                 / p.scapy_tra_sa.encrypt(
922                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
923                     seq_num=seq,
924                 )
925             )
926             for seq in range(6, 100)
927         ]
928         self.send_and_expect(self.tra_if, pkts, self.tra_if)
929
930         self.assertEqual(p.tra_sa_out.get_lost(), 1)
931
932         # lost of holes in the sequence
933         pkts = [
934             (
935                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
936                 / p.scapy_tra_sa.encrypt(
937                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
938                     seq_num=seq,
939                 )
940             )
941             for seq in range(100, 200, 2)
942         ]
943         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
944
945         pkts = [
946             (
947                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
948                 / p.scapy_tra_sa.encrypt(
949                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
950                     seq_num=seq,
951                 )
952             )
953             for seq in range(200, 300)
954         ]
955         self.send_and_expect(self.tra_if, pkts, self.tra_if)
956
957         self.assertEqual(p.tra_sa_out.get_lost(), 51)
958
959         # a big hole in the seq number space
960         pkts = [
961             (
962                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
963                 / p.scapy_tra_sa.encrypt(
964                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
965                     seq_num=seq,
966                 )
967             )
968             for seq in range(400, 500)
969         ]
970         self.send_and_expect(self.tra_if, pkts, self.tra_if)
971
972         self.assertEqual(p.tra_sa_out.get_lost(), 151)
973
974     def verify_tra_basic4(self, count=1, payload_size=54):
975         """ipsec v4 transport basic test"""
976         self.vapi.cli("clear errors")
977         self.vapi.cli("clear ipsec sa")
978         try:
979             p = self.params[socket.AF_INET]
980             send_pkts = self.gen_encrypt_pkts(
981                 p,
982                 p.scapy_tra_sa,
983                 self.tra_if,
984                 src=self.tra_if.remote_ip4,
985                 dst=self.tra_if.local_ip4,
986                 count=count,
987                 payload_size=payload_size,
988             )
989             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
990             for rx in recv_pkts:
991                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
992                 self.assert_packet_checksums_valid(rx)
993                 try:
994                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
995                     self.assert_packet_checksums_valid(decrypted)
996                 except:
997                     self.logger.debug(ppp("Unexpected packet:", rx))
998                     raise
999         finally:
1000             self.logger.info(self.vapi.ppcli("show error"))
1001             self.logger.info(self.vapi.ppcli("show ipsec all"))
1002
1003         pkts = p.tra_sa_in.get_stats()["packets"]
1004         self.assertEqual(
1005             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1006         )
1007         pkts = p.tra_sa_out.get_stats()["packets"]
1008         self.assertEqual(
1009             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1010         )
1011         self.assertEqual(p.tra_sa_out.get_lost(), 0)
1012         self.assertEqual(p.tra_sa_in.get_lost(), 0)
1013
1014         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1015         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1016
1017
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    # The test-method docstrings are left untouched: unittest reports them
    # as the test descriptions.

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # honour the caller's count; the default still sends one packet
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        self.verify_tra_basic4(count=257)
1036
1037
class IpsecTra6(object):
    """verify methods for Transport v6"""

    def verify_tra_basic6(self, count=1, payload_size=54):
        """Send ``count`` encrypted v6 transport packets and check the replies.

        Each reply must carry a payload length matching the frame and must
        decrypt with ``p.vpp_tra_sa`` into a packet with valid checksums.
        Afterwards the SA packet counters and the encrypt/decrypt node
        counters are compared with ``count``.

        :param count: number of packets to generate
        :param payload_size: forwarded to ``gen_encrypt_pkts6``
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tra_sa,
                self.tra_if,
                src=self.tra_if.remote_ip6,
                dst=self.tra_if.local_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
            for rx in recv_pkts:
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
        )
        pkts = p.tra_sa_out.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
        )
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)

    def gen_encrypt_pkts_ext_hdrs6(
        self, sa, sw_intf, src, dst, count=1, payload_size=54
    ):
        """Return ``count`` Ethernet frames carrying ``sa``-encrypted echo requests.

        Despite the name, the plaintext built here is a plain
        IPv6 / ICMPv6 echo request without any extension header.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=src, dst=dst)
                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` clear-text IPv6 frames with two extension headers.

        Each frame has a hop-by-hop and a fragment header followed by 200
        bytes of 0xff; ``payload_size`` is accepted but not used.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
            for _ in range(count)
        ]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in ``rxs`` and check its addresses.

        :param p: IPsec parameters; decryption uses ``p.vpp_tra_sa``
        :param sa: accepted for signature compatibility, not used here
        :param rxs: packets captured from VPP
        :returns: list of the decrypted packets, in order
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # reset per packet so a failure never logs a packet decrypted
            # on an earlier iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    except Exception as err:
                        # best effort: the dump must not mask the real failure
                        self.logger.debug(
                            "cannot dump decrypted packet: %r" % (err,)
                        )
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """Exercise v6 transport mode with IPv6 extension headers.

        First sends encrypted packets to VPP for decryption, then injects
        clear-text packets with one, two and three extension headers on
        ``pg2`` and checks that the headers are present after decrypting
        what VPP emits on ``tra_if``.
        """
        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(
            p.scapy_tra_sa,
            self.tra_if,
            src=self.tra_if.remote_ip6,
            dst=self.tra_if.local_ip6,
            count=count,
        )
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, alors je ne regrette rien
        #

        # one extension before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it
            dc = IPv6(raw(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / IPv6ExtHdrDestOpt()
            / Raw(b"\xff" * 200)
        )

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1192
1193
class IpsecTra6Tests(IpsecTra6):
    """v6 transport-mode test cases built on the IpsecTra6 verify methods"""

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        single_pkt = 1
        self.verify_tra_basic6(count=single_pkt)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        burst_len = 257
        self.verify_tra_basic6(count=burst_len)
1204
1205
class IpsecTra6ExtTests(IpsecTra6):
    """v6 transport-mode extension header test case"""

    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        params6 = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(params6)
1210
1211
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4

    Adds nothing of its own; it only combines the v4 and v6 test mixins.
    """
1216
1217
class IpsecTun4(object):
    """verify methods for Tunnel v4"""

    def _log_decrypted(self, decrypt_pkt):
        """Best-effort debug dump of a decrypted packet; ``None`` is ignored."""
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception as err:
            # the dump must never mask the failure being reported
            self.logger.debug("cannot dump decrypted packet: %r" % (err,))

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Compare SPD, SA and node packet counters with the expected values.

        :param p: IPsec parameters; the SPD policy is checked only when ``p``
            has ``spd_policy_in_any`` and the SAs only when it has
            ``tun_sa_in``
        :param count: expected inbound packet count
        :param n_frags: expected outbound packet count; a falsy value
            means "same as ``count``"
        :param worker: forwarded to each ``get_stats`` call
        """
        if not n_frags:
            n_frags = count
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SPD any policy: expected %d != %d" % (count, pkts),
            )

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
            pkts = p.tun_sa_out.get_stats(worker)["packets"]
            # report the value actually compared against (n_frags, not count)
            self.assertEqual(
                pkts,
                n_frags,
                "incorrect SA out counts: expected %d != %d" % (n_frags, pkts),
            )

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP decrypted towards pg1."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """Assert that ``esp_payload`` has the length expected for ``decrypt_pkt``.

        The expected length is the plaintext plus 2 bytes, rounded up to the
        cipher block size (at least 4), plus the IV and the ICV of the
        cipher or, when that is zero, of the integrity algorithm.
        """
        align = max(sa.crypt_algo.block_size, 4)
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt and validate the packets VPP encrypted onto the tunnel.

        :param p: IPsec parameters; decryption uses ``p.vpp_tun_sa``
        :param sa: SA whose algorithms drive the ESP padding check
        :param rxs: packets captured from VPP
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so a failure never logs a stale packet
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                self._log_decrypted(decrypt_pkt)
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Run a v4-in-v4 tunnel exchange in both directions and check counters.

        :param p: IPsec parameters for the tunnel
        :param count: packets to send in each direction
        :param payload_size: forwarded to the packet generators
        :param n_rx: packets expected back from VPP on the encrypt path;
            a falsy value means ``count``
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            # decrypt direction: tunnel interface -> pg1
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # encrypt direction: pg1 -> tunnel interface
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Check that tunnel traffic is dropped in both directions.

        :param p: IPsec parameters for the tunnel
        :param count: packets to send in each direction
        :param payload_size: used for the clear-text packets only
        :param n_rx: accepted for signature compatibility, not used
        """
        self.vapi.cli("clear errors")
        try:
            # NOTE(review): unlike verify_tun_44, payload_size is not passed
            # to gen_encrypt_pkts here - confirm that is intended
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Check that a fragmented encrypted packet is reassembled and decrypted.

        IPv4 reassembly is enabled on the tunnel interface for the duration
        of the check and switched off again even when a check fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
        )

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    payload_size=1900,
                    count=1,
                )
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, n_rx=1
                )
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(
                    self.pg1,
                    src=self.pg1.remote_ip4,
                    dst=p.remote_tun_if_host,
                    count=1,
                )
                recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # do not leave reassembly enabled for whatever runs next
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
            )

    def verify_tun_64(self, p, count=1):
        """Run a v6-in-v4 tunnel exchange in both directions and check counters."""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host6,
                dst=self.pg1.remote_ip6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    # the outer header is IPv4, the inner packet IPv6
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    self._log_decrypted(decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Send UDP port-4500 packets with a one-byte payload and check errors.

        Three bursts of 31 packets are sent and none may be answered:
        payload 0xff must bump ``nat_keepalive`` to 31, payload 0xfe must
        bump ``too_short`` to 31, and payload 0xfe followed by ``Padding``
        must bump ``too_short`` to 62.
        """
        # Padding is not among the scapy names imported at the top of this
        # file, so bring it into scope here.
        from scapy.packet import Padding

        # the sizeof Raw is calculated to pad to the minimum ethernet
        # frame size of 64 bytes
        # NOTE(review): `0 * 21` evaluates to the integer 0, so Padding is
        # built from 0 rather than from 21 bytes - confirm the intent before
        # changing it, the expected counters below were written against this
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xff")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
        )

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)

        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
1473
1474
class IpsecTun4Tests(IpsecTun4):
    """v4 tunnel-mode test cases built on the IpsecTun4 verify methods"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        p4 = self.params[socket.AF_INET]
        self.verify_tun_44(p4, count=1)
        # bounce the tunnel interface, then check traffic still flows
        tunnel = self.tun_if
        tunnel.admin_down()
        tunnel.resolve_arp()
        tunnel.admin_up()
        self.verify_tun_44(p4, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        p4 = self.params[socket.AF_INET]
        self.verify_tun_reass_44(p4)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        p4 = self.params[socket.AF_INET]
        self.verify_tun_44(p4, count=127)
1493
1494
1495 class IpsecTun6(object):
1496     """verify methods for Tunnel v6"""
1497
1498     def verify_counters6(self, p_in, p_out, count, worker=None):
1499         if hasattr(p_in, "tun_sa_in"):
1500             pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
1501             self.assertEqual(
1502                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1503             )
1504         if hasattr(p_out, "tun_sa_out"):
1505             pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
1506             self.assertEqual(
1507                 pkts,
1508                 count,
1509                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1510             )
1511         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1512         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1513
1514     def verify_decrypted6(self, p, rxs):
1515         for rx in rxs:
1516             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1517             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1518             self.assert_packet_checksums_valid(rx)
1519
1520     def verify_encrypted6(self, p, sa, rxs):
1521         for rx in rxs:
1522             self.assert_packet_checksums_valid(rx)
1523             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1524             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1525             if p.outer_flow_label:
1526                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1527             try:
1528                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1529                 if not decrypt_pkt.haslayer(IPv6):
1530                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1531                 self.assert_packet_checksums_valid(decrypt_pkt)
1532                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1533                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1534                 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1535                 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1536             except:
1537                 self.logger.debug(ppp("Unexpected packet:", rx))
1538                 try:
1539                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1540                 except:
1541                     pass
1542                 raise
1543
1544     def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
1545         self.vapi.cli("clear errors")
1546         self.vapi.cli("clear ipsec sa")
1547
1548         send_pkts = self.gen_pkts6(
1549             p_in,
1550             self.pg1,
1551             src=self.pg1.remote_ip6,
1552             dst=p_in.remote_tun_if_host,
1553             count=count,
1554             payload_size=payload_size,
1555         )
1556         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1557         self.logger.info(self.vapi.cli("sh punt stats"))
1558
1559     def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
1560         self.vapi.cli("clear errors")
1561         self.vapi.cli("clear ipsec sa")
1562
1563         send_pkts = self.gen_encrypt_pkts6(
1564             p_in,
1565             p_in.scapy_tun_sa,
1566             self.tun_if,
1567             src=p_in.remote_tun_if_host,
1568             dst=self.pg1.remote_ip6,
1569             count=count,
1570         )
1571         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1572
1573     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1574         self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
1575         self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
1576
1577     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1578         self.vapi.cli("clear errors")
1579         self.vapi.cli("clear ipsec sa")
1580         if not p_out:
1581             p_out = p_in
1582         try:
1583             send_pkts = self.gen_encrypt_pkts6(
1584                 p_in,
1585                 p_in.scapy_tun_sa,
1586                 self.tun_if,
1587                 src=p_in.remote_tun_if_host,
1588                 dst=self.pg1.remote_ip6,
1589                 count=count,
1590                 payload_size=payload_size,
1591             )
1592             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1593             self.verify_decrypted6(p_in, recv_pkts)
1594
1595             send_pkts = self.gen_pkts6(
1596                 p_in,
1597                 self.pg1,
1598                 src=self.pg1.remote_ip6,
1599                 dst=p_out.remote_tun_if_host,
1600                 count=count,
1601                 payload_size=payload_size,
1602             )
1603             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1604             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1605
1606             for rx in recv_pkts:
1607                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1608                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1609
1610         finally:
1611             self.logger.info(self.vapi.ppcli("show error"))
1612             self.logger.info(self.vapi.ppcli("show ipsec all"))
1613         self.verify_counters6(p_in, p_out, count)
1614
1615     def verify_tun_reass_66(self, p):
1616         self.vapi.cli("clear errors")
1617         self.vapi.ip_reassembly_enable_disable(
1618             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
1619         )
1620
1621         try:
1622             send_pkts = self.gen_encrypt_pkts6(
1623                 p,
1624                 p.scapy_tun_sa,
1625                 self.tun_if,
1626                 src=p.remote_tun_if_host,
1627                 dst=self.pg1.remote_ip6,
1628                 count=1,
1629                 payload_size=1850,
1630             )
1631             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1632             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1633             self.verify_decrypted6(p, recv_pkts)
1634
1635             send_pkts = self.gen_pkts6(
1636                 p,
1637                 self.pg1,
1638                 src=self.pg1.remote_ip6,
1639                 dst=p.remote_tun_if_host,
1640                 count=1,
1641                 payload_size=64,
1642             )
1643             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1644             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1645         finally:
1646             self.logger.info(self.vapi.ppcli("show error"))
1647             self.logger.info(self.vapi.ppcli("show ipsec all"))
1648         self.verify_counters6(p, p, 1)
1649         self.vapi.ip_reassembly_enable_disable(
1650             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
1651         )
1652
1653     def verify_tun_46(self, p, count=1):
1654         """ipsec 4o6 tunnel basic test"""
1655         self.vapi.cli("clear errors")
1656         self.vapi.cli("clear ipsec sa")
1657         try:
1658             send_pkts = self.gen_encrypt_pkts(
1659                 p,
1660                 p.scapy_tun_sa,
1661                 self.tun_if,
1662                 src=p.remote_tun_if_host4,
1663                 dst=self.pg1.remote_ip4,
1664                 count=count,
1665             )
1666             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1667             for recv_pkt in recv_pkts:
1668                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1669                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1670                 self.assert_packet_checksums_valid(recv_pkt)
1671             send_pkts = self.gen_pkts(
1672                 self.pg1,
1673                 src=self.pg1.remote_ip4,
1674                 dst=p.remote_tun_if_host4,
1675                 count=count,
1676             )
1677             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1678             for recv_pkt in recv_pkts:
1679                 try:
1680                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1681                     if not decrypt_pkt.haslayer(IP):
1682                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1683                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1684                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1685                     self.assert_packet_checksums_valid(decrypt_pkt)
1686                 except:
1687                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1688                     try:
1689                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1690                     except:
1691                         pass
1692                     raise
1693         finally:
1694             self.logger.info(self.vapi.ppcli("show error"))
1695             self.logger.info(self.vapi.ppcli("show ipsec all"))
1696         self.verify_counters6(p, p, count)
1697
1698     def verify_keepalive(self, p):
1699         # the sizeof Raw is calculated to pad to the minimum ehternet
1700         # frame size of 64 btyes
1701         pkt = (
1702             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1703             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1704             / UDP(sport=333, dport=4500)
1705             / Raw(b"\xff")
1706             / Padding(0 * 1)
1707         )
1708         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1709         self.assert_error_counter_equal(
1710             "/err/%s/nat_keepalive" % self.tun6_input_node, 31
1711         )
1712
1713         pkt = (
1714             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1715             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1716             / UDP(sport=333, dport=4500)
1717             / Raw(b"\xfe")
1718         )
1719         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1720         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
1721
1722         pkt = (
1723             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1724             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1725             / UDP(sport=333, dport=4500)
1726             / Raw(b"\xfe")
1727             / Padding(0 * 21)
1728         )
1729         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1730         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
1731
1732
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        # one packet in each direction
        params = self.params[socket.AF_INET6]
        self.verify_tun_66(params, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        params = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        # a burst of 257 packets in each direction
        params = self.params[socket.AF_INET6]
        self.verify_tun_66(params, count=257)
1747
1748
class IpsecTun6HandoffTests(IpsecTun6):
    """UT test methods for Tunnel v6 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handoff_66(self):
        """ipsec 6o6 tunnel worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        pkts_per_round = 15
        params = self.params[socket.AF_INET6]

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        worker_rounds = (0, 1, 0, 1)
        for worker in worker_rounds:
            # tunnel -> pg1 (decrypt direction)
            encrypted_tx = self.gen_encrypt_pkts6(
                params,
                params.scapy_tun_sa,
                self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=pkts_per_round,
            )
            decrypted_rx = self.send_and_expect(
                self.tun_if, encrypted_tx, self.pg1, worker=worker
            )
            self.verify_decrypted6(params, decrypted_rx)

            # pg1 -> tunnel (encrypt direction)
            cleartext_tx = self.gen_pkts6(
                params,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=params.remote_tun_if_host,
                count=pkts_per_round,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, cleartext_tx, self.tun_if, worker=worker
            )
            self.verify_encrypted6(params, params.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        expected_total = len(worker_rounds) * pkts_per_round
        self.verify_counters6(params, params, expected_total, worker=0)
1792
1793
class IpsecTun4HandoffTests(IpsecTun4):
    """UT test methods for Tunnel v4 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handooff_44(self):
        """ipsec 4o4 tunnel worker hand-off test"""
        for command in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(command)

        pkts_per_round = 15
        params = self.params[socket.AF_INET]

        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        worker_rounds = (0, 1, 0, 1)
        for worker in worker_rounds:
            # tunnel -> pg1 (decrypt direction)
            encrypted_tx = self.gen_encrypt_pkts(
                params,
                params.scapy_tun_sa,
                self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=pkts_per_round,
            )
            decrypted_rx = self.send_and_expect(
                self.tun_if, encrypted_tx, self.pg1, worker=worker
            )
            self.verify_decrypted(params, decrypted_rx)

            # pg1 -> tunnel (encrypt direction)
            cleartext_tx = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=params.remote_tun_if_host,
                count=pkts_per_round,
            )
            encrypted_rx = self.send_and_expect(
                self.pg1, cleartext_tx, self.tun_if, worker=worker
            )
            self.verify_encrypted(params, params.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        expected_total = len(worker_rounds) * pkts_per_round
        self.verify_counters4(params, expected_total, worker=0)
1836
1837
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """UT test methods for Tunnel v6 & v4"""

    # Nothing is defined here: all test methods are inherited from the
    # v4 and v6 tunnel test classes listed as bases.
1842
1843
class IPSecIPv4Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv4 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        """Set up class constants; defers entirely to the base class."""
        super(IPSecIPv4Fwd, cls).setUpConstants()

    def setUp(self):
        """Run the base set-up and start with empty SPD bookkeeping lists."""
        super(IPSecIPv4Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        """Remove recorded SPD configuration and shut the pg interfaces."""
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip4()
            pg.admin_down()
        super(IPSecIPv4Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create ``num_ifs`` pg interfaces and bring them up with IPv4.

        :param num_ifs: number of pg interfaces to create
        """
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv4 address on the interface
            pg.config_ip4()
            # resolve ARP, so that we know VPP MAC
            pg.resolve_arp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create an SPD and bind it to every interface in ``pg_list``.

        The SPD and each binding are recorded in ``self.spd_objs`` so that
        ``tearDown`` can remove them.

        :param spd_id: identifier of the SPD to create
        :param pg_list: interfaces the SPD is bound to
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Translate a policy name into the VPP API SPD action value.

        :param policy_type: ``"protect"``, ``"bypass"`` or ``"discard"``
        :returns: the matching ``vl_api_ipsec_spd_action_t`` member
        :raises Exception: for any other ``policy_type``
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        elif policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        elif policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        else:
            # format the message; passing the value as a second argument
            # would leave the "%s" placeholder unexpanded
            raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0.0.0.0"),
        local_ip_stop=ip_address("255.255.255.255"),
        remote_ip_start=ip_address("0.0.0.0"),
        remote_ip_stop=ip_address("255.255.255.255"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add or remove one IPv4 SPD policy entry.

        The address ranges of the entry are chosen as follows:

        * ``all_ips``: the whole IPv4 address space for both ranges
        * otherwise ``ip_range``: the ``local_ip_*`` / ``remote_ip_*``
          arguments
        * otherwise: exactly ``src_if.remote_ip4`` and ``dst_if.remote_ip4``

        :param spd_id: identifier of the SPD the entry belongs to
        :param src_if: interface providing the single source address
        :param dst_if: interface providing the single destination address
        :param proto: protocol passed to ``VppIpsecSpdEntry``
        :param is_out: passed as ``is_outbound``
        :param priority: priority of the entry
        :param policy_type: policy name, see ``get_policy``
        :param remove: ``False`` adds the entry, anything else removes it
        :returns: the ``VppIpsecSpdEntry`` object that was used
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0.0.0.0")
            src_range_high = ip_address("255.255.255.255")
            dst_range_low = ip_address("0.0.0.0")
            dst_range_high = ip_address("255.255.255.255")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip4
            src_range_high = src_if.remote_ip4
            dst_range_low = dst_if.remote_ip4
            dst_range_high = dst_if.remote_ip4

        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            # NOTE(review): list.remove() compares by equality; this
            # presumably relies on VppIpsecSpdEntry comparing equal to the
            # entry appended earlier - confirm.
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build ``pkt_count`` IPv4/UDP packets from ``src_if`` to ``dst_if``.

        :param src_if: interface the packets are built for
        :param dst_if: interface whose remote address is the destination
        :param pkt_count: number of packets to build
        :param src_prt: UDP source port
        :param dst_prt: UDP destination port
        :returns: list of the packets created
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check captured packets against the packet infos saved earlier.

        Every captured packet must carry the expected interface indexes
        and packet index in its payload, and match the saved packet's IPv4
        source address and UDP source port.  After the capture is consumed
        no saved packet info may be left over.

        :param src_if: interface the packets were sent from
        :param dst_if: interface the packets were captured on
        :param capture: iterable of captured packets
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that ``spdEntry`` has matched exactly ``pkt_count`` packets.

        :param pkt_count: expected value of the entry's ``packets`` stat
        :param spdEntry: SPD entry whose statistics are read
        """
        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
        matched_pkts = spdEntry.get_stats().get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2038
2039
# Template for SPD flow cache tests: adds helpers that read the flow cache
# entry counts from the CLI and that detect CPU crc32 support.
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
    @classmethod
    def setUpConstants(cls):
        """Set up class constants; meant to be overridden, see below."""
        super(SpdFlowCacheTemplate, cls).setUpConstants()
        # Override this method with required cmdline parameters e.g.
        # cls.vpp_cmdline.extend(["ipsec", "{",
        #                         "ipv4-outbound-spd-flow-cache on",
        #                         "}"])
        # cls.logger.info("VPP modified cmdline is %s" % " "
        #                 .join(cls.vpp_cmdline))

    def setUp(self):
        """Defer to the base class set-up."""
        super(SpdFlowCacheTemplate, self).setUp()

    def tearDown(self):
        """Defer to the base class tear-down."""
        super(SpdFlowCacheTemplate, self).tearDown()

    def get_spd_flow_cache_entries(self, outbound):
        """'show ipsec spd' output:
        ipv4-inbound-spd-flow-cache-entries: 0
        ipv4-outbound-spd-flow-cache-entries: 0

        :param outbound: read the outbound counter when true, otherwise
            the inbound one
        :returns: the entry count as an int
        :raises Exception: when the counter line is not found in the output
        """
        show_ipsec_reply = self.vapi.cli("show ipsec spd")
        # match the relevant section of 'show ipsec spd' output
        direction = "outbound" if outbound else "inbound"
        # Capture only the digits after the label.  The module imports
        # `search` directly (there is no `re` name in scope), and a greedy
        # "(.*)" with DOTALL would swallow everything up to the end of the
        # reply, making int() fail whenever more output follows.
        regex_match = search(
            r"ipv4-%s-spd-flow-cache-entries: (\d+)" % direction,
            show_ipsec_reply,
        )
        if regex_match is None:
            raise Exception(
                "Unable to find spd flow cache entries "
                "in 'show ipsec spd' CLI output - regex failed to match"
            )
        try:
            num_entries = int(regex_match.group(1))
        except ValueError:
            raise Exception(
                "Unable to get spd flow cache entries "
                "from 'show ipsec spd' string: %s" % regex_match.group(0)
            )
        self.logger.info("%s", regex_match.group(0))
        return num_entries

    def verify_num_outbound_flow_cache_entries(self, expected_elements):
        """Assert the outbound flow cache entry count.

        :param expected_elements: expected number of entries
        """
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=True), expected_elements
        )

    def verify_num_inbound_flow_cache_entries(self, expected_elements):
        """Assert the inbound flow cache entry count.

        :param expected_elements: expected number of entries
        """
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=False), expected_elements
        )

    def crc32_supported(self):
        """Report whether ``lscpu`` lists a crc32-capable CPU feature.

        :returns: True when the ``lscpu`` output contains ``crc32`` or
            ``sse4_2``, False otherwise
        """
        # lscpu is part of util-linux package, available on all Linux Distros
        # (the module imports `popen` directly; there is no `os` name in
        # scope).  The context manager closes the pipe once it is read.
        with popen("lscpu") as stream:
            cpu_info = stream.read()
        # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
        # see vppinfra/crc32.h
        # Each flag needs its own membership test: `"crc32" or ...` is
        # always true because a non-empty string is truthy.
        if "crc32" in cpu_info or "sse4_2" in cpu_info:
            self.logger.info("\ncrc32 supported:\n" + cpu_info)
            return True
        else:
            self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
            return False
2113
2114
class IPSecIPv6Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv6 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        """Set up class constants; defers entirely to the base class."""
        super(IPSecIPv6Fwd, cls).setUpConstants()

    def setUp(self):
        """Run the base set-up and start with empty SPD bookkeeping lists."""
        super(IPSecIPv6Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        """Remove recorded SPD configuration and shut the pg interfaces."""
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()
        super(IPSecIPv6Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create ``num_ifs`` pg interfaces and bring them up with IPv6.

        :param num_ifs: number of pg interfaces to create
        """
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv6 address on the interface
            pg.config_ip6()
            pg.resolve_ndp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create an SPD and bind it to every interface in ``pg_list``.

        The SPD and each binding are recorded in ``self.spd_objs`` so that
        ``tearDown`` can remove them.

        :param spd_id: identifier of the SPD to create
        :param pg_list: interfaces the SPD is bound to
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Translate a policy name into the VPP API SPD action value.

        :param policy_type: ``"protect"``, ``"bypass"`` or ``"discard"``
        :returns: the matching ``vl_api_ipsec_spd_action_t`` member
        :raises Exception: for any other ``policy_type``
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        elif policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        elif policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        else:
            # format the message; passing the value as a second argument
            # would leave the "%s" placeholder unexpanded
            raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0::0"),
        local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_ip_start=ip_address("0::0"),
        remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add or remove one IPv6 SPD policy entry.

        The address ranges of the entry are chosen as follows:

        * ``all_ips``: the whole IPv6 address space for both ranges
        * otherwise ``ip_range``: the ``local_ip_*`` / ``remote_ip_*``
          arguments
        * otherwise: exactly ``src_if.remote_ip6`` and ``dst_if.remote_ip6``

        :param spd_id: identifier of the SPD the entry belongs to
        :param src_if: interface providing the single source address
        :param dst_if: interface providing the single destination address
        :param proto: protocol passed to ``VppIpsecSpdEntry``
        :param is_out: passed as ``is_outbound``
        :param priority: priority of the entry
        :param policy_type: policy name, see ``get_policy``
        :param remove: ``False`` adds the entry, anything else removes it
        :returns: the ``VppIpsecSpdEntry`` object that was used
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0::0")
            src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
            dst_range_low = ip_address("0::0")
            dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip6
            src_range_high = src_if.remote_ip6
            dst_range_low = dst_if.remote_ip6
            dst_range_high = dst_if.remote_ip6

        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            # NOTE(review): list.remove() compares by equality; this
            # presumably relies on VppIpsecSpdEntry comparing equal to the
            # entry appended earlier - confirm.
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build ``pkt_count`` IPv6/UDP packets from ``src_if`` to ``dst_if``.

        :param src_if: interface the packets are built for
        :param dst_if: interface whose remote address is the destination
        :param pkt_count: number of packets to build
        :param src_prt: UDP source port
        :param dst_prt: UDP destination port
        :returns: list of the packets created
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check captured packets against the packet infos saved earlier.

        Every captured packet must carry the expected interface indexes
        and packet index in its payload, and match the saved packet's IPv6
        source address and UDP source port.  After the capture is consumed
        no saved packet info may be left over.

        :param src_if: interface the packets were sent from
        :param dst_if: interface the packets were captured on
        :param capture: iterable of captured packets
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that ``spdEntry`` has matched exactly ``pkt_count`` packets.

        :param pkt_count: expected value of the entry's ``packets`` stat
        :param spdEntry: SPD entry whose statistics are read
        """
        self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
        matched_pkts = spdEntry.get_stats().get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2308
2309
if __name__ == "__main__":
    # Run directly: hand the tests of this module to the VPP test runner.
    runner_class = VppTestRunner
    unittest.main(testRunner=runner_class)