ipsec: fix SA names consistency in tests
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
class IPsecIPv4Params:
    """Default parameter set used by the IPv4 flavour of the IPsec tests.

    The class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and key material so a test can
    adjust them without affecting other instances.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        self.remote_tun_if_host = "1.1.1.1"
        self.remote_tun_if_host6 = "1111::1"

        # tunnel SAs: id and SPI for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport SAs: id and SPI for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity: VPP enum value, matching scapy algorithm name, and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption: VPP enum value, matching scapy algorithm name, and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
74
75
class IPsecIPv6Params:
    """Default parameter set used by the IPv6 flavour of the IPsec tests.

    The class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and key material so a test can
    adjust them without affecting other instances.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
        self.remote_tun_if_host4 = "1.1.1.1"

        # tunnel SAs: id and SPI for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport SAs: id and SPI for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity: VPP enum value, matching scapy algorithm name, and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = "HMAC-SHA1-96"  # scapy name
        self.auth_key = b"C91KUR9GYMm5GfkEvNjX"

        # encryption: VPP enum value, matching scapy algorithm name, and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = "AES-CBC"  # scapy name
        self.crypt_key = b"JPjyOWBeVEQiMe7h"
        self.salt = 0

        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
122
123
def mk_scapy_crypt_key(p):
    """Return the encryption key in the form handed to scapy.

    For "AES-GCM" and "AES-CTR" the key is ``p.crypt_key`` followed by
    ``p.salt`` packed as a 4-byte big-endian unsigned integer; for every
    other algorithm name ``p.crypt_key`` is returned unchanged.
    """
    if p.crypt_algo not in ("AES-GCM", "AES-CTR"):
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
129
130
def config_tun_params(p, encryption_type, tun_if):
    """Attach the tunnel-mode scapy SAs and tunnel end-points to ``p``.

    Sets ``p.tun_dst`` / ``p.tun_src`` from the interface's remote / local
    address for ``p.addr_type`` and then builds two SecurityAssociation
    objects: ``p.scapy_tun_sa`` (SPI ``p.scapy_tun_spi``, outer header from
    ``tun_dst`` to ``tun_src``) and ``p.vpp_tun_sa`` (SPI ``p.vpp_tun_spi``,
    outer header from ``tun_src`` to ``tun_dst``).

    :param p: parameter object (e.g. IPsecIPv4Params) that is updated in place
    :param encryption_type: first positional argument for SecurityAssociation
    :param tun_if: object exposing ``remote_addr`` / ``local_addr`` mappings
        keyed by address family
    """
    use_esn = bool(
        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    )
    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # settings common to both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=use_esn,
    )

    # scapy layer class used for the outer (tunnel) header
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared,
    )
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared,
    )
161
162
def config_tra_params(p, encryption_type):
    """Attach the transport-mode scapy SAs to ``p``.

    Builds ``p.scapy_tra_sa`` (SPI ``p.scapy_tra_spi``) and ``p.vpp_tra_sa``
    (SPI ``p.vpp_tra_spi``); apart from the SPI both SecurityAssociation
    objects are created from the same algorithm, key, NAT header and ESN
    settings taken from ``p``.

    :param p: parameter object (e.g. IPsecIPv4Params) that is updated in place
    :param encryption_type: first positional argument for SecurityAssociation
    """
    use_esn = bool(
        p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
    )

    # settings common to both directions
    shared = dict(
        crypt_algo=p.crypt_algo,
        crypt_key=mk_scapy_crypt_key(p),
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        esn_en=use_esn,
    )

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared
    )
    p.vpp_tra_sa = SecurityAssociation(encryption_type, spi=p.vpp_tra_spi, **shared)
188
189
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE::

         ------   encrypt   ---
        |tra_if| <-------> |VPP|
         ------   decrypt   ---

    TUNNEL MODE::

         ------   encrypt   ---   plain   ---
        |tun_if| <-------  |VPP| <------ |pg1|
         ------             ---           ---

         ------   decrypt   ---   plain   ---
        |tun_if| ------->  |VPP| ------> |pg1|
         ------             ---           ---
    """

    # SPD ids for the tunnel and transport policy databases
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """Hook for subclasses; the template itself selects nothing."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setup_params(self):
        """Create default v4/v6 parameter objects unless already present,
        then index them by address family in ``self.params``."""
        defaults = (
            ("ipv4_params", IPsecIPv4Params),
            ("ipv6_params", IPsecIPv6Params),
        )
        for attr_name, factory in defaults:
            if not hasattr(self, attr_name):
                setattr(self, attr_name, factory())
        self.params = {
            prm.addr_type: prm for prm in (self.ipv4_params, self.ipv6_params)
        }

    def config_interfaces(self):
        """Create three pg interfaces, bring them up and configure and
        resolve both IPv4 and IPv6 on each."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super().setUp()

        self.setup_params()

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Bring every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super().tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hw_dump = self.vapi.cli("show hardware")
        self.logger.info(hw_dump)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` frames, each an IPv4 ICMP packet with
        ``payload_size`` bytes of payload encrypted with ``sa``."""

        def one_frame():
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)
            return ether / sa.encrypt(inner)

        return [one_frame() for _ in range(count)]

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` frames, each an IPv6 echo request (hop limit and
        flow label taken from ``p``) encrypted with ``sa``."""

        def one_frame():
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IPv6(
                src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label
            ) / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            return ether / sa.encrypt(inner)

        return [one_frame() for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` unencrypted IPv4 ICMP frames."""

        def one_frame():
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            return ether / IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size)

        return [one_frame() for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` unencrypted IPv6 echo-request frames (hop limit
        and flow label taken from ``p``)."""

        def one_frame():
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            ip6 = IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
            return ether / ip6 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)

        return [one_frame() for _ in range(count)]
303
304
# Verification helper for TCP checksums of packets VPP sends through the
# IPv4 tunnel SA; expects the host class to provide params, tun_if, vapi,
# logger, send_and_expect and assert_packet_checksums_valid.
class IpsecTcp(object):
    def verify_tcp_checksum(self):
        # start http cli server listener on http://0.0.0.0:80
        self.vapi.cli("http cli server")
        p = self.params[socket.AF_INET]

        # encrypted TCP SYN towards port 80 on VPP's tunnel interface address
        ether = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) / TCP(
            flags="S", dport=80
        )
        send = ether / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # decrypt the first reply and check its checksums
        first_rx = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        decrypted = p.vpp_tun_sa.decrypt(first_rx[IP])
        self.assert_packet_checksums_valid(decrypted)
321
322
# Wraps the verification routine inherited from IpsecTcp in a test_* method.
class IpsecTcpTests(IpsecTcp):
    def test_tcp_checksum(self):
        """verify checksum correctness for vpp generated packets"""
        # all of the work is done by IpsecTcp.verify_tcp_checksum
        self.verify_tcp_checksum()
