ipsec: Use .api declared error counters
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import (
10     IPv6,
11     ICMPv6EchoRequest,
12     IPv6ExtHdrHopByHop,
13     IPv6ExtHdrFragment,
14     IPv6ExtHdrDestOpt,
15 )
16
17
18 from framework import VppTestCase, VppTestRunner
19 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
20 from vpp_papi import VppEnum
21
22 from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
23 from ipaddress import ip_address
24 from re import search
25 from os import popen
26
27
28 class IPsecIPv4Params:
29
30     addr_type = socket.AF_INET
31     addr_any = "0.0.0.0"
32     addr_bcast = "255.255.255.255"
33     addr_len = 32
34     is_ipv6 = 0
35
36     def __init__(self):
37         self.remote_tun_if_host = "1.1.1.1"
38         self.remote_tun_if_host6 = "1111::1"
39
40         self.scapy_tun_sa_id = 100
41         self.scapy_tun_spi = 1000
42         self.vpp_tun_sa_id = 200
43         self.vpp_tun_spi = 2000
44
45         self.scapy_tra_sa_id = 300
46         self.scapy_tra_spi = 3000
47         self.vpp_tra_sa_id = 400
48         self.vpp_tra_spi = 4000
49
50         self.outer_hop_limit = 64
51         self.inner_hop_limit = 255
52         self.outer_flow_label = 0
53         self.inner_flow_label = 0x12345
54
55         self.auth_algo_vpp_id = (
56             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
57         )
58         self.auth_algo = "HMAC-SHA1-96"  # scapy name
59         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
60
61         self.crypt_algo_vpp_id = (
62             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
63         )
64         self.crypt_algo = "AES-CBC"  # scapy name
65         self.crypt_key = b"JPjyOWBeVEQiMe7h"
66         self.salt = 0
67         self.flags = 0
68         self.nat_header = None
69         self.tun_flags = (
70             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
71         )
72         self.dscp = 0
73         self.async_mode = False
74
75
76 class IPsecIPv6Params:
77
78     addr_type = socket.AF_INET6
79     addr_any = "0::0"
80     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
81     addr_len = 128
82     is_ipv6 = 1
83
84     def __init__(self):
85         self.remote_tun_if_host = "1111:1111:1111:1111:1111:1111:1111:1111"
86         self.remote_tun_if_host4 = "1.1.1.1"
87
88         self.scapy_tun_sa_id = 500
89         self.scapy_tun_spi = 3001
90         self.vpp_tun_sa_id = 600
91         self.vpp_tun_spi = 3000
92
93         self.scapy_tra_sa_id = 700
94         self.scapy_tra_spi = 4001
95         self.vpp_tra_sa_id = 800
96         self.vpp_tra_spi = 4000
97
98         self.outer_hop_limit = 64
99         self.inner_hop_limit = 255
100         self.outer_flow_label = 0
101         self.inner_flow_label = 0x12345
102
103         self.auth_algo_vpp_id = (
104             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
105         )
106         self.auth_algo = "HMAC-SHA1-96"  # scapy name
107         self.auth_key = b"C91KUR9GYMm5GfkEvNjX"
108
109         self.crypt_algo_vpp_id = (
110             VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
111         )
112         self.crypt_algo = "AES-CBC"  # scapy name
113         self.crypt_key = b"JPjyOWBeVEQiMe7h"
114         self.salt = 0
115         self.flags = 0
116         self.nat_header = None
117         self.tun_flags = (
118             VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
119         )
120         self.dscp = 0
121         self.async_mode = False
122
123
124 def mk_scapy_crypt_key(p):
125     if p.crypt_algo in ("AES-GCM", "AES-CTR"):
126         return p.crypt_key + struct.pack("!I", p.salt)
127     else:
128         return p.crypt_key
129
130
131 def config_tun_params(p, encryption_type, tun_if):
132     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
133     esn_en = bool(
134         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
135     )
136     p.tun_dst = tun_if.remote_addr[p.addr_type]
137     p.tun_src = tun_if.local_addr[p.addr_type]
138     crypt_key = mk_scapy_crypt_key(p)
139     p.scapy_tun_sa = SecurityAssociation(
140         encryption_type,
141         spi=p.vpp_tun_spi,
142         crypt_algo=p.crypt_algo,
143         crypt_key=crypt_key,
144         auth_algo=p.auth_algo,
145         auth_key=p.auth_key,
146         tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
147         nat_t_header=p.nat_header,
148         esn_en=esn_en,
149     )
150     p.vpp_tun_sa = SecurityAssociation(
151         encryption_type,
152         spi=p.scapy_tun_spi,
153         crypt_algo=p.crypt_algo,
154         crypt_key=crypt_key,
155         auth_algo=p.auth_algo,
156         auth_key=p.auth_key,
157         tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
158         nat_t_header=p.nat_header,
159         esn_en=esn_en,
160     )
161
162
163 def config_tra_params(p, encryption_type):
164     esn_en = bool(
165         p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
166     )
167     crypt_key = mk_scapy_crypt_key(p)
168     p.scapy_tra_sa = SecurityAssociation(
169         encryption_type,
170         spi=p.vpp_tra_spi,
171         crypt_algo=p.crypt_algo,
172         crypt_key=crypt_key,
173         auth_algo=p.auth_algo,
174         auth_key=p.auth_key,
175         nat_t_header=p.nat_header,
176         esn_en=esn_en,
177     )
178     p.vpp_tra_sa = SecurityAssociation(
179         encryption_type,
180         spi=p.scapy_tra_spi,
181         crypt_algo=p.crypt_algo,
182         crypt_key=crypt_key,
183         auth_algo=p.auth_algo,
184         auth_key=p.auth_key,
185         nat_t_header=p.nat_header,
186         esn_en=esn_en,
187     )
188
189
190 class TemplateIpsec(VppTestCase):
191     """
192     TRANSPORT MODE::
193
194          ------   encrypt   ---
195         |tra_if| <-------> |VPP|
196          ------   decrypt   ---
197
198     TUNNEL MODE::
199
200          ------   encrypt   ---   plain   ---
201         |tun_if| <-------  |VPP| <------ |pg1|
202          ------             ---           ---
203
204          ------   decrypt   ---   plain   ---
205         |tun_if| ------->  |VPP| ------> |pg1|
206          ------             ---           ---
207     """
208
209     tun_spd_id = 1
210     tra_spd_id = 2
211
212     def ipsec_select_backend(self):
213         """empty method to be overloaded when necessary"""
214         pass
215
216     @classmethod
217     def setUpClass(cls):
218         super(TemplateIpsec, cls).setUpClass()
219
220     @classmethod
221     def tearDownClass(cls):
222         super(TemplateIpsec, cls).tearDownClass()
223
224     def setup_params(self):
225         if not hasattr(self, "ipv4_params"):
226             self.ipv4_params = IPsecIPv4Params()
227         if not hasattr(self, "ipv6_params"):
228             self.ipv6_params = IPsecIPv6Params()
229         self.params = {
230             self.ipv4_params.addr_type: self.ipv4_params,
231             self.ipv6_params.addr_type: self.ipv6_params,
232         }
233
234     def config_interfaces(self):
235         self.create_pg_interfaces(range(3))
236         self.interfaces = list(self.pg_interfaces)
237         for i in self.interfaces:
238             i.admin_up()
239             i.config_ip4()
240             i.resolve_arp()
241             i.config_ip6()
242             i.resolve_ndp()
243
244     def setUp(self):
245         super(TemplateIpsec, self).setUp()
246
247         self.setup_params()
248
249         self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP
250         self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH
251
252         self.config_interfaces()
253
254         self.ipsec_select_backend()
255
256     def unconfig_interfaces(self):
257         for i in self.interfaces:
258             i.admin_down()
259             i.unconfig_ip4()
260             i.unconfig_ip6()
261
262     def tearDown(self):
263         super(TemplateIpsec, self).tearDown()
264
265         self.unconfig_interfaces()
266
267     def show_commands_at_teardown(self):
268         self.logger.info(self.vapi.cli("show hardware"))
269
270     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
271         return [
272             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
273             / sa.encrypt(IP(src=src, dst=dst) / ICMP() / Raw(b"X" * payload_size))
274             for i in range(count)
275         ]
276
277     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=54):
278         return [
279             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
280             / sa.encrypt(
281                 IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
282                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
283             )
284             for i in range(count)
285         ]
286
287     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
288         return [
289             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
290             / IP(src=src, dst=dst)
291             / ICMP()
292             / Raw(b"X" * payload_size)
293             for i in range(count)
294         ]
295
296     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
297         return [
298             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
299             / IPv6(src=src, dst=dst, hlim=p.inner_hop_limit, fl=p.inner_flow_label)
300             / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
301             for i in range(count)
302         ]
303
304
305 class IpsecTcp(object):
306     def verify_tcp_checksum(self):
307         # start http cli server listener on http://0.0.0.0:80
308         self.vapi.cli("http cli server")
309         p = self.params[socket.AF_INET]
310         send = Ether(
311             src=self.tun_if.remote_mac, dst=self.tun_if.local_mac
312         ) / p.scapy_tun_sa.encrypt(
313             IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
314             / TCP(flags="S", dport=80)
315         )
316         self.logger.debug(ppp("Sending packet:", send))
317         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
318         recv = recv[0]
319         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
320         self.assert_packet_checksums_valid(decrypted)
321
322
323 class IpsecTcpTests(IpsecTcp):
324     def test_tcp_checksum(self):
325         """verify checksum correctness for vpp generated packets"""
326         self.verify_tcp_checksum()
327
328
329 class IpsecTra4(object):
330     """verify methods for Transport v4"""
331
332     def get_replay_counts(self, p):
333         replay_node_name = "/err/%s/replay" % self.tra4_decrypt_node_name[0]
334         count = self.statistics.get_err_counter(replay_node_name)
335
336         if p.async_mode:
337             replay_post_node_name = (
338                 "/err/%s/replay" % self.tra4_decrypt_node_name[p.async_mode]
339             )
340             count += self.statistics.get_err_counter(replay_post_node_name)
341
342         return count
343
344     def get_hash_failed_counts(self, p):
345         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
346             hash_failed_node_name = (
347                 "/err/%s/decryption_failed" % self.tra4_decrypt_node_name[p.async_mode]
348             )
349         else:
350             hash_failed_node_name = (
351                 "/err/%s/integ_error" % self.tra4_decrypt_node_name[p.async_mode]
352             )
353         count = self.statistics.get_err_counter(hash_failed_node_name)
354
355         if p.async_mode:
356             count += self.statistics.get_err_counter("/err/crypto-dispatch/bad-hmac")
357
358         return count
359
360     def verify_hi_seq_num(self):
361         p = self.params[socket.AF_INET]
362         saf = VppEnum.vl_api_ipsec_sad_flags_t
363         esn_on = p.vpp_tra_sa.esn_en
364         ar_on = p.flags & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
365
366         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
367         replay_count = self.get_replay_counts(p)
368         hash_failed_count = self.get_hash_failed_counts(p)
369         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
370
371         # a few packets so we get the rx seq number above the window size and
372         # thus can simulate a wrap with an out of window packet
373         pkts = [
374             (
375                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
376                 / p.scapy_tra_sa.encrypt(
377                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
378                     seq_num=seq,
379                 )
380             )
381             for seq in range(63, 80)
382         ]
383         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
384
385         # these 4 packets will all choose seq-num 0 to decrpyt since none
386         # are out of window when first checked. however, once #200 has
387         # decrypted it will move the window to 200 and has #81 is out of
388         # window. this packet should be dropped.
389         pkts = [
390             (
391                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
392                 / p.scapy_tra_sa.encrypt(
393                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
394                     seq_num=200,
395                 )
396             ),
397             (
398                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
399                 / p.scapy_tra_sa.encrypt(
400                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
401                     seq_num=81,
402                 )
403             ),
404             (
405                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
406                 / p.scapy_tra_sa.encrypt(
407                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
408                     seq_num=201,
409                 )
410             ),
411             (
412                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
413                 / p.scapy_tra_sa.encrypt(
414                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
415                     seq_num=202,
416                 )
417             ),
418         ]
419
420         # if anti-replay is off then we won't drop #81
421         n_rx = 3 if ar_on else 4
422         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=n_rx)
423         # this packet is one before the wrap
424         pkts = [
425             (
426                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
427                 / p.scapy_tra_sa.encrypt(
428                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
429                     seq_num=203,
430                 )
431             )
432         ]
433         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
434
435         # move the window over half way to a wrap
436         pkts = [
437             (
438                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
439                 / p.scapy_tra_sa.encrypt(
440                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
441                     seq_num=0x80000001,
442                 )
443             )
444         ]
445         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
446
447         # anti-replay will drop old packets, no anti-replay will not
448         pkts = [
449             (
450                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
451                 / p.scapy_tra_sa.encrypt(
452                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
453                     seq_num=0x44000001,
454                 )
455             )
456         ]
457
458         if ar_on:
459             self.send_and_assert_no_replies(self.tra_if, pkts)
460         else:
461             recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
462
463         if esn_on:
464             #
465             # validate wrapping the ESN
466             #
467
468             # wrap scapy's TX SA SN
469             p.scapy_tra_sa.seq_num = 0x100000005
470
471             # send a packet that wraps the window for both AR and no AR
472             pkts = [
473                 (
474                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
475                     / p.scapy_tra_sa.encrypt(
476                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
477                         / ICMP(),
478                         seq_num=0x100000005,
479                     )
480                 )
481             ]
482
483             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
484             for rx in rxs:
485                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
486
487             # move the window forward to half way to the next wrap
488             pkts = [
489                 (
490                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
491                     / p.scapy_tra_sa.encrypt(
492                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
493                         / ICMP(),
494                         seq_num=0x180000005,
495                     )
496                 )
497             ]
498
499             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
500
501             # a packet less than 2^30 from the current position is:
502             #  - AR: out of window and dropped
503             #  - non-AR: accepted
504             pkts = [
505                 (
506                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
507                     / p.scapy_tra_sa.encrypt(
508                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
509                         / ICMP(),
510                         seq_num=0x170000005,
511                     )
512                 )
513             ]
514
515             if ar_on:
516                 self.send_and_assert_no_replies(self.tra_if, pkts)
517             else:
518                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
519
520             # a packet more than 2^30 from the current position is:
521             #  - AR: out of window and dropped
522             #  - non-AR: considered a wrap, but since it's not a wrap
523             #    it won't decrpyt and so will be dropped
524             pkts = [
525                 (
526                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
527                     / p.scapy_tra_sa.encrypt(
528                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
529                         / ICMP(),
530                         seq_num=0x130000005,
531                     )
532                 )
533             ]
534
535             self.send_and_assert_no_replies(self.tra_if, pkts)
536
537             # a packet less than 2^30 from the current position and is a
538             # wrap; (the seq is currently at 0x180000005).
539             #  - AR: out of window so considered a wrap, so accepted
540             #  - non-AR: not considered a wrap, so won't decrypt
541             p.scapy_tra_sa.seq_num = 0x260000005
542             pkts = [
543                 (
544                     Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
545                     / p.scapy_tra_sa.encrypt(
546                         IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
547                         / ICMP(),
548                         seq_num=0x260000005,
549                     )
550                 )
551             ]
552             if ar_on:
553                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
554             else:
555                 self.send_and_assert_no_replies(self.tra_if, pkts)
556
557             #
558             # window positions are different now for AR/non-AR
559             #  move non-AR forward
560             #
561             if not ar_on:
562                 # a packet more than 2^30 from the current position and is a
563                 # wrap; (the seq is currently at 0x180000005).
564                 #  - AR: accepted
565                 #  - non-AR: not considered a wrap, so won't decrypt
566
567                 pkts = [
568                     (
569                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
570                         / p.scapy_tra_sa.encrypt(
571                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
572                             / ICMP(),
573                             seq_num=0x200000005,
574                         )
575                     ),
576                     (
577                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
578                         / p.scapy_tra_sa.encrypt(
579                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
580                             / ICMP(),
581                             seq_num=0x200000006,
582                         )
583                     ),
584                 ]
585                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
586
587                 pkts = [
588                     (
589                         Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
590                         / p.scapy_tra_sa.encrypt(
591                             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4)
592                             / ICMP(),
593                             seq_num=0x260000005,
594                         )
595                     )
596                 ]
597                 self.send_and_expect(self.tra_if, pkts, self.tra_if)
598
599     def verify_tra_anti_replay(self):
600         p = self.params[socket.AF_INET]
601         esn_en = p.vpp_tra_sa.esn_en
602
603         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
604         replay_count = self.get_replay_counts(p)
605         hash_failed_count = self.get_hash_failed_counts(p)
606         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
607
608         if ESP == self.encryption_type:
609             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
610             undersize_count = self.statistics.get_err_counter(undersize_node_name)
611
612         #
613         # send packets with seq numbers 1->34
614         # this means the window size is still in Case B (see RFC4303
615         # Appendix A)
616         #
617         # for reasons i haven't investigated Scapy won't create a packet with
618         # seq_num=0
619         #
620         pkts = [
621             (
622                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
623                 / p.scapy_tra_sa.encrypt(
624                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
625                     seq_num=seq,
626                 )
627             )
628             for seq in range(1, 34)
629         ]
630         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
631
632         # replayed packets are dropped
633         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
634         replay_count += len(pkts)
635         self.assertEqual(self.get_replay_counts(p), replay_count)
636
637         #
638         # now send a batch of packets all with the same sequence number
639         # the first packet in the batch is legitimate, the rest bogus
640         #
641         self.vapi.cli("clear error")
642         self.vapi.cli("clear node counters")
643         pkts = Ether(
644             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
645         ) / p.scapy_tra_sa.encrypt(
646             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
647             seq_num=35,
648         )
649         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
650         replay_count += 7
651         self.assertEqual(self.get_replay_counts(p), replay_count)
652
653         #
654         # now move the window over to 257 (more than one byte) and into Case A
655         #
656         self.vapi.cli("clear error")
657         pkt = Ether(
658             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
659         ) / p.scapy_tra_sa.encrypt(
660             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
661             seq_num=257,
662         )
663         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
664
665         # replayed packets are dropped
666         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
667         replay_count += 3
668         self.assertEqual(self.get_replay_counts(p), replay_count)
669
670         # the window size is 64 packets
671         # in window are still accepted
672         pkt = Ether(
673             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
674         ) / p.scapy_tra_sa.encrypt(
675             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
676             seq_num=200,
677         )
678         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
679
680         # a packet that does not decrypt does not move the window forward
681         bogus_sa = SecurityAssociation(
682             self.encryption_type,
683             p.vpp_tra_spi,
684             crypt_algo=p.crypt_algo,
685             crypt_key=mk_scapy_crypt_key(p)[::-1],
686             auth_algo=p.auth_algo,
687             auth_key=p.auth_key[::-1],
688         )
689         pkt = Ether(
690             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
691         ) / bogus_sa.encrypt(
692             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
693             seq_num=350,
694         )
695         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
696
697         hash_failed_count += 17
698         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
699
700         # a malformed 'runt' packet
701         #  created by a mis-constructed SA
702         if ESP == self.encryption_type and p.crypt_algo != "NULL":
703             bogus_sa = SecurityAssociation(self.encryption_type, p.vpp_tra_spi)
704             pkt = Ether(
705                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
706             ) / bogus_sa.encrypt(
707                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
708                 seq_num=350,
709             )
710             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
711
712             undersize_count += 17
713             self.assert_error_counter_equal(undersize_node_name, undersize_count)
714
715         # which we can determine since this packet is still in the window
716         pkt = Ether(
717             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
718         ) / p.scapy_tra_sa.encrypt(
719             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
720             seq_num=234,
721         )
722         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
723
724         #
725         # out of window are dropped
726         #  this is Case B. So VPP will consider this to be a high seq num wrap
727         #  and so the decrypt attempt will fail
728         #
729         pkt = Ether(
730             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
731         ) / p.scapy_tra_sa.encrypt(
732             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
733             seq_num=17,
734         )
735         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
736
737         if esn_en:
738             # an out of window error with ESN looks like a high sequence
739             # wrap. but since it isn't then the verify will fail.
740             hash_failed_count += 17
741             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
742
743         else:
744             replay_count += 17
745             self.assertEqual(self.get_replay_counts(p), replay_count)
746
747         # valid packet moves the window over to 258
748         pkt = Ether(
749             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
750         ) / p.scapy_tra_sa.encrypt(
751             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
752             seq_num=258,
753         )
754         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
755         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
756
757         #
758         # move VPP's SA TX seq-num to just before the seq-number wrap.
759         # then fire in a packet that VPP should drop on TX because it
760         # causes the TX seq number to wrap; unless we're using extened sequence
761         # numbers.
762         #
763         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
764         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
765         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
766
767         pkts = [
768             (
769                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
770                 / p.scapy_tra_sa.encrypt(
771                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
772                     seq_num=seq,
773                 )
774             )
775             for seq in range(259, 280)
776         ]
777
778         if esn_en:
779             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
780
781             #
782             # in order for scapy to decrypt its SA's high order number needs
783             # to wrap
784             #
785             p.vpp_tra_sa.seq_num = 0x100000000
786             for rx in rxs:
787                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
788
789             #
790             # wrap scapy's TX high sequence number. VPP is in case B, so it
791             # will consider this a high seq wrap also.
792             # The low seq num we set it to will place VPP's RX window in Case A
793             #
794             p.scapy_tra_sa.seq_num = 0x100000005
795             pkt = Ether(
796                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
797             ) / p.scapy_tra_sa.encrypt(
798                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
799                 seq_num=0x100000005,
800             )
801             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
802
803             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
804
805             #
806             # A packet that has seq num between (2^32-64) and 5 is within
807             # the window
808             #
809             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
810             pkt = Ether(
811                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
812             ) / p.scapy_tra_sa.encrypt(
813                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
814                 seq_num=0xFFFFFFFD,
815             )
816             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
817             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
818
819             #
820             # While in case A we cannot wrap the high sequence number again
821             # because VPP will consider this packet to be one that moves the
822             # window forward
823             #
824             pkt = Ether(
825                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
826             ) / p.scapy_tra_sa.encrypt(
827                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
828                 seq_num=0x200000999,
829             )
830             self.send_and_assert_no_replies(
831                 self.tra_if, [pkt], self.tra_if, timeout=0.2
832             )
833
834             hash_failed_count += 1
835             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
836
837             #
838             # but if we move the window forward to case B, then we can wrap
839             # again
840             #
841             p.scapy_tra_sa.seq_num = 0x100000555
842             pkt = Ether(
843                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
844             ) / p.scapy_tra_sa.encrypt(
845                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
846                 seq_num=0x100000555,
847             )
848             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
849             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
850
851             p.scapy_tra_sa.seq_num = 0x200000444
852             pkt = Ether(
853                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
854             ) / p.scapy_tra_sa.encrypt(
855                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
856                 seq_num=0x200000444,
857             )
858             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
859             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
860
861         else:
862             #
863             # without ESN TX sequence numbers can't wrap and packets are
864             # dropped from here on out.
865             #
866             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
867             seq_cycle_count += len(pkts)
868             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
869
870         # move the security-associations seq number on to the last we used
871         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
872         p.scapy_tra_sa.seq_num = 351
873         p.vpp_tra_sa.seq_num = 351
874
875     def verify_tra_lost(self):
876         p = self.params[socket.AF_INET]
877         esn_en = p.vpp_tra_sa.esn_en
878
879         #
880         # send packets with seq numbers 1->34
881         # this means the window size is still in Case B (see RFC4303
882         # Appendix A)
883         #
884         # for reasons i haven't investigated Scapy won't create a packet with
885         # seq_num=0
886         #
887         pkts = [
888             (
889                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
890                 / p.scapy_tra_sa.encrypt(
891                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
892                     seq_num=seq,
893                 )
894             )
895             for seq in range(1, 3)
896         ]
897         self.send_and_expect(self.tra_if, pkts, self.tra_if)
898
899         self.assertEqual(p.tra_sa_out.get_lost(), 0)
900
901         # skip a sequence number
902         pkts = [
903             (
904                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
905                 / p.scapy_tra_sa.encrypt(
906                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
907                     seq_num=seq,
908                 )
909             )
910             for seq in range(4, 6)
911         ]
912         self.send_and_expect(self.tra_if, pkts, self.tra_if)
913
914         self.assertEqual(p.tra_sa_out.get_lost(), 0)
915
916         # the lost packet are counted untill we get up past the first
917         # sizeof(replay_window) packets
918         pkts = [
919             (
920                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
921                 / p.scapy_tra_sa.encrypt(
922                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
923                     seq_num=seq,
924                 )
925             )
926             for seq in range(6, 100)
927         ]
928         self.send_and_expect(self.tra_if, pkts, self.tra_if)
929
930         self.assertEqual(p.tra_sa_out.get_lost(), 1)
931
932         # lost of holes in the sequence
933         pkts = [
934             (
935                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
936                 / p.scapy_tra_sa.encrypt(
937                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
938                     seq_num=seq,
939                 )
940             )
941             for seq in range(100, 200, 2)
942         ]
943         self.send_and_expect(self.tra_if, pkts, self.tra_if, n_rx=50)
944
945         pkts = [
946             (
947                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
948                 / p.scapy_tra_sa.encrypt(
949                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
950                     seq_num=seq,
951                 )
952             )
953             for seq in range(200, 300)
954         ]
955         self.send_and_expect(self.tra_if, pkts, self.tra_if)
956
957         self.assertEqual(p.tra_sa_out.get_lost(), 51)
958
959         # a big hole in the seq number space
960         pkts = [
961             (
962                 Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
963                 / p.scapy_tra_sa.encrypt(
964                     IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
965                     seq_num=seq,
966                 )
967             )
968             for seq in range(400, 500)
969         ]
970         self.send_and_expect(self.tra_if, pkts, self.tra_if)
971
972         self.assertEqual(p.tra_sa_out.get_lost(), 151)
973
974     def verify_tra_basic4(self, count=1, payload_size=54):
975         """ipsec v4 transport basic test"""
976         self.vapi.cli("clear errors")
977         self.vapi.cli("clear ipsec sa")
978         try:
979             p = self.params[socket.AF_INET]
980             send_pkts = self.gen_encrypt_pkts(
981                 p,
982                 p.scapy_tra_sa,
983                 self.tra_if,
984                 src=self.tra_if.remote_ip4,
985                 dst=self.tra_if.local_ip4,
986                 count=count,
987                 payload_size=payload_size,
988             )
989             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
990             for rx in recv_pkts:
991                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
992                 self.assert_packet_checksums_valid(rx)
993                 try:
994                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
995                     self.assert_packet_checksums_valid(decrypted)
996                 except:
997                     self.logger.debug(ppp("Unexpected packet:", rx))
998                     raise
999         finally:
1000             self.logger.info(self.vapi.ppcli("show error"))
1001             self.logger.info(self.vapi.ppcli("show ipsec all"))
1002
1003         pkts = p.tra_sa_in.get_stats()["packets"]
1004         self.assertEqual(
1005             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1006         )
1007         pkts = p.tra_sa_out.get_stats()["packets"]
1008         self.assertEqual(
1009             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1010         )
1011         self.assertEqual(p.tra_sa_out.get_lost(), 0)
1012         self.assertEqual(p.tra_sa_in.get_lost(), 0)
1013
1014         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
1015         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
1016
1017
1018 class IpsecTra4Tests(IpsecTra4):
1019     """UT test methods for Transport v4"""
1020
1021     def test_tra_anti_replay(self):
1022         """ipsec v4 transport anti-replay test"""
1023         self.verify_tra_anti_replay()
1024
1025     def test_tra_lost(self):
1026         """ipsec v4 transport lost packet test"""
1027         self.verify_tra_lost()
1028
1029     def test_tra_basic(self, count=1):
1030         """ipsec v4 transport basic test"""
1031         self.verify_tra_basic4(count=1)
1032
1033     def test_tra_burst(self):
1034         """ipsec v4 transport burst test"""
1035         self.verify_tra_basic4(count=257)
1036
1037
1038 class IpsecTra6(object):
1039     """verify methods for Transport v6"""
1040
1041     def verify_tra_basic6(self, count=1, payload_size=54):
1042         self.vapi.cli("clear errors")
1043         self.vapi.cli("clear ipsec sa")
1044         try:
1045             p = self.params[socket.AF_INET6]
1046             send_pkts = self.gen_encrypt_pkts6(
1047                 p,
1048                 p.scapy_tra_sa,
1049                 self.tra_if,
1050                 src=self.tra_if.remote_ip6,
1051                 dst=self.tra_if.local_ip6,
1052                 count=count,
1053                 payload_size=payload_size,
1054             )
1055             recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if)
1056             for rx in recv_pkts:
1057                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1058                 try:
1059                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
1060                     self.assert_packet_checksums_valid(decrypted)
1061                 except:
1062                     self.logger.debug(ppp("Unexpected packet:", rx))
1063                     raise
1064         finally:
1065             self.logger.info(self.vapi.ppcli("show error"))
1066             self.logger.info(self.vapi.ppcli("show ipsec all"))
1067
1068         pkts = p.tra_sa_in.get_stats()["packets"]
1069         self.assertEqual(
1070             pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1071         )
1072         pkts = p.tra_sa_out.get_stats()["packets"]
1073         self.assertEqual(
1074             pkts, count, "incorrect SA out counts: expected %d != %d" % (count, pkts)
1075         )
1076         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
1077         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
1078
1079     def gen_encrypt_pkts_ext_hdrs6(
1080         self, sa, sw_intf, src, dst, count=1, payload_size=54
1081     ):
1082         return [
1083             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1084             / sa.encrypt(
1085                 IPv6(src=src, dst=dst)
1086                 / ICMPv6EchoRequest(id=0, seq=1, data="X" * payload_size)
1087             )
1088             for i in range(count)
1089         ]
1090
1091     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
1092         return [
1093             Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
1094             / IPv6(src=src, dst=dst)
1095             / IPv6ExtHdrHopByHop()
1096             / IPv6ExtHdrFragment(id=2, offset=200)
1097             / Raw(b"\xff" * 200)
1098             for i in range(count)
1099         ]
1100
1101     def verify_tra_encrypted6(self, p, sa, rxs):
1102         decrypted = []
1103         for rx in rxs:
1104             self.assert_packet_checksums_valid(rx)
1105             try:
1106                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
1107                 decrypted.append(decrypt_pkt)
1108                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
1109                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
1110             except:
1111                 self.logger.debug(ppp("Unexpected packet:", rx))
1112                 try:
1113                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1114                 except:
1115                     pass
1116                 raise
1117         return decrypted
1118
1119     def verify_tra_66_ext_hdrs(self, p):
1120         count = 63
1121
1122         #
1123         # check we can decrypt with options
1124         #
1125         tx = self.gen_encrypt_pkts_ext_hdrs6(
1126             p.scapy_tra_sa,
1127             self.tra_if,
1128             src=self.tra_if.remote_ip6,
1129             dst=self.tra_if.local_ip6,
1130             count=count,
1131         )
1132         self.send_and_expect(self.tra_if, tx, self.tra_if)
1133
1134         #
1135         # injecting a packet from ourselves to be routed of box is a hack
1136         # but it matches an outbout policy, alors je ne regrette rien
1137         #
1138
1139         # one extension before ESP
1140         tx = (
1141             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1142             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1143             / IPv6ExtHdrFragment(id=2, offset=200)
1144             / Raw(b"\xff" * 200)
1145         )
1146
1147         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1148         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1149
1150         for dc in dcs:
1151             # for reasons i'm not going to investigate scapy does not
1152             # created the correct headers after decrypt. but reparsing
1153             # the ipv6 packet fixes it
1154             dc = IPv6(raw(dc[IPv6]))
1155             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1156
1157         # two extensions before ESP
1158         tx = (
1159             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1160             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1161             / IPv6ExtHdrHopByHop()
1162             / IPv6ExtHdrFragment(id=2, offset=200)
1163             / Raw(b"\xff" * 200)
1164         )
1165
1166         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1167         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1168
1169         for dc in dcs:
1170             dc = IPv6(raw(dc[IPv6]))
1171             self.assertTrue(dc[IPv6ExtHdrHopByHop])
1172             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1173
1174         # two extensions before ESP, one after
1175         tx = (
1176             Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac)
1177             / IPv6(src=self.tra_if.local_ip6, dst=self.tra_if.remote_ip6)
1178             / IPv6ExtHdrHopByHop()
1179             / IPv6ExtHdrFragment(id=2, offset=200)
1180             / IPv6ExtHdrDestOpt()
1181             / Raw(b"\xff" * 200)
1182         )
1183
1184         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
1185         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
1186
1187         for dc in dcs:
1188             dc = IPv6(raw(dc[IPv6]))
1189             self.assertTrue(dc[IPv6ExtHdrDestOpt])
1190             self.assertTrue(dc[IPv6ExtHdrHopByHop])
1191             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
1192
1193
1194 class IpsecTra6Tests(IpsecTra6):
1195     """UT test methods for Transport v6"""
1196
1197     def test_tra_basic6(self):
1198         """ipsec v6 transport basic test"""
1199         self.verify_tra_basic6(count=1)
1200
1201     def test_tra_burst6(self):
1202         """ipsec v6 transport burst test"""
1203         self.verify_tra_basic6(count=257)
1204
1205
1206 class IpsecTra6ExtTests(IpsecTra6):
1207     def test_tra_ext_hdrs_66(self):
1208         """ipsec 6o6 tra extension headers test"""
1209         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
1210
1211
1212 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
1213     """UT test methods for Transport v6 and v4"""
1214
1215     pass
1216
1217
1218 class IpsecTun4(object):
1219     """verify methods for Tunnel v4"""
1220
1221     def verify_counters4(self, p, count, n_frags=None, worker=None):
1222         if not n_frags:
1223             n_frags = count
1224         if hasattr(p, "spd_policy_in_any"):
1225             pkts = p.spd_policy_in_any.get_stats(worker)["packets"]
1226             self.assertEqual(
1227                 pkts,
1228                 count,
1229                 "incorrect SPD any policy: expected %d != %d" % (count, pkts),
1230             )
1231
1232         if hasattr(p, "tun_sa_in"):
1233             pkts = p.tun_sa_in.get_stats(worker)["packets"]
1234             self.assertEqual(
1235                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1236             )
1237             pkts = p.tun_sa_out.get_stats(worker)["packets"]
1238             self.assertEqual(
1239                 pkts,
1240                 n_frags,
1241                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1242             )
1243
1244         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
1245         self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
1246
1247     def verify_decrypted(self, p, rxs):
1248         for rx in rxs:
1249             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1250             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1251             self.assert_packet_checksums_valid(rx)
1252
1253     def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
1254         align = sa.crypt_algo.block_size
1255         if align < 4:
1256             align = 4
1257         exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
1258         exp_len += sa.crypt_algo.iv_size
1259         exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
1260         self.assertEqual(exp_len, len(esp_payload))
1261
1262     def verify_encrypted(self, p, sa, rxs):
1263         decrypt_pkts = []
1264         for rx in rxs:
1265             if p.nat_header:
1266                 self.assertEqual(rx[UDP].dport, 4500)
1267             self.assert_packet_checksums_valid(rx)
1268             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
1269             try:
1270                 rx_ip = rx[IP]
1271                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
1272                 if not decrypt_pkt.haslayer(IP):
1273                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
1274                 if rx_ip.proto == socket.IPPROTO_ESP:
1275                     self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
1276                 decrypt_pkts.append(decrypt_pkt)
1277                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1278                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1279             except:
1280                 self.logger.debug(ppp("Unexpected packet:", rx))
1281                 try:
1282                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1283                 except:
1284                     pass
1285                 raise
1286         pkts = reassemble4(decrypt_pkts)
1287         for pkt in pkts:
1288             self.assert_packet_checksums_valid(pkt)
1289
1290     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
1291         self.vapi.cli("clear errors")
1292         self.vapi.cli("clear ipsec counters")
1293         self.vapi.cli("clear ipsec sa")
1294         if not n_rx:
1295             n_rx = count
1296         try:
1297             send_pkts = self.gen_encrypt_pkts(
1298                 p,
1299                 p.scapy_tun_sa,
1300                 self.tun_if,
1301                 src=p.remote_tun_if_host,
1302                 dst=self.pg1.remote_ip4,
1303                 count=count,
1304                 payload_size=payload_size,
1305             )
1306             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1307             self.verify_decrypted(p, recv_pkts)
1308
1309             send_pkts = self.gen_pkts(
1310                 self.pg1,
1311                 src=self.pg1.remote_ip4,
1312                 dst=p.remote_tun_if_host,
1313                 count=count,
1314                 payload_size=payload_size,
1315             )
1316             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if, n_rx)
1317             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1318
1319             for rx in recv_pkts:
1320                 self.assertEqual(rx[IP].src, p.tun_src)
1321                 self.assertEqual(rx[IP].dst, p.tun_dst)
1322
1323         finally:
1324             self.logger.info(self.vapi.ppcli("show error"))
1325             self.logger.info(self.vapi.ppcli("show ipsec all"))
1326
1327         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
1328         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
1329         self.verify_counters4(p, count, n_rx)
1330
1331     def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
1332         self.vapi.cli("clear errors")
1333         if not n_rx:
1334             n_rx = count
1335         try:
1336             send_pkts = self.gen_encrypt_pkts(
1337                 p,
1338                 p.scapy_tun_sa,
1339                 self.tun_if,
1340                 src=p.remote_tun_if_host,
1341                 dst=self.pg1.remote_ip4,
1342                 count=count,
1343             )
1344             self.send_and_assert_no_replies(self.tun_if, send_pkts)
1345
1346             send_pkts = self.gen_pkts(
1347                 self.pg1,
1348                 src=self.pg1.remote_ip4,
1349                 dst=p.remote_tun_if_host,
1350                 count=count,
1351                 payload_size=payload_size,
1352             )
1353             self.send_and_assert_no_replies(self.pg1, send_pkts)
1354
1355         finally:
1356             self.logger.info(self.vapi.ppcli("show error"))
1357             self.logger.info(self.vapi.ppcli("show ipsec all"))
1358
1359     def verify_tun_reass_44(self, p):
1360         self.vapi.cli("clear errors")
1361         self.vapi.ip_reassembly_enable_disable(
1362             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True
1363         )
1364
1365         try:
1366             send_pkts = self.gen_encrypt_pkts(
1367                 p,
1368                 p.scapy_tun_sa,
1369                 self.tun_if,
1370                 src=p.remote_tun_if_host,
1371                 dst=self.pg1.remote_ip4,
1372                 payload_size=1900,
1373                 count=1,
1374             )
1375             send_pkts = fragment_rfc791(send_pkts[0], 1400)
1376             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1377             self.verify_decrypted(p, recv_pkts)
1378
1379             send_pkts = self.gen_pkts(
1380                 self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host, count=1
1381             )
1382             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1383             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1384
1385         finally:
1386             self.logger.info(self.vapi.ppcli("show error"))
1387             self.logger.info(self.vapi.ppcli("show ipsec all"))
1388
1389         self.verify_counters4(p, 1, 1)
1390         self.vapi.ip_reassembly_enable_disable(
1391             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False
1392         )
1393
1394     def verify_tun_64(self, p, count=1):
1395         self.vapi.cli("clear errors")
1396         self.vapi.cli("clear ipsec sa")
1397         try:
1398             send_pkts = self.gen_encrypt_pkts6(
1399                 p,
1400                 p.scapy_tun_sa,
1401                 self.tun_if,
1402                 src=p.remote_tun_if_host6,
1403                 dst=self.pg1.remote_ip6,
1404                 count=count,
1405             )
1406             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1407             for recv_pkt in recv_pkts:
1408                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
1409                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
1410                 self.assert_packet_checksums_valid(recv_pkt)
1411             send_pkts = self.gen_pkts6(
1412                 p,
1413                 self.pg1,
1414                 src=self.pg1.remote_ip6,
1415                 dst=p.remote_tun_if_host6,
1416                 count=count,
1417             )
1418             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1419             for recv_pkt in recv_pkts:
1420                 try:
1421                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
1422                     if not decrypt_pkt.haslayer(IPv6):
1423                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1424                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1425                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
1426                     self.assert_packet_checksums_valid(decrypt_pkt)
1427                 except:
1428                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
1429                     try:
1430                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1431                     except:
1432                         pass
1433                     raise
1434         finally:
1435             self.logger.info(self.vapi.ppcli("show error"))
1436             self.logger.info(self.vapi.ppcli("show ipsec all"))
1437
1438         self.verify_counters4(p, count)
1439
1440     def verify_keepalive(self, p):
1441         # the sizeof Raw is calculated to pad to the minimum ehternet
1442         # frame size of 64 btyes
1443         pkt = (
1444             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1445             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1446             / UDP(sport=333, dport=4500)
1447             / Raw(b"\xff")
1448             / Padding(0 * 21)
1449         )
1450         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1451         self.assert_error_counter_equal(
1452             "/err/%s/nat_keepalive" % self.tun4_input_node, 31
1453         )
1454
1455         pkt = (
1456             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1457             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1458             / UDP(sport=333, dport=4500)
1459             / Raw(b"\xfe")
1460         )
1461         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1462         self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 31)
1463
1464         pkt = (
1465             Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
1466             / IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4)
1467             / UDP(sport=333, dport=4500)
1468             / Raw(b"\xfe")
1469             / Padding(0 * 21)
1470         )
1471         self.send_and_assert_no_replies(self.tun_if, pkt * 31)
1472         self.assert_error_counter_equal("/err/%s/too_short" % self.tun4_input_node, 62)
1473
1474
1475 class IpsecTun4Tests(IpsecTun4):
1476     """UT test methods for Tunnel v4"""
1477
1478     def test_tun_basic44(self):
1479         """ipsec 4o4 tunnel basic test"""
1480         self.verify_tun_44(self.params[socket.AF_INET], count=1)
1481         self.tun_if.admin_down()
1482         self.tun_if.resolve_arp()
1483         self.tun_if.admin_up()
1484         self.verify_tun_44(self.params[socket.AF_INET], count=1)
1485
1486     def test_tun_reass_basic44(self):
1487         """ipsec 4o4 tunnel basic reassembly test"""
1488         self.verify_tun_reass_44(self.params[socket.AF_INET])
1489
1490     def test_tun_burst44(self):
1491         """ipsec 4o4 tunnel burst test"""
1492         self.verify_tun_44(self.params[socket.AF_INET], count=127)
1493
1494
1495 class IpsecTun6(object):
1496     """verify methods for Tunnel v6"""
1497
1498     def verify_counters6(self, p_in, p_out, count, worker=None):
1499         if hasattr(p_in, "tun_sa_in"):
1500             pkts = p_in.tun_sa_in.get_stats(worker)["packets"]
1501             self.assertEqual(
1502                 pkts, count, "incorrect SA in counts: expected %d != %d" % (count, pkts)
1503             )
1504         if hasattr(p_out, "tun_sa_out"):
1505             pkts = p_out.tun_sa_out.get_stats(worker)["packets"]
1506             self.assertEqual(
1507                 pkts,
1508                 count,
1509                 "incorrect SA out counts: expected %d != %d" % (count, pkts),
1510             )
1511         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1512         self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1513
1514     def verify_decrypted6(self, p, rxs):
1515         for rx in rxs:
1516             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1517             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1518             self.assert_packet_checksums_valid(rx)
1519
1520     def verify_encrypted6(self, p, sa, rxs):
1521         for rx in rxs:
1522             self.assert_packet_checksums_valid(rx)
1523             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()), rx[IPv6].plen)
1524             self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1525             if p.outer_flow_label:
1526                 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1527             try:
1528                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1529                 if not decrypt_pkt.haslayer(IPv6):
1530                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1531                 self.assert_packet_checksums_valid(decrypt_pkt)
1532                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1533                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1534                 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1535                 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1536             except:
1537                 self.logger.debug(ppp("Unexpected packet:", rx))
1538                 try:
1539                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1540                 except:
1541                     pass
1542                 raise
1543
1544     def verify_drop_tun_tx_66(self, p_in, count=1, payload_size=64):
1545         self.vapi.cli("clear errors")
1546         self.vapi.cli("clear ipsec sa")
1547
1548         send_pkts = self.gen_pkts6(
1549             p_in,
1550             self.pg1,
1551             src=self.pg1.remote_ip6,
1552             dst=p_in.remote_tun_if_host,
1553             count=count,
1554             payload_size=payload_size,
1555         )
1556         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1557         self.logger.info(self.vapi.cli("sh punt stats"))
1558
1559     def verify_drop_tun_rx_66(self, p_in, count=1, payload_size=64):
1560         self.vapi.cli("clear errors")
1561         self.vapi.cli("clear ipsec sa")
1562
1563         send_pkts = self.gen_encrypt_pkts6(
1564             p_in,
1565             p_in.scapy_tun_sa,
1566             self.tun_if,
1567             src=p_in.remote_tun_if_host,
1568             dst=self.pg1.remote_ip6,
1569             count=count,
1570         )
1571         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1572
1573     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1574         self.verify_drop_tun_tx_66(p_in, count=count, payload_size=payload_size)
1575         self.verify_drop_tun_rx_66(p_in, count=count, payload_size=payload_size)
1576
1577     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1578         self.vapi.cli("clear errors")
1579         self.vapi.cli("clear ipsec sa")
1580         if not p_out:
1581             p_out = p_in
1582         try:
1583             send_pkts = self.gen_encrypt_pkts6(
1584                 p_in,
1585                 p_in.scapy_tun_sa,
1586                 self.tun_if,
1587                 src=p_in.remote_tun_if_host,
1588                 dst=self.pg1.remote_ip6,
1589                 count=count,
1590                 payload_size=payload_size,
1591             )
1592             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1593             self.verify_decrypted6(p_in, recv_pkts)
1594
1595             send_pkts = self.gen_pkts6(
1596                 p_in,
1597                 self.pg1,
1598                 src=self.pg1.remote_ip6,
1599                 dst=p_out.remote_tun_if_host,
1600                 count=count,
1601                 payload_size=payload_size,
1602             )
1603             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1604             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1605
1606             for rx in recv_pkts:
1607                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1608                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1609
1610         finally:
1611             self.logger.info(self.vapi.ppcli("show error"))
1612             self.logger.info(self.vapi.ppcli("show ipsec all"))
1613         self.verify_counters6(p_in, p_out, count)
1614
1615     def verify_tun_reass_66(self, p):
1616         self.vapi.cli("clear errors")
1617         self.vapi.ip_reassembly_enable_disable(
1618             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True
1619         )
1620
1621         try:
1622             send_pkts = self.gen_encrypt_pkts6(
1623                 p,
1624                 p.scapy_tun_sa,
1625                 self.tun_if,
1626                 src=p.remote_tun_if_host,
1627                 dst=self.pg1.remote_ip6,
1628                 count=1,
1629                 payload_size=1850,
1630             )
1631             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1632             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1)
1633             self.verify_decrypted6(p, recv_pkts)
1634
1635             send_pkts = self.gen_pkts6(
1636                 p,
1637                 self.pg1,
1638                 src=self.pg1.remote_ip6,
1639                 dst=p.remote_tun_if_host,
1640                 count=1,
1641                 payload_size=64,
1642             )
1643             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1644             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1645         finally:
1646             self.logger.info(self.vapi.ppcli("show error"))
1647             self.logger.info(self.vapi.ppcli("show ipsec all"))
1648         self.verify_counters6(p, p, 1)
1649         self.vapi.ip_reassembly_enable_disable(
1650             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False
1651         )
1652
1653     def verify_tun_46(self, p, count=1):
1654         """ipsec 4o6 tunnel basic test"""
1655         self.vapi.cli("clear errors")
1656         self.vapi.cli("clear ipsec sa")
1657         try:
1658             send_pkts = self.gen_encrypt_pkts(
1659                 p,
1660                 p.scapy_tun_sa,
1661                 self.tun_if,
1662                 src=p.remote_tun_if_host4,
1663                 dst=self.pg1.remote_ip4,
1664                 count=count,
1665             )
1666             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1667             for recv_pkt in recv_pkts:
1668                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1669                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1670                 self.assert_packet_checksums_valid(recv_pkt)
1671             send_pkts = self.gen_pkts(
1672                 self.pg1,
1673                 src=self.pg1.remote_ip4,
1674                 dst=p.remote_tun_if_host4,
1675                 count=count,
1676             )
1677             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1678             for recv_pkt in recv_pkts:
1679                 try:
1680                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1681                     if not decrypt_pkt.haslayer(IP):
1682                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1683                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1684                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1685                     self.assert_packet_checksums_valid(decrypt_pkt)
1686                 except:
1687                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1688                     try:
1689                         self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1690                     except:
1691                         pass
1692                     raise
1693         finally:
1694             self.logger.info(self.vapi.ppcli("show error"))
1695             self.logger.info(self.vapi.ppcli("show ipsec all"))
1696         self.verify_counters6(p, p, count)
1697
1698
1699 class IpsecTun6Tests(IpsecTun6):
1700     """UT test methods for Tunnel v6"""
1701
1702     def test_tun_basic66(self):
1703         """ipsec 6o6 tunnel basic test"""
1704         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1705
1706     def test_tun_reass_basic66(self):
1707         """ipsec 6o6 tunnel basic reassembly test"""
1708         self.verify_tun_reass_66(self.params[socket.AF_INET6])
1709
1710     def test_tun_burst66(self):
1711         """ipsec 6o6 tunnel burst test"""
1712         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1713
1714
1715 class IpsecTun6HandoffTests(IpsecTun6):
1716     """UT test methods for Tunnel v6 with multiple workers"""
1717
1718     vpp_worker_count = 2
1719
1720     def test_tun_handoff_66(self):
1721         """ipsec 6o6 tunnel worker hand-off test"""
1722         self.vapi.cli("clear errors")
1723         self.vapi.cli("clear ipsec sa")
1724
1725         N_PKTS = 15
1726         p = self.params[socket.AF_INET6]
1727
1728         # inject alternately on worker 0 and 1. all counts on the SA
1729         # should be against worker 0
1730         for worker in [0, 1, 0, 1]:
1731             send_pkts = self.gen_encrypt_pkts6(
1732                 p,
1733                 p.scapy_tun_sa,
1734                 self.tun_if,
1735                 src=p.remote_tun_if_host,
1736                 dst=self.pg1.remote_ip6,
1737                 count=N_PKTS,
1738             )
1739             recv_pkts = self.send_and_expect(
1740                 self.tun_if, send_pkts, self.pg1, worker=worker
1741             )
1742             self.verify_decrypted6(p, recv_pkts)
1743
1744             send_pkts = self.gen_pkts6(
1745                 p,
1746                 self.pg1,
1747                 src=self.pg1.remote_ip6,
1748                 dst=p.remote_tun_if_host,
1749                 count=N_PKTS,
1750             )
1751             recv_pkts = self.send_and_expect(
1752                 self.pg1, send_pkts, self.tun_if, worker=worker
1753             )
1754             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1755
1756         # all counts against the first worker that was used
1757         self.verify_counters6(p, p, 4 * N_PKTS, worker=0)
1758
1759
1760 class IpsecTun4HandoffTests(IpsecTun4):
1761     """UT test methods for Tunnel v4 with multiple workers"""
1762
1763     vpp_worker_count = 2
1764
1765     def test_tun_handooff_44(self):
1766         """ipsec 4o4 tunnel worker hand-off test"""
1767         self.vapi.cli("clear errors")
1768         self.vapi.cli("clear ipsec sa")
1769
1770         N_PKTS = 15
1771         p = self.params[socket.AF_INET]
1772
1773         # inject alternately on worker 0 and 1. all counts on the SA
1774         # should be against worker 0
1775         for worker in [0, 1, 0, 1]:
1776             send_pkts = self.gen_encrypt_pkts(
1777                 p,
1778                 p.scapy_tun_sa,
1779                 self.tun_if,
1780                 src=p.remote_tun_if_host,
1781                 dst=self.pg1.remote_ip4,
1782                 count=N_PKTS,
1783             )
1784             recv_pkts = self.send_and_expect(
1785                 self.tun_if, send_pkts, self.pg1, worker=worker
1786             )
1787             self.verify_decrypted(p, recv_pkts)
1788
1789             send_pkts = self.gen_pkts(
1790                 self.pg1,
1791                 src=self.pg1.remote_ip4,
1792                 dst=p.remote_tun_if_host,
1793                 count=N_PKTS,
1794             )
1795             recv_pkts = self.send_and_expect(
1796                 self.pg1, send_pkts, self.tun_if, worker=worker
1797             )
1798             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1799
1800         # all counts against the first worker that was used
1801         self.verify_counters4(p, 4 * N_PKTS, worker=0)
1802
1803
1804 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1805     """UT test methods for Tunnel v6 & v4"""
1806
1807     pass
1808
1809
1810 class IPSecIPv4Fwd(VppTestCase):
1811     """Test IPSec by capturing and verifying IPv4 forwarded pkts"""
1812
1813     @classmethod
1814     def setUpConstants(cls):
1815         super(IPSecIPv4Fwd, cls).setUpConstants()
1816
1817     def setUp(self):
1818         super(IPSecIPv4Fwd, self).setUp()
1819         # store SPD objects so we can remove configs on tear down
1820         self.spd_objs = []
1821         self.spd_policies = []
1822
1823     def tearDown(self):
1824         # remove SPD policies
1825         for obj in self.spd_policies:
1826             obj.remove_vpp_config()
1827         self.spd_policies = []
1828         # remove SPD items (interface bindings first, then SPD)
1829         for obj in reversed(self.spd_objs):
1830             obj.remove_vpp_config()
1831         self.spd_objs = []
1832         # close down pg intfs
1833         for pg in self.pg_interfaces:
1834             pg.unconfig_ip4()
1835             pg.admin_down()
1836         super(IPSecIPv4Fwd, self).tearDown()
1837
1838     def create_interfaces(self, num_ifs=2):
1839         # create interfaces pg0 ... pg<num_ifs>
1840         self.create_pg_interfaces(range(num_ifs))
1841         for pg in self.pg_interfaces:
1842             # put the interface up
1843             pg.admin_up()
1844             # configure IPv4 address on the interface
1845             pg.config_ip4()
1846             # resolve ARP, so that we know VPP MAC
1847             pg.resolve_arp()
1848         self.logger.info(self.vapi.ppcli("show int addr"))
1849
1850     def spd_create_and_intf_add(self, spd_id, pg_list):
1851         spd = VppIpsecSpd(self, spd_id)
1852         spd.add_vpp_config()
1853         self.spd_objs.append(spd)
1854         for pg in pg_list:
1855             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
1856             spdItf.add_vpp_config()
1857             self.spd_objs.append(spdItf)
1858
1859     def get_policy(self, policy_type):
1860         e = VppEnum.vl_api_ipsec_spd_action_t
1861         if policy_type == "protect":
1862             return e.IPSEC_API_SPD_ACTION_PROTECT
1863         elif policy_type == "bypass":
1864             return e.IPSEC_API_SPD_ACTION_BYPASS
1865         elif policy_type == "discard":
1866             return e.IPSEC_API_SPD_ACTION_DISCARD
1867         else:
1868             raise Exception("Invalid policy type: %s", policy_type)
1869
1870     def spd_add_rem_policy(
1871         self,
1872         spd_id,
1873         src_if,
1874         dst_if,
1875         proto,
1876         is_out,
1877         priority,
1878         policy_type,
1879         remove=False,
1880         all_ips=False,
1881         ip_range=False,
1882         local_ip_start=ip_address("0.0.0.0"),
1883         local_ip_stop=ip_address("255.255.255.255"),
1884         remote_ip_start=ip_address("0.0.0.0"),
1885         remote_ip_stop=ip_address("255.255.255.255"),
1886         remote_port_start=0,
1887         remote_port_stop=65535,
1888         local_port_start=0,
1889         local_port_stop=65535,
1890     ):
1891         spd = VppIpsecSpd(self, spd_id)
1892
1893         if all_ips:
1894             src_range_low = ip_address("0.0.0.0")
1895             src_range_high = ip_address("255.255.255.255")
1896             dst_range_low = ip_address("0.0.0.0")
1897             dst_range_high = ip_address("255.255.255.255")
1898
1899         elif ip_range:
1900             src_range_low = local_ip_start
1901             src_range_high = local_ip_stop
1902             dst_range_low = remote_ip_start
1903             dst_range_high = remote_ip_stop
1904
1905         else:
1906             src_range_low = src_if.remote_ip4
1907             src_range_high = src_if.remote_ip4
1908             dst_range_low = dst_if.remote_ip4
1909             dst_range_high = dst_if.remote_ip4
1910
1911         spdEntry = VppIpsecSpdEntry(
1912             self,
1913             spd,
1914             0,
1915             src_range_low,
1916             src_range_high,
1917             dst_range_low,
1918             dst_range_high,
1919             proto,
1920             priority=priority,
1921             policy=self.get_policy(policy_type),
1922             is_outbound=is_out,
1923             remote_port_start=remote_port_start,
1924             remote_port_stop=remote_port_stop,
1925             local_port_start=local_port_start,
1926             local_port_stop=local_port_stop,
1927         )
1928
1929         if remove is False:
1930             spdEntry.add_vpp_config()
1931             self.spd_policies.append(spdEntry)
1932         else:
1933             spdEntry.remove_vpp_config()
1934             self.spd_policies.remove(spdEntry)
1935         self.logger.info(self.vapi.ppcli("show ipsec all"))
1936         return spdEntry
1937
1938     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
1939         packets = []
1940         for i in range(pkt_count):
1941             # create packet info stored in the test case instance
1942             info = self.create_packet_info(src_if, dst_if)
1943             # convert the info into packet payload
1944             payload = self.info_to_payload(info)
1945             # create the packet itself
1946             p = (
1947                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
1948                 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
1949                 / UDP(sport=src_prt, dport=dst_prt)
1950                 / Raw(payload)
1951             )
1952             # store a copy of the packet in the packet info
1953             info.data = p.copy()
1954             # append the packet to the list
1955             packets.append(p)
1956         # return the created packet list
1957         return packets
1958
1959     def verify_capture(self, src_if, dst_if, capture):
1960         packet_info = None
1961         for packet in capture:
1962             try:
1963                 ip = packet[IP]
1964                 udp = packet[UDP]
1965                 # convert the payload to packet info object
1966                 payload_info = self.payload_to_info(packet)
1967                 # make sure the indexes match
1968                 self.assert_equal(
1969                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
1970                 )
1971                 self.assert_equal(
1972                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
1973                 )
1974                 packet_info = self.get_next_packet_info_for_interface2(
1975                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
1976                 )
1977                 # make sure we didn't run out of saved packets
1978                 self.assertIsNotNone(packet_info)
1979                 self.assert_equal(
1980                     payload_info.index, packet_info.index, "packet info index"
1981                 )
1982                 saved_packet = packet_info.data  # fetch the saved packet
1983                 # assert the values match
1984                 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
1985                 # ... more assertions here
1986                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
1987             except Exception as e:
1988                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1989                 raise
1990         remaining_packet = self.get_next_packet_info_for_interface2(
1991             src_if.sw_if_index, dst_if.sw_if_index, packet_info
1992         )
1993         self.assertIsNone(
1994             remaining_packet,
1995             "Interface %s: Packet expected from interface "
1996             "%s didn't arrive" % (dst_if.name, src_if.name),
1997         )
1998
1999     def verify_policy_match(self, pkt_count, spdEntry):
2000         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
2001         matched_pkts = spdEntry.get_stats().get("packets")
2002         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
2003         self.assert_equal(pkt_count, matched_pkts)
2004
2005
2006 class SpdFlowCacheTemplate(IPSecIPv4Fwd):
2007     @classmethod
2008     def setUpConstants(cls):
2009         super(SpdFlowCacheTemplate, cls).setUpConstants()
2010         # Override this method with required cmdline parameters e.g.
2011         # cls.vpp_cmdline.extend(["ipsec", "{",
2012         #                         "ipv4-outbound-spd-flow-cache on",
2013         #                         "}"])
2014         # cls.logger.info("VPP modified cmdline is %s" % " "
2015         #                 .join(cls.vpp_cmdline))
2016
2017     def setUp(self):
2018         super(SpdFlowCacheTemplate, self).setUp()
2019
2020     def tearDown(self):
2021         super(SpdFlowCacheTemplate, self).tearDown()
2022
2023     def get_spd_flow_cache_entries(self, outbound):
2024         """'show ipsec spd' output:
2025         ipv4-inbound-spd-flow-cache-entries: 0
2026         ipv4-outbound-spd-flow-cache-entries: 0
2027         """
2028         show_ipsec_reply = self.vapi.cli("show ipsec spd")
2029         # match the relevant section of 'show ipsec spd' output
2030         if outbound:
2031             regex_match = re.search(
2032                 "ipv4-outbound-spd-flow-cache-entries: (.*)",
2033                 show_ipsec_reply,
2034                 re.DOTALL,
2035             )
2036         else:
2037             regex_match = re.search(
2038                 "ipv4-inbound-spd-flow-cache-entries: (.*)", show_ipsec_reply, re.DOTALL
2039             )
2040         if regex_match is None:
2041             raise Exception(
2042                 "Unable to find spd flow cache entries \
2043                 in 'show ipsec spd' CLI output - regex failed to match"
2044             )
2045         else:
2046             try:
2047                 num_entries = int(regex_match.group(1))
2048             except ValueError:
2049                 raise Exception(
2050                     "Unable to get spd flow cache entries \
2051                 from 'show ipsec spd' string: %s",
2052                     regex_match.group(0),
2053                 )
2054             self.logger.info("%s", regex_match.group(0))
2055         return num_entries
2056
2057     def verify_num_outbound_flow_cache_entries(self, expected_elements):
2058         self.assertEqual(
2059             self.get_spd_flow_cache_entries(outbound=True), expected_elements
2060         )
2061
2062     def verify_num_inbound_flow_cache_entries(self, expected_elements):
2063         self.assertEqual(
2064             self.get_spd_flow_cache_entries(outbound=False), expected_elements
2065         )
2066
2067     def crc32_supported(self):
2068         # lscpu is part of util-linux package, available on all Linux Distros
2069         stream = os.popen("lscpu")
2070         cpu_info = stream.read()
2071         # feature/flag "crc32" on Aarch64 and "sse4_2" on x86
2072         # see vppinfra/crc32.h
2073         if "crc32" or "sse4_2" in cpu_info:
2074             self.logger.info("\ncrc32 supported:\n" + cpu_info)
2075             return True
2076         else:
2077             self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
2078             return False
2079
2080
2081 class IPSecIPv6Fwd(VppTestCase):
2082     """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
2083
2084     @classmethod
2085     def setUpConstants(cls):
2086         super(IPSecIPv6Fwd, cls).setUpConstants()
2087
2088     def setUp(self):
2089         super(IPSecIPv6Fwd, self).setUp()
2090         # store SPD objects so we can remove configs on tear down
2091         self.spd_objs = []
2092         self.spd_policies = []
2093
2094     def tearDown(self):
2095         # remove SPD policies
2096         for obj in self.spd_policies:
2097             obj.remove_vpp_config()
2098         self.spd_policies = []
2099         # remove SPD items (interface bindings first, then SPD)
2100         for obj in reversed(self.spd_objs):
2101             obj.remove_vpp_config()
2102         self.spd_objs = []
2103         # close down pg intfs
2104         for pg in self.pg_interfaces:
2105             pg.unconfig_ip6()
2106             pg.admin_down()
2107         super(IPSecIPv6Fwd, self).tearDown()
2108
2109     def create_interfaces(self, num_ifs=2):
2110         # create interfaces pg0 ... pg<num_ifs>
2111         self.create_pg_interfaces(range(num_ifs))
2112         for pg in self.pg_interfaces:
2113             # put the interface up
2114             pg.admin_up()
2115             # configure IPv6 address on the interface
2116             pg.config_ip6()
2117             pg.resolve_ndp()
2118         self.logger.info(self.vapi.ppcli("show int addr"))
2119
2120     def spd_create_and_intf_add(self, spd_id, pg_list):
2121         spd = VppIpsecSpd(self, spd_id)
2122         spd.add_vpp_config()
2123         self.spd_objs.append(spd)
2124         for pg in pg_list:
2125             spdItf = VppIpsecSpdItfBinding(self, spd, pg)
2126             spdItf.add_vpp_config()
2127             self.spd_objs.append(spdItf)
2128
2129     def get_policy(self, policy_type):
2130         e = VppEnum.vl_api_ipsec_spd_action_t
2131         if policy_type == "protect":
2132             return e.IPSEC_API_SPD_ACTION_PROTECT
2133         elif policy_type == "bypass":
2134             return e.IPSEC_API_SPD_ACTION_BYPASS
2135         elif policy_type == "discard":
2136             return e.IPSEC_API_SPD_ACTION_DISCARD
2137         else:
2138             raise Exception("Invalid policy type: %s", policy_type)
2139
2140     def spd_add_rem_policy(
2141         self,
2142         spd_id,
2143         src_if,
2144         dst_if,
2145         proto,
2146         is_out,
2147         priority,
2148         policy_type,
2149         remove=False,
2150         all_ips=False,
2151         ip_range=False,
2152         local_ip_start=ip_address("0::0"),
2153         local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2154         remote_ip_start=ip_address("0::0"),
2155         remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
2156         remote_port_start=0,
2157         remote_port_stop=65535,
2158         local_port_start=0,
2159         local_port_stop=65535,
2160     ):
2161         spd = VppIpsecSpd(self, spd_id)
2162
2163         if all_ips:
2164             src_range_low = ip_address("0::0")
2165             src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2166             dst_range_low = ip_address("0::0")
2167             dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
2168
2169         elif ip_range:
2170             src_range_low = local_ip_start
2171             src_range_high = local_ip_stop
2172             dst_range_low = remote_ip_start
2173             dst_range_high = remote_ip_stop
2174
2175         else:
2176             src_range_low = src_if.remote_ip6
2177             src_range_high = src_if.remote_ip6
2178             dst_range_low = dst_if.remote_ip6
2179             dst_range_high = dst_if.remote_ip6
2180
2181         spdEntry = VppIpsecSpdEntry(
2182             self,
2183             spd,
2184             0,
2185             src_range_low,
2186             src_range_high,
2187             dst_range_low,
2188             dst_range_high,
2189             proto,
2190             priority=priority,
2191             policy=self.get_policy(policy_type),
2192             is_outbound=is_out,
2193             remote_port_start=remote_port_start,
2194             remote_port_stop=remote_port_stop,
2195             local_port_start=local_port_start,
2196             local_port_stop=local_port_stop,
2197         )
2198
2199         if remove is False:
2200             spdEntry.add_vpp_config()
2201             self.spd_policies.append(spdEntry)
2202         else:
2203             spdEntry.remove_vpp_config()
2204             self.spd_policies.remove(spdEntry)
2205         self.logger.info(self.vapi.ppcli("show ipsec all"))
2206         return spdEntry
2207
2208     def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
2209         packets = []
2210         for i in range(pkt_count):
2211             # create packet info stored in the test case instance
2212             info = self.create_packet_info(src_if, dst_if)
2213             # convert the info into packet payload
2214             payload = self.info_to_payload(info)
2215             # create the packet itself
2216             p = (
2217                 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2218                 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
2219                 / UDP(sport=src_prt, dport=dst_prt)
2220                 / Raw(payload)
2221             )
2222             # store a copy of the packet in the packet info
2223             info.data = p.copy()
2224             # append the packet to the list
2225             packets.append(p)
2226         # return the created packet list
2227         return packets
2228
2229     def verify_capture(self, src_if, dst_if, capture):
2230         packet_info = None
2231         for packet in capture:
2232             try:
2233                 ip = packet[IPv6]
2234                 udp = packet[UDP]
2235                 # convert the payload to packet info object
2236                 payload_info = self.payload_to_info(packet)
2237                 # make sure the indexes match
2238                 self.assert_equal(
2239                     payload_info.src, src_if.sw_if_index, "source sw_if_index"
2240                 )
2241                 self.assert_equal(
2242                     payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
2243                 )
2244                 packet_info = self.get_next_packet_info_for_interface2(
2245                     src_if.sw_if_index, dst_if.sw_if_index, packet_info
2246                 )
2247                 # make sure we didn't run out of saved packets
2248                 self.assertIsNotNone(packet_info)
2249                 self.assert_equal(
2250                     payload_info.index, packet_info.index, "packet info index"
2251                 )
2252                 saved_packet = packet_info.data  # fetch the saved packet
2253                 # assert the values match
2254                 self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
2255                 # ... more assertions here
2256                 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
2257             except Exception as e:
2258                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2259                 raise
2260         remaining_packet = self.get_next_packet_info_for_interface2(
2261             src_if.sw_if_index, dst_if.sw_if_index, packet_info
2262         )
2263         self.assertIsNone(
2264             remaining_packet,
2265             "Interface %s: Packet expected from interface "
2266             "%s didn't arrive" % (dst_if.name, src_if.name),
2267         )
2268
2269     def verify_policy_match(self, pkt_count, spdEntry):
2270         self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
2271         matched_pkts = spdEntry.get_stats().get("packets")
2272         self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
2273         self.assert_equal(pkt_count, matched_pkts)
2274
2275
2276 if __name__ == "__main__":
2277     unittest.main(testRunner=VppTestRunner)