327
328
329 class IpsecTra4(object):
330     """verify methods for Transport v4"""
331
332     def get_replay_counts(self, p):
333         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
334         count = self.statistics.get_err_counter(replay_node_name)
335
336         if p.async_mode:
337             replay_post_node_name = (
338                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
339             )
340             count += self.statistics.get_err_counter(replay_post_node_name)
341
342         return count
343
344     def get_hash_failed_counts(self, p):
345         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
346             hash_failed_node_name = (
347                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
348             )
349         else:
350             hash_failed_node_name = (
351                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
352             )
353         count = self.statistics.get_err_counter(hash_failed_node_name)
354
355         if p.async_mode:
356             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
357
358         return count
359
360     def verify_hi_seq_num(self):
361         p = self.params[socket.AF_INET]
362         saf = VppEnum.vl_api_ipsec_sad_flags_t
363         esn_on = p.vpp_tra_sa.esn_en
364         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
365
366         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
367         replay_count = self.get_replay_counts(p)
368         hash_failed_count = self.get_hash_failed_counts(p)
369         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
370
371         # a few packets so we get the rx seq number above the window size and
372         # thus can simulate a wrap with an out of window packet
373         pkts = [
374             (
375                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
376                 / p.scapy_tra_sa.encrypt(
377                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
378                     seq_num=seq,
379                 )
380             )
381             for seq in range(63, 80)
382         ]
383         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
384
385         # these 4 packets will all choose seq-num 0 to decrpyt since none
386         # are out of window when first checked. however, once #200 has
387         # decrypted it will move the window to 200 and has #81 is out of
388         # window. this packet should be dropped.
389         pkts = [
390             (
391                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
392                 / p.scapy_tra_sa.encrypt(
393                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
394                     seq_num=200,
395                 )
396             ),
397             (
398                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
399                 / p.scapy_tra_sa.encrypt(
400                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
401                     seq_num=81,
402                 )
403             ),
404             (
405                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
406                 / p.scapy_tra_sa.encrypt(
407                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
408                     seq_num=201,
409                 )
410             ),
411             (
412                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
413                 / p.scapy_tra_sa.encrypt(
414                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
415                     seq_num=202,
416                 )
417             ),
418         ]
419
420         # if anti-replay is off then we won't drop #81
421         n_rx = 3 if ar_on else 4
422         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
423         # this packet is one before the wrap
424         pkts = [
425             (
426                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
427                 / p.scapy_tra_sa.encrypt(
428                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
429                     seq_num=203,
430                 )
431             )
432         ]
433         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
434
435         # a replayed packet, then an out of window, then a legit
436         # tests that a early failure on the batch doesn't affect subsequent packets.
437         pkts = [
438             (
439                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
440                 / p.scapy_tra_sa.encrypt(
441                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
442                     seq_num=203,
443                 )
444             ),
445             (
446                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
447                 / p.scapy_tra_sa.encrypt(
448                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
449                     seq_num=81,
450                 )
451             ),
452             (
453                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
454                 / p.scapy_tra_sa.encrypt(
455                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
456                     seq_num=204,
457                 )
458             ),
459         ]
460         n_rx = 1 if ar_on else 3
461         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
462
463         # move the window over half way to a wrap
464         pkts = [
465             (
466                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
467                 / p.scapy_tra_sa.encrypt(
468                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
469                     seq_num=0x80000001,
470                 )
471             )
472         ]
473         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
474
475         # anti-replay will drop old packets, no anti-replay will not
476         pkts = [
477             (
478                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
479                 / p.scapy_tra_sa.encrypt(
480                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
481                     seq_num=0x44000001,
482                 )
483             )
484         ]
485
486         if ar_on:
487             self.send_and_assert_no_replies(self.tra_if, pkts)
488         else:
489             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
490
491         if esn_on:
492             #
493             # validate wrapping the ESN
494             #
495
496             # wrap scapy's TX SA SN
497             p.scapy_tra_sa.seq_num = 0x100000005
498
499             # send a packet that wraps the window for both AR and no AR
500             pkts = [
501                 (
502                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
503                     / p.scapy_tra_sa.encrypt(
504                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
505                         / ICMP(),
506                         seq_num=0x100000005,
507                     )
508                 )
509             ]
510
511             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
512             for rx in rxs:
513                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
514
515             # move the window forward to half way to the next wrap
516             pkts = [
517                 (
518                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
519                     / p.scapy_tra_sa.encrypt(
520                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
521                         / ICMP(),
522                         seq_num=0x180000005,
523                     )
524                 )
525             ]
526
527             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
528
529             # a packet less than 2^30 from the current position is:
530             #  - AR: out of window and dropped
531             #  - non-AR: accepted
532             pkts = [
533                 (
534                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
535                     / p.scapy_tra_sa.encrypt(
536                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
537                         / ICMP(),
538                         seq_num=0x170000005,
539                     )
540                 )
541             ]
542
543             if ar_on:
544                 self.send_and_assert_no_replies(self.tra_if, pkts)
545             else:
546                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
547
548             # a packet more than 2^30 from the current position is:
549             #  - AR: out of window and dropped
550             #  - non-AR: considered a wrap, but since it's not a wrap
551             #    it won't decrpyt and so will be dropped
552             pkts = [
553                 (
554                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
555                     / p.scapy_tra_sa.encrypt(
556                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
557                         / ICMP(),
558                         seq_num=0x130000005,
559                     )
560                 )
561             ]
562
563             self.send_and_assert_no_replies(self.tra_if, pkts)
564
565             # a packet less than 2^30 from the current position and is a
566             # wrap; (the seq is currently at 0x180000005).
567             #  - AR: out of window so considered a wrap, so accepted
568             #  - non-AR: not considered a wrap, so won't decrypt
569             p.scapy_tra_sa.seq_num = 0x260000005
570             pkts = [
571                 (
572                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
573                     / p.scapy_tra_sa.encrypt(
574                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
575                         / ICMP(),
576                         seq_num=0x260000005,
577                     )
578                 )
579             ]
580             if ar_on:
581                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
582             else:
583                 self.send_and_assert_no_replies(self.tra_if, pkts)
584
585             #
586             # window positions are different now for AR/non-AR
587             #  move non-AR forward
588             #
589             if not ar_on:
590                 # a packet more than 2^30 from the current position and is a
591                 # wrap; (the seq is currently at 0x180000005).
592                 #  - AR: accepted
593                 #  - non-AR: not considered a wrap, so won't decrypt
594
595                 pkts = [
596                     (
597                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
598                         / p.scapy_tra_sa.encrypt(
599                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
600                             / ICMP(),
601                             seq_num=0x200000005,
602                         )
603                     ),
604                     (
605                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
606                         / p.scapy_tra_sa.encrypt(
607                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
608                             / ICMP(),
609                             seq_num=0x200000006,
610                         )
611                     ),
612                 ]
613                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
614
615                 pkts = [
616                     (
617                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
618                         / p.scapy_tra_sa.encrypt(
619                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
620                             / ICMP(),
621                             seq_num=0x260000005,
622                         )
623                     )
624                 ]
625                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
626
627     def verify_tra_anti_replay(self):
628         p = self.params[socket.AF_INET]
629         esn_en = p.vpp_tra_sa.esn_en
630
631         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
632         replay_count = self.get_replay_counts(p)
633         hash_failed_count = self.get_hash_failed_counts(p)
634         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
635
636         if ESP == self.encryption_type:
637             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
638             undersize_count = self.statistics.get_err_counter(undersize_node_name)
639
640         #
641         # send packets with seq numbers 1->34
642         # this means the window size is still in Case B (see RFC4303
643         # Appendix A)
644         #
645         # for reasons i haven't investigated Scapy won't create a packet with
646         # seq_num=0
647         #
648         pkts = [
649             (
650                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
651                 / p.scapy_tra_sa.encrypt(
652                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
653                     seq_num=seq,
654                 )
655             )
656             for seq in range(1, 34)
657         ]
658         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
659
660         # replayed packets are dropped
661         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
662         replay_count += len(pkts)
663         self.assertEqual(self.get_replay_counts(p), replay_count)
664
665         #
666         # now send a batch of packets all with the same sequence number
667         # the first packet in the batch is legitimate, the rest bogus
668         #
669         self.vapi.cli("clear error")
670         self.vapi.cli("clear node counters")
671         pkts = Ether(
672             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
673         ) / p.scapy_tra_sa.encrypt(
674             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
675             seq_num=35,
676         )
677         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
678         replay_count += 7
679         self.assertEqual(self.get_replay_counts(p), replay_count)
680
681         #
682         # now move the window over to 257 (more than one byte) and into Case A
683         #
684         self.vapi.cli("clear error")
685         pkt = Ether(
686             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
687         ) / p.scapy_tra_sa.encrypt(
688             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
689             seq_num=257,
690         )
691         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
692
693         # replayed packets are dropped
694         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
695         replay_count += 3
696         self.assertEqual(self.get_replay_counts(p), replay_count)
697
698         # the window size is 64 packets
699         # in window are still accepted
700         pkt = Ether(
701             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
702         ) / p.scapy_tra_sa.encrypt(
703             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
704             seq_num=200,
705         )
706         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
707
708         # a packet that does not decrypt does not move the window forward
709         bogus_sa = SecurityAssociation(
710             self.encryption_type,
711             p.scapy_tra_spi,
712             crypt_algo=p.crypt_algo,
713             crypt_key=mk_scapy_crypt_key(p)[::-1],
714             auth_algo=p.auth_algo,
715             auth_key=p.auth_key[::-1],
716         )
717         pkt = Ether(
718             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
719         ) / bogus_sa.encrypt(
720             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
721             seq_num=350,
722         )
723         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
724
725         hash_failed_count += 17
726         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
727
728         # a malformed 'runt' packet
729         #  created by a mis-constructed SA
730         if ESP == self.encryption_type and p.crypt_algo != "NULL":
731             bogus_sa = SecurityAssociation(self.encryption_type, p.scapy_tra_spi)
732             pkt = Ether(
733                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
734             ) / bogus_sa.encrypt(
735                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
736                 seq_num=350,
737             )
738             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
739
740             undersize_count += 17
741             self.assert_error_counter_equal(undersize_node_name, undersize_count)
742
743         # which we can determine since this packet is still in the window
744         pkt = Ether(
745             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
746         ) / p.scapy_tra_sa.encrypt(
747             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
748             seq_num=234,
749         )
750         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
751
752         #
753         # out of window are dropped
754         #  this is Case B. So VPP will consider this to be a high seq num wrap
755         #  and so the decrypt attempt will fail
756         #
757         pkt = Ether(
758             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
759         ) / p.scapy_tra_sa.encrypt(
760             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
761             seq_num=17,
762         )
763         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
764
765         if esn_en:
766             # an out of window error with ESN looks like a high sequence
767             # wrap. but since it isn't then the verify will fail.
768             hash_failed_count += 17
769             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
770
771         else:
772             replay_count += 17
773             self.assertEqual(self.get_replay_counts(p), replay_count)
774
775         # valid packet moves the window over to 258
776         pkt = Ether(
777             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
778         ) / p.scapy_tra_sa.encrypt(
779             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
780             seq_num=258,
781         )
782         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
783         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
784
785         #
786         # move VPP's SA TX seq-num to just before the seq-number wrap.
787         # then fire in a packet that VPP should drop on TX because it
788         # causes the TX seq number to wrap; unless we're using extened sequence
789         # numbers.
790         #
791         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.vpp_tra_sa_id)
792         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
793         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
794
795         pkts = [
796             (
797                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
798                 / p.scapy_tra_sa.encrypt(
799                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
800                     seq_num=seq,
801                 )
802             )
803             for seq in range(259, 280)
804         ]
805
806         if esn_en:
807             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
808
809             #
810             # in order for scapy to decrypt its SA's high order number needs
811             # to wrap
812             #
813             p.vpp_tra_sa.seq_num = 0x100000000
814             for rx in rxs:
815                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
816
817             #
818             # wrap scapy's TX high sequence number. VPP is in case B, so it
819             # will consider this a high seq wrap also.
820             # The low seq num we set it to will place VPP's RX window in Case A
821             #
822             p.scapy_tra_sa.seq_num = 0x100000005
823             pkt = Ether(
824                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
825             ) / p.scapy_tra_sa.encrypt(
826                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
827                 seq_num=0x100000005,
828             )
829             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
830
831             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
832
833             #
834             # A packet that has seq num between (2^32-64) and 5 is within
835             # the window
836             #
837             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
838             pkt = Ether(
839                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
840             ) / p.scapy_tra_sa.encrypt(
841                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
842                 seq_num=0xFFFFFFFD,
843             )
844             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
845             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
846
847             #
848             # While in case A we cannot wrap the high sequence number again
849             # because VPP will consider this packet to be one that moves the
850             # window forward
851             #
852             pkt = Ether(
853                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
854             ) / p.scapy_tra_sa.encrypt(
855                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
856                 seq_num=0x200000999,
857             )
858             self.send_and_assert_no_replies(
859                 self.tra_if, [pkt], self.tra_if, timeout=0.2
860             )
861
862             hash_failed_count += 1
863             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
864
865             #
866             # but if we move the window forward to case B, then we can wrap
867             # again
868             #
869             p.scapy_tra_sa.seq_num = 0x100000555
870             pkt = Ether(
871                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
872             ) / p.scapy_tra_sa.encrypt(
873                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
874                 seq_num=0x100000555,
875             )
876             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
877             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
878
879             p.scapy_tra_sa.seq_num = 0x200000444
880             pkt = Ether(
881                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
882             ) / p.scapy_tra_sa.encrypt(
883                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
884                 seq_num=0x200000444,
885             )
886             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
887             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
888
889         else:
890             #
891             # without ESN TX sequence numbers can't wrap and packets are
892             # dropped from here on out.
893             #
894             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
895             seq_cycle_count += len(pkts)
896             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
897
898         # move the security-associations seq number on to the last we used
899         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
900         p.scapy_tra_sa.seq_num = 351
901         p.vpp_tra_sa.seq_num = 351
902
903     def verify_tra_lost(self):
904         p = self.params[socket.AF_INET]
905         esn_en = p.vpp_tra_sa.esn_en
906
907         #
908         # send packets with seq numbers 1->34
909         # this means the window size is still in Case B (see RFC4303
910         # Appendix A)
911         #
912         # for reasons i haven't investigated Scapy won't create a packet with
913         # seq_num=0
914         #
915         pkts = [
916             (
917                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
918                 / p.scapy_tra_sa.encrypt(
919                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
920                     seq_num=seq,
921                 )
922             )
923             for seq in range(1, 3)
924         ]
925         self.send_and_expect(self.tra_if, pkts, self.tra_if)
926
927         self.assertEqual(p.tra_sa_in.get_lost(), 0)
928
929         # skip a sequence number
930         pkts = [
931             (
932                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
933                 / p.scapy_tra_sa.encrypt(
934                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
935                     seq_num=seq,
936                 )
937             )
938             for seq in range(4, 6)
939         ]
940         self.send_and_expect(self.tra_if, pkts, self.tra_if)
941
942         self.assertEqual(p.tra_sa_in.get_lost(), 0)
943
944         # the lost packet are counted untill we get up past the first
945         # sizeof(replay_window) packets
946         pkts = [
947             (
948                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
949                 / p.scapy_tra_sa.encrypt(
950                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
951                     seq_num=seq,
952                 )
953             )
954             for seq in range(6, 100)
955         ]
956         self.send_and_expect(self.tra_if, pkts, self.tra_if)
957
958         self.assertEqual(p.tra_sa_in.get_lost(), 1)
959
960         # lost of holes in the sequence
961         pkts = [
962             (
963                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
964                 / p.scapy_tra_sa.encrypt(
965                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
966                     seq_num=seq,
967                 )
968             )
969             for seq in range(100, 200, 2)
970         ]
971         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
972
973         pkts = [
974             (
975                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
976                 / p.scapy_tra_sa.encrypt(
977                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
978                     seq_num=seq,
979                 )
980             )
981             for seq in range(200, 300)
982         ]
983         self.send_and_expect(self.tra_if, pkts, self.tra_if)
984
985         self.assertEqual(p.tra_sa_in.get_lost(), 51)
986
987         # a big hole in the seq number space
988         pkts = [
989             (
990                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
991                 / p.scapy_tra_sa.encrypt(
992                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
993                     seq_num=seq,
994                 )
995             )
996             for seq in range(400, 500)
997         ]
998         self.send_and_expect(self.tra_if, pkts, self.tra_if)
999
1000         self.assertEqual(p.tra_sa_in.get_lost(), 151)
1001
1002     def verify_tra_basic4(self, count=1, payload_size=54):
1003         """ipsec v4 transport basic test"""
1004         self.vapi.cli("clear errors")
1005         self.vapi.cli("clear ipsec sa")
1006         try:
1007             p = self.params[socket.AF_INET]
1008             send_pkts = self.gen_encrypt_pkts(
1009                 p,
1010                 p.scapy_tra_sa,
1011                 self.tra_if,
1012                 src=self.tra_if.remote_ip4,
1013                 dst=self.tra_if.local_ip4,
1014                 count=count,
1015                 payload_size=payload_size,
1016             )
1017             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1018             for rx in recv_pkts:
1019                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1020                 self.assert_packet_checksums_valid(rx)
1021                 try:
1022                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
1023                     self.assert_packet_checksums_valid(decrypted)
1024                 except:
1025                     self.logger.debug(ppp("Unexpected packet:", rx))
1026                     raise
1027         finally:
1028             self.logger.info(self.vapi.ppcli("show error"))
1029             self.logger.info(self.vapi.ppcli("show ipsec all"))
1030
1031         pkts = p.tra_sa_in.get_stats()["packets"]
1032         self.assertEqual(
1033             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1034         )
1035         pkts = p.tra_sa_out.get_stats()["packets"]
1036         self.assertEqual(
1037             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1038         )
1039         self.assertEqual(p.tra_sa_out.get_lost(), 0)
1040         self.assertEqual(p.tra_sa_in.get_lost(), 0)
1041
1042         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1043         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1044
1045
class IpsecTra4Tests(IpsecTra4):
    """UT test methods for Transport v4"""

    # Every test below simply delegates to a verify_* helper inherited from
    # IpsecTra4.

    def test_tra_anti_replay(self):
        """ipsec v4 transport anti-replay test"""
        self.verify_tra_anti_replay()

    def test_tra_lost(self):
        """ipsec v4 transport lost packet test"""
        self.verify_tra_lost()

    def test_tra_basic(self, count=1):
        """ipsec v4 transport basic test"""
        # Forward the ``count`` argument rather than a hard-coded 1; with the
        # default value the behaviour is unchanged.
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ipsec v4 transport burst test"""
        self.verify_tra_basic4(count=257)
1064
1065
class IpsecTra6(object):
    """verify methods for Transport v6"""

    def verify_tra_basic6(self, count=1, payload_size=54):
        """Round-trip ``count`` encrypted packets over the v6 transport SA.

        Issues the ``clear errors`` and ``clear ipsec sa`` CLI commands, sends
        packets built by ``gen_encrypt_pkts6`` on ``tra_if`` and checks that
        each packet received back has a consistent IPv6 payload length and
        decrypts with ``p.vpp_tra_sa``.  The SA and node packet counters are
        then compared with ``count``.

        :param count: number of packets to send and to expect in each counter.
        :param payload_size: forwarded to ``gen_encrypt_pkts6``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tra_sa,
                self.tra_if,
                src=self.tra_if.remote_ip6,
                dst=self.tra_if.local_ip6,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
            for rx in recv_pkts:
                # plen must cover everything after the fixed IPv6 header
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always capture VPP state in the log, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
        )
        pkts = p.tra_sa_out.get_stats()["packets"]
        self.assertEqual(
            pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
        )
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)

    def gen_encrypt_pkts_ext_hdrs6(
        self, sa, sw_intf, src, dst, count=1, payload_size=54
    ):
        """Return ``count`` ICMPv6 echo requests encrypted with ``sa``.

        Each packet is addressed at L2 from ``sw_intf.remote_mac`` to
        ``sw_intf.local_mac`` and carries ``payload_size`` bytes of "X".

        NOTE(review): despite the method name, no IPv6 extension header is
        added to the packets built here.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / sa.encrypt(
                IPv6(src=src, dst=dst)
                / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
            )
            for _ in range(count)
        ]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` plaintext IPv6 packets with extension headers.

        Every packet carries a hop-by-hop header, a fragment header
        (id=2, offset=200) and 200 bytes of 0xff.

        NOTE(review): ``payload_size`` is accepted but not used; the raw
        payload is always 200 bytes.
        """
        return [
            Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            / IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
            for _ in range(count)
        ]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt packets received from VPP and check their addresses.

        :param p: parameter set; decryption uses ``p.vpp_tra_sa``.
        :param sa: not used by this method (kept for interface
            compatibility).
        :param rxs: packets captured on the transport interface.
        :returns: list of decrypted packets, in the order of ``rxs``.
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # Reset for each packet so that a failure never logs the
            # decrypted result of an earlier packet.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """Check transport-mode handling of IPv6 extension headers.

        First sends encrypted echo requests towards VPP, then injects
        plaintext packets carrying one, two and three extension headers on
        pg2 and checks that the packets emitted on ``tra_if`` decrypt to
        packets that still contain those headers.
        """
        count = 63

        def route_and_decrypt(plain_pkt):
            # Inject on pg2, capture on tra_if, decrypt, and re-parse.
            rxs = self.send_and_expect(self.pg2, [plain_pkt], self.tra_if)
            dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt, but reparsing
            # the ipv6 packet fixes it
            return [IPv6(raw(dc[IPv6])) for dc in dcs]

        #
        # check we can decrypt with options
        # NOTE(review): the packets built by gen_encrypt_pkts_ext_hdrs6
        # carry no extension headers.
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(
            p.scapy_tra_sa,
            self.tra_if,
            src=self.tra_if.remote_ip6,
            dst=self.tra_if.local_ip6,
            count=count,
        )
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, so no regrets
        #

        # one extension before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )
        for dc in route_and_decrypt(tx):
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / Raw(b"\xff" * 200)
        )
        for dc in route_and_decrypt(tx):
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (
            Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
            / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
            / IPv6ExtHdrHopByHop()
            / IPv6ExtHdrFragment(id=2, offset=200)
            / IPv6ExtHdrDestOpt()
            / Raw(b"\xff" * 200)
        )
        for dc in route_and_decrypt(tx):
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1220
1221
class IpsecTra6Tests(IpsecTra6):
    """UT test methods for Transport v6"""

    # Both tests delegate to verify_tra_basic6 inherited from IpsecTra6; they
    # differ only in the number of packets requested.

    def test_tra_basic6(self):
        """ipsec v6 transport basic test"""
        # single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ipsec v6 transport burst test"""
        # 257 packets in one burst
        self.verify_tra_basic6(count=257)
1232
1233
# Test methods for IPv6 transport mode with extension headers; the actual
# checks live in IpsecTra6.verify_tra_66_ext_hdrs.
class IpsecTra6ExtTests(IpsecTra6):
    def test_tra_ext_hdrs_66(self):
        """ipsec 6o6 tra extension headers test"""
        # run the helper against the IPv6 parameter set
        self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
1238
1239
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4"""

    # No body of its own: the v4 and v6 transport test methods are inherited
    # from the two base classes.
    pass
1244
1245
class IpsecTun4(object):
    """verify methods for Tunnel v4"""

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Check SPD, SA and node packet counters after a tunnel run.

        :param p: parameter set; ``spd_policy_in_any`` and
            ``tun_sa_in``/``tun_sa_out`` are only checked when present.
        :param count: packets expected by the SPD policy, the inbound SA and
            the decrypt node.
        :param n_frags: packets expected by the outbound SA and the encrypt
            node; falls back to ``count`` when falsy.
        :param worker: passed through to every ``get_stats()`` call.
        """
        if not n_frags:
            n_frags = count
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
            self.assertEqual(
                pkts,
                count,
                "incorrect SPD any policy: expected %d != %d" % (count, pkts),
            )

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats(worker)["packets"]
            self.assertEqual(
                pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
            )
            pkts = p.tun_sa_out.get_stats(worker)["packets"]
            # report the value actually compared against (n_frags, not count)
            self.assertEqual(
                pkts,
                n_frags,
                "incorrect SA out counts: expected %d != %d" % (n_frags, pkts),
            )

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP decrypted to pg1."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """Assert that ``esp_payload`` has the expected padded length.

        The expected length is the decrypted packet plus 2 bytes, rounded up
        to the cipher block size (at least 4), plus the IV size and the ICV
        size (the cipher's own ICV size when non-zero, otherwise the
        integrity algorithm's).
        """
        align = max(sa.crypt_algo.block_size, 4)
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """Validate packets VPP encrypted towards the v4 tunnel peer.

        :param p: parameter set; decryption uses ``p.vpp_tun_sa``.
        :param sa: SA whose algorithms drive the ESP padding check.
        :param rxs: packets captured on the tunnel interface.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, p.nat_header.dport)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # Reset for each packet so that a failure never logs the
            # decrypted result of an earlier packet.
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    # the inner packet came back as raw bytes; parse it
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Verify IPv4-in-IPv4 tunnel traffic in both directions.

        Encrypted packets sent on ``tun_if`` must arrive decrypted on pg1;
        plaintext packets sent on pg1 must arrive encrypted on ``tun_if``
        with the outer addresses ``p.tun_src``/``p.tun_dst``.  Counters are
        checked afterwards.

        :param n_rx: number of encrypted packets expected on ``tun_if``;
            falls back to ``count`` when falsy.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            # always capture VPP state in the log, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Verify that tunnel traffic is dropped in both directions.

        :param payload_size: applied to the plaintext packets sent on pg1
            only; the encrypted packets use ``gen_encrypt_pkts``' default.
        :param n_rx: not used (nothing is expected back); kept for interface
            compatibility.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=count,
            )
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=count,
                payload_size=payload_size,
            )
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Verify that a fragmented encrypted packet is reassembled.

        IPv4 reassembly is enabled on ``tun_if`` for the duration of the
        check and disabled again afterwards, even when a check fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
        )

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(
                    p,
                    p.scapy_tun_sa,
                    self.tun_if,
                    src=p.remote_tun_if_host,
                    dst=self.pg1.remote_ip4,
                    payload_size=1900,
                    count=1,
                )
                # one 1900-byte-payload packet split into <=1400 byte fragments
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(
                    self.tun_if, send_pkts, self.pg1, n_rx=1
                )
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(
                    self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
                )
                recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # do not leave reassembly enabled on the interface if a check
            # above failed
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
            )

    def verify_tun_64(self, p, count=1):
        """Verify IPv6-in-IPv4 tunnel traffic in both directions."""
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host6,
                dst=self.pg1.remote_ip6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host6,
                count=count,
            )
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset for each packet so that a failure never logs the
                # decrypted result of an earlier packet.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        # the inner packet came back as raw bytes; parse it
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Check the input node's handling of UDP/4500 one-byte payloads.

        Sends three bursts of 31 packets (payload 0xff, then 0xfe, then 0xfe
        again), expects no replies, and checks the ``nat_keepalive`` and
        ``too_short`` error counters of ``self.tun4_input_node``.
        """
        # ``Padding`` is not among the names this file imports from
        # scapy.packet at module level, so bring it into scope here.
        from scapy.packet import Padding

        # the size of the Padding is meant to pad the frame to the minimum
        # ethernet frame size of 64 bytes
        # NOTE(review): ``0 * 21`` evaluates to the integer 0, so Padding is
        # constructed from 0 rather than from 21 zero bytes, presumably
        # giving an empty layer.  Left unchanged because the expected
        # counters below were established with it; confirm the intent.
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xff")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal(
            "/err/%s/nat_keepalive" % self.tun4_input_node, 31
        )

        # a one-byte payload other than 0xff is counted as too short
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)

        # same again with the Padding layer appended; the counter accumulates
        pkt = (
            Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
            / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
            / UDP(sport=333, dport=4500)
            / Raw(b"\xfe")
            / Padding(0 * 21)
        )
        self.send_and_assert_no_replies(self.tun_if, pkt * 31)
        self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
1501
1502
class IpsecTun4Tests(IpsecTun4):
    """UT test methods for Tunnel v4"""

    def test_tun_basic44(self):
        """ipsec 4o4 tunnel basic test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)
        # bounce the tunnel interface, then make sure traffic still flows
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(v4_params, count=1)

    def test_tun_reass_basic44(self):
        """ipsec 4o4 tunnel basic reassembly test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ipsec 4o4 tunnel burst test"""
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=127)
1521
1522
1523 class IpsecTun6(object):
1524     """verify methods for Tunnel v6"""
1525
1526     def verify_counters6(self, p_in, p_out, count, worker=None):
1527         if hasattr(p_in, "tun_sa_in"):
1528             pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
1529             self.assertEqual(
1530                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1531             )
1532         if hasattr(p_out, "tun_sa_out"):
1533             pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
1534             self.assertEqual(
1535                 pkts,
1536                 count,
1537                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1538             )
1539         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1540         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1541
1542     def verify_decrypted6(self, p, rxs):
1543         for rx in rxs:
1544             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1545             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1546             self.assert_packet_checksums_valid(rx)
1547
1548     def verify_encrypted6(self, p, sa, rxs):
1549         for rx in rxs:
1550             self.assert_packet_checksums_valid(rx)
1551             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1552             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1553             if p.outer_flow_label:
1554                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1555             try:
1556                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1557                 if not decrypt_pkt.haslayer(IPv6):
1558                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1559                 self.assert_packet_checksums_valid(decrypt_pkt)
1560                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1561                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1562                 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1563                 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1564             except:
1565                 self.logger.debug(ppp("Unexpected packet:", rx))
1566                 try:
1567                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1568                 except:
1569                     pass
1570                 raise
1571
1572     def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
1573         self.vapi.cli("clear errors")
1574         self.vapi.cli("clear ipsec sa")
1575
1576         send_pkts = self.gen_pkts6(
1577             p_in,
1578             self.pg1,
1579             src=self.pg1.remote_ip6,
1580             dst=p_in.remote_tun_if_host,
1581             count=count,
1582             payload_size=payload_size,
1583         )
1584         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1585         self.logger.info(self.vapi.cli("sh punt stats"))
1586
1587     def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
1588         self.vapi.cli("clear errors")
1589         self.vapi.cli("clear ipsec sa")
1590
1591         send_pkts = self.gen_encrypt_pkts6(
1592             p_in,
1593             p_in.scapy_tun_sa,
1594             self.tun_if,
1595             src=p_in.remote_tun_if_host,
1596             dst=self.pg1.remote_ip6,
1597             count=count,
1598         )
1599         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1600
1601     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1602         self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
1603         self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
1604
1605     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1606         self.vapi.cli("clear errors")
1607         self.vapi.cli("clear ipsec sa")
1608         if not p_out:
1609             p_out = p_in
1610         try:
1611             send_pkts = self.gen_encrypt_pkts6(
1612                 p_in,
1613                 p_in.scapy_tun_sa,
1614                 self.tun_if,
1615                 src=p_in.remote_tun_if_host,
1616                 dst=self.pg1.remote_ip6,
1617                 count=count,
1618                 payload_size=payload_size,
1619             )
1620             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1621             self.verify_decrypted6(p_in, recv_pkts)
1622
1623             send_pkts = self.gen_pkts6(
1624                 p_in,
1625                 self.pg1,
1626                 src=self.pg1.remote_ip6,
1627                 dst=p_out.remote_tun_if_host,
1628                 count=count,
1629                 payload_size=payload_size,
1630             )
1631             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1632             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1633
1634             for rx in recv_pkts:
1635                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1636                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1637
1638         finally:
1639             self.logger.info(self.vapi.ppcli("show error"))
1640             self.logger.info(self.vapi.ppcli("show ipsec all"))
1641         self.verify_counters6(p_in, p_out, count)
1642
1643     def verify_tun_reass_66(self, p):
1644         self.vapi.cli("clear errors")
1645         self.vapi.ip_reassembly_enable_disable(
1646             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
1647         )
1648
1649         try:
1650             send_pkts = self.gen_encrypt_pkts6(
1651                 p,
1652                 p.scapy_tun_sa,
1653                 self.tun_if,
1654                 src=p.remote_tun_if_host,
1655                 dst=self.pg1.remote_ip6,
1656                 count=1,
1657                 payload_size=1850,
1658             )
1659             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1660             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1661             self.verify_decrypted6(p, recv_pkts)
1662
1663             send_pkts = self.gen_pkts6(
1664                 p,
1665                 self.pg1,
1666                 src=self.pg1.remote_ip6,
1667                 dst=p.remote_tun_if_host,
1668                 count=1,
1669                 payload_size=64,
1670             )
1671             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1672             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1673         finally:
1674             self.logger.info(self.vapi.ppcli("show error"))
1675             self.logger.info(self.vapi.ppcli("show ipsec all"))
1676         self.verify_counters6(p, p, 1)
1677         self.vapi.ip_reassembly_enable_disable(
1678             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
1679         )
1680
1681     def verify_tun_46(self, p, count=1):
1682         """ipsec 4o6 tunnel basic test"""
1683         self.vapi.cli("clear errors")
1684         self.vapi.cli("clear ipsec sa")
1685         try:
1686             send_pkts = self.gen_encrypt_pkts(
1687                 p,
1688                 p.scapy_tun_sa,
1689                 self.tun_if,
1690                 src=p.remote_tun_if_host4,
1691                 dst=self.pg1.remote_ip4,
1692                 count=count,
1693             )
1694             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1695             for recv_pkt in recv_pkts:
1696                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1697                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1698                 self.assert_packet_checksums_valid(recv_pkt)
1699             send_pkts = self.gen_pkts(
1700                 self.pg1,
1701                 src=self.pg1.remote_ip4,
1702                 dst=p.remote_tun_if_host4,
1703                 count=count,
1704             )
1705             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1706             for recv_pkt in recv_pkts:
1707                 try:
1708                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1709                     if not decrypt_pkt.haslayer(IP):
1710                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1711                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1712                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1713                     self.assert_packet_checksums_valid(decrypt_pkt)
1714                 except:
1715                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1716                     try:
1717                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1718                     except:
1719                         pass
1720                     raise
1721         finally:
1722             self.logger.info(self.vapi.ppcli("show error"))
1723             self.logger.info(self.vapi.ppcli("show ipsec all"))
1724         self.verify_counters6(p, p, count)
1725
1726     def verify_keepalive(self, p):
1727         # the sizeof Raw is calculated to pad to the minimum ehternet
1728         # frame size of 64 btyes
1729         pkt = (
1730             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1731             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1732             / UDP(sport=333, dport=4500)
1733             / Raw(b"\xff")
1734             / Padding(0 * 1)
1735         )
1736         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1737         self.assert_error_counter_equal(
1738             "/err/%s/nat_keepalive" % self.tun6_input_node, 31
1739         )
1740
1741         pkt = (
1742             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1743             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1744             / UDP(sport=333, dport=4500)
1745             / Raw(b"\xfe")
1746         )
1747         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1748         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 31)
1749
1750         pkt = (
1751             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1752             / IPv6(src=p.remote_tun_if_host, dst=self.tun_if.local_ip6)
1753             / UDP(sport=333, dport=4500)
1754             / Raw(b"\xfe")
1755             / Padding(0 * 21)
1756         )
1757         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1758         self.assert_error_counter_equal("/err/%s/too_short" % self.tun6_input_node, 62)
1759
1760
class IpsecTun6Tests(IpsecTun6):
    """UT test methods for Tunnel v6"""

    def test_tun_basic66(self):
        """ipsec 6o6 tunnel basic test"""
        # single packet in each direction
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=1)

    def test_tun_reass_basic66(self):
        """ipsec 6o6 tunnel basic reassembly test"""
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params6)

    def test_tun_burst66(self):
        """ipsec 6o6 tunnel burst test"""
        # 257 packets in each direction
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=257)
1775
1776
class IpsecTun6HandoffTests(IpsecTun6):
    """UT test methods for Tunnel v6 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handoff_66(self):
        """ipsec 6o6 tunnel worker hand-off test"""
        for cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(cmd)

        pkts_per_round = 15
        p = self.params[socket.AF_INET6]

        # Traffic is injected alternately on worker 0 and worker 1; the SA
        # counters are nevertheless all expected against worker 0.
        workers = (0, 1, 0, 1)
        for worker in workers:
            # tunnel -> pg1
            esp_pkts = self.gen_encrypt_pkts6(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=pkts_per_round,
            )
            rx_plain = self.send_and_expect(
                self.tun_if, esp_pkts, self.pg1, worker=worker
            )
            self.verify_decrypted6(p, rx_plain)

            # pg1 -> tunnel
            plain_pkts = self.gen_pkts6(
                p,
                self.pg1,
                src=self.pg1.remote_ip6,
                dst=p.remote_tun_if_host,
                count=pkts_per_round,
            )
            rx_esp = self.send_and_expect(
                self.pg1, plain_pkts, self.tun_if, worker=worker
            )
            self.verify_encrypted6(p, p.vpp_tun_sa, rx_esp)

        # every round is counted against the first worker that was used
        self.verify_counters6(p, p, len(workers) * pkts_per_round, worker=0)
1820
1821
class IpsecTun4HandoffTests(IpsecTun4):
    """UT test methods for Tunnel v4 with multiple workers"""

    vpp_worker_count = 2

    def test_tun_handooff_44(self):
        """ipsec 4o4 tunnel worker hand-off test"""
        for cmd in ("clear errors", "clear ipsec sa"):
            self.vapi.cli(cmd)

        pkts_per_round = 15
        p = self.params[socket.AF_INET]

        # Traffic is injected alternately on worker 0 and worker 1; the SA
        # counters are nevertheless all expected against worker 0.
        workers = (0, 1, 0, 1)
        for worker in workers:
            # tunnel -> pg1
            esp_pkts = self.gen_encrypt_pkts(
                p,
                p.scapy_tun_sa,
                self.tun_if,
                src=p.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=pkts_per_round,
            )
            rx_plain = self.send_and_expect(
                self.tun_if, esp_pkts, self.pg1, worker=worker
            )
            self.verify_decrypted(p, rx_plain)

            # pg1 -> tunnel
            plain_pkts = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=p.remote_tun_if_host,
                count=pkts_per_round,
            )
            rx_esp = self.send_and_expect(
                self.pg1, plain_pkts, self.tun_if, worker=worker
            )
            self.verify_encrypted(p, p.vpp_tun_sa, rx_esp)

        # every round is counted against the first worker that was used
        self.verify_counters4(p, len(workers) * pkts_per_round, worker=0)
1864
1865
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """UT test methods for Tunnel v6 & v4"""

    # No members of its own: the test methods all come from the two bases.
1870
1871
class IPSecIPv4Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv4 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        super(IPSecIPv4Fwd, cls).setUpConstants()

    def setUp(self):
        super(IPSecIPv4Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip4()
            pg.admin_down()
        super(IPSecIPv4Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create num_ifs pg interfaces, bring them up and configure IPv4."""
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv4 address on the interface
            pg.config_ip4()
            # resolve ARP, so that we know VPP MAC
            pg.resolve_arp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create SPD spd_id and bind it to every interface in pg_list.

        The SPD and each binding are recorded in self.spd_objs, in creation
        order, so that tearDown can remove them in reverse.
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Map a policy name to its vl_api_ipsec_spd_action_t value.

        :param policy_type: "protect", "bypass" or "discard".
        :raises Exception: for any other value.
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        if policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        if policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        # interpolate the value: handing it to Exception as a second
        # argument would leave the "%s" placeholder unexpanded
        raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0.0.0.0"),
        local_ip_stop=ip_address("255.255.255.255"),
        remote_ip_start=ip_address("0.0.0.0"),
        remote_ip_stop=ip_address("255.255.255.255"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add or remove one SPD policy entry and return the entry object.

        Address selection, in priority order:

        * all_ips: the full IPv4 range for both source and destination;
        * ip_range: local_ip_start/stop as the source range and
          remote_ip_start/stop as the destination range;
        * otherwise: exactly src_if.remote_ip4 and dst_if.remote_ip4.

        :param remove: when exactly False the entry is added to VPP and
            appended to self.spd_policies; otherwise it is removed from VPP
            and from self.spd_policies.
        :returns: the VppIpsecSpdEntry that was built.
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0.0.0.0")
            src_range_high = ip_address("255.255.255.255")
            dst_range_low = ip_address("0.0.0.0")
            dst_range_high = ip_address("255.255.255.255")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip4
            src_range_high = src_if.remote_ip4
            dst_range_low = dst_if.remote_ip4
            dst_range_high = dst_if.remote_ip4

        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            # NOTE(review): spdEntry is a freshly built object, so this
            # relies on VppIpsecSpdEntry comparing equal to the entry stored
            # when the policy was added - confirm it defines equality.
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build pkt_count UDP/IPv4 packets from src_if towards dst_if.

        Each packet carries a payload derived from a packet-info record
        created for the (src_if, dst_if) pair; a copy of the packet is
        stored in that record's data attribute.

        :returns: list of the created packets.
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check the captured packets against the saved packet infos.

        For every captured packet the payload info must name src_if and
        dst_if, its index must match the next saved packet info, and the IP
        source address and UDP source port must match the saved packet.
        After the loop no further saved packet info may remain.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that spdEntry's "packets" statistic equals pkt_count."""
        # read the statistics once so that the logged and the asserted
        # values come from the same snapshot
        stats = spdEntry.get_stats()
        self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
        matched_pkts = stats.get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2066
2067
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
    # Template for SPD flow cache tests: adds helpers that read the flow
    # cache entry counters from the CLI and probe the CPU for crc32 support.

    @classmethod
    def setUpConstants(cls):
        super(SpdFlowCacheTemplate, cls).setUpConstants()
        # Override this method with required cmdline parameters e.g.
        # cls.vpp_cmdline.extend(["ipsec", "{",
        #                         "ipv4-outbound-spd-flow-cache on",
        #                         "}"])
        # cls.logger.info("VPP modified cmdline is %s" % " "
        #                 .join(cls.vpp_cmdline))

    def setUp(self):
        super(SpdFlowCacheTemplate, self).setUp()

    def tearDown(self):
        super(SpdFlowCacheTemplate, self).tearDown()

    def get_spd_flow_cache_entries(self, outbound):
        """Return the SPD flow cache entry count reported by the CLI.

        'show ipsec spd' output:
        ipv4-inbound-spd-flow-cache-entries: 0
        ipv4-outbound-spd-flow-cache-entries: 0

        :param outbound: truthy to read the outbound counter, falsy to read
            the inbound one.
        :returns: the entry count as an int.
        :raises Exception: when the counter line is not found, or when the
            text captured after it is not an integer.
        """
        # The module header imports only `search` from re, so the module
        # name itself (needed for re.DOTALL) is bound here.
        import re

        show_ipsec_reply = self.vapi.cli("show ipsec spd")
        # match the relevant section of 'show ipsec spd' output
        if outbound:
            pattern = "ipv4-outbound-spd-flow-cache-entries: (.*)"
        else:
            pattern = "ipv4-inbound-spd-flow-cache-entries: (.*)"
        regex_match = re.search(pattern, show_ipsec_reply, re.DOTALL)
        if regex_match is None:
            raise Exception(
                "Unable to find spd flow cache entries "
                "in 'show ipsec spd' CLI output - regex failed to match"
            )
        try:
            # with DOTALL the group runs to the end of the reply, so this
            # only parses when nothing but whitespace follows the number
            num_entries = int(regex_match.group(1))
        except ValueError as err:
            raise Exception(
                "Unable to get spd flow cache entries "
                "from 'show ipsec spd' string: %s" % regex_match.group(0)
            ) from err
        self.logger.info("%s", regex_match.group(0))
        return num_entries

    def verify_num_outbound_flow_cache_entries(self, expected_elements):
        """Assert the outbound flow cache holds expected_elements entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=True), expected_elements
        )

    def verify_num_inbound_flow_cache_entries(self, expected_elements):
        """Assert the inbound flow cache holds expected_elements entries."""
        self.assertEqual(
            self.get_spd_flow_cache_entries(outbound=False), expected_elements
        )

    def crc32_supported(self):
        """Return True when lscpu lists a crc32 capable CPU feature flag.

        Both outcomes are logged together with the full lscpu output.
        """
        # The module header imports only `popen` from os, so the module
        # name is bound here.
        import os

        # lscpu is part of util-linux package, available on all Linux Distros
        with os.popen("lscpu") as stream:
            cpu_info = stream.read()
        # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
        # see vppinfra/crc32.h
        # each flag needs its own membership test: a bare non-empty string
        # on the left of `or` is always true
        if "crc32" in cpu_info or "sse4_2" in cpu_info:
            self.logger.info("\ncrc32 supported:\n" + cpu_info)
            return True
        else:
            self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
            return False
2141
2142
class IPSecIPv6Fwd(VppTestCase):
    """Test IPSec by capturing and verifying IPv6 forwarded pkts"""

    @classmethod
    def setUpConstants(cls):
        super(IPSecIPv6Fwd, cls).setUpConstants()

    def setUp(self):
        super(IPSecIPv6Fwd, self).setUp()
        # store SPD objects so we can remove configs on tear down
        self.spd_objs = []
        self.spd_policies = []

    def tearDown(self):
        # remove SPD policies
        for obj in self.spd_policies:
            obj.remove_vpp_config()
        self.spd_policies = []
        # remove SPD items (interface bindings first, then SPD)
        for obj in reversed(self.spd_objs):
            obj.remove_vpp_config()
        self.spd_objs = []
        # close down pg intfs
        for pg in self.pg_interfaces:
            pg.unconfig_ip6()
            pg.admin_down()
        super(IPSecIPv6Fwd, self).tearDown()

    def create_interfaces(self, num_ifs=2):
        """Create num_ifs pg interfaces, bring them up and configure IPv6."""
        # create interfaces pg0 ... pg<num_ifs - 1>
        self.create_pg_interfaces(range(num_ifs))
        for pg in self.pg_interfaces:
            # put the interface up
            pg.admin_up()
            # configure IPv6 address on the interface
            pg.config_ip6()
            pg.resolve_ndp()
        self.logger.info(self.vapi.ppcli("show int addr"))

    def spd_create_and_intf_add(self, spd_id, pg_list):
        """Create SPD spd_id and bind it to every interface in pg_list.

        The SPD and each binding are recorded in self.spd_objs, in creation
        order, so that tearDown can remove them in reverse.
        """
        spd = VppIpsecSpd(self, spd_id)
        spd.add_vpp_config()
        self.spd_objs.append(spd)
        for pg in pg_list:
            spdItf = VppIpsecSpdItfBinding(self, spd, pg)
            spdItf.add_vpp_config()
            self.spd_objs.append(spdItf)

    def get_policy(self, policy_type):
        """Map a policy name to its vl_api_ipsec_spd_action_t value.

        :param policy_type: "protect", "bypass" or "discard".
        :raises Exception: for any other value.
        """
        e = VppEnum.vl_api_ipsec_spd_action_t
        if policy_type == "protect":
            return e.IPSEC_API_SPD_ACTION_PROTECT
        if policy_type == "bypass":
            return e.IPSEC_API_SPD_ACTION_BYPASS
        if policy_type == "discard":
            return e.IPSEC_API_SPD_ACTION_DISCARD
        # interpolate the value: handing it to Exception as a second
        # argument would leave the "%s" placeholder unexpanded
        raise Exception("Invalid policy type: %s" % policy_type)

    def spd_add_rem_policy(
        self,
        spd_id,
        src_if,
        dst_if,
        proto,
        is_out,
        priority,
        policy_type,
        remove=False,
        all_ips=False,
        ip_range=False,
        local_ip_start=ip_address("0::0"),
        local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_ip_start=ip_address("0::0"),
        remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
        remote_port_start=0,
        remote_port_stop=65535,
        local_port_start=0,
        local_port_stop=65535,
    ):
        """Add or remove one SPD policy entry and return the entry object.

        Address selection, in priority order:

        * all_ips: the full IPv6 range for both source and destination;
        * ip_range: local_ip_start/stop as the source range and
          remote_ip_start/stop as the destination range;
        * otherwise: exactly src_if.remote_ip6 and dst_if.remote_ip6.

        :param remove: when exactly False the entry is added to VPP and
            appended to self.spd_policies; otherwise it is removed from VPP
            and from self.spd_policies.
        :returns: the VppIpsecSpdEntry that was built.
        """
        spd = VppIpsecSpd(self, spd_id)

        if all_ips:
            src_range_low = ip_address("0::0")
            src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
            dst_range_low = ip_address("0::0")
            dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")

        elif ip_range:
            src_range_low = local_ip_start
            src_range_high = local_ip_stop
            dst_range_low = remote_ip_start
            dst_range_high = remote_ip_stop

        else:
            src_range_low = src_if.remote_ip6
            src_range_high = src_if.remote_ip6
            dst_range_low = dst_if.remote_ip6
            dst_range_high = dst_if.remote_ip6

        spdEntry = VppIpsecSpdEntry(
            self,
            spd,
            0,
            src_range_low,
            src_range_high,
            dst_range_low,
            dst_range_high,
            proto,
            priority=priority,
            policy=self.get_policy(policy_type),
            is_outbound=is_out,
            remote_port_start=remote_port_start,
            remote_port_stop=remote_port_stop,
            local_port_start=local_port_start,
            local_port_stop=local_port_stop,
        )

        if remove is False:
            spdEntry.add_vpp_config()
            self.spd_policies.append(spdEntry)
        else:
            spdEntry.remove_vpp_config()
            # NOTE(review): spdEntry is a freshly built object, so this
            # relies on VppIpsecSpdEntry comparing equal to the entry stored
            # when the policy was added - confirm it defines equality.
            self.spd_policies.remove(spdEntry)
        self.logger.info(self.vapi.ppcli("show ipsec all"))
        return spdEntry

    def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
        """Build pkt_count UDP/IPv6 packets from src_if towards dst_if.

        Each packet carries a payload derived from a packet-info record
        created for the (src_if, dst_if) pair; a copy of the packet is
        stored in that record's data attribute.

        :returns: list of the created packets.
        """
        packets = []
        for _ in range(pkt_count):
            # create packet info stored in the test case instance
            info = self.create_packet_info(src_if, dst_if)
            # convert the info into packet payload
            payload = self.info_to_payload(info)
            # create the packet itself
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
                / UDP(sport=src_prt, dport=dst_prt)
                / Raw(payload)
            )
            # store a copy of the packet in the packet info
            info.data = p.copy()
            # append the packet to the list
            packets.append(p)
        # return the created packet list
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check the captured packets against the saved packet infos.

        For every captured packet the payload info must name src_if and
        dst_if, its index must match the next saved packet info, and the
        IPv6 source address and UDP source port must match the saved packet.
        After the loop no further saved packet info may remain.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet)
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def verify_policy_match(self, pkt_count, spdEntry):
        """Assert that spdEntry's "packets" statistic equals pkt_count."""
        # read the statistics once so that the logged and the asserted
        # values come from the same snapshot
        stats = spdEntry.get_stats()
        self.logger.info("XXXX %s %s", str(spdEntry), str(stats))
        matched_pkts = stats.get("packets")
        self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
        self.assert_equal(pkt_count, matched_pkts)
2336
2337
# When executed as a script, run the tests through unittest using
# VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)