5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
24 from vpp_papi_provider import CliFailedCommandError
25 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
29 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
30 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
31 IPSEC_API_SAD_FLAG_USE_ESN))
32 crypt_key = mk_scapy_crypt_key(p)
34 p.tun_dst = tun_if.remote_ip
35 p.tun_src = tun_if.local_ip
40 p.scapy_tun_sa = SecurityAssociation(
41 encryption_type, spi=p.vpp_tun_spi,
42 crypt_algo=p.crypt_algo,
44 auth_algo=p.auth_algo, auth_key=p.auth_key,
45 tunnel_header=ip_class_by_addr_type[p.addr_type](
48 nat_t_header=p.nat_header,
50 p.vpp_tun_sa = SecurityAssociation(
51 encryption_type, spi=p.scapy_tun_spi,
52 crypt_algo=p.crypt_algo,
54 auth_algo=p.auth_algo, auth_key=p.auth_key,
55 tunnel_header=ip_class_by_addr_type[p.addr_type](
58 nat_t_header=p.nat_header,
62 def config_tra_params(p, encryption_type, tun_if):
63 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
64 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
65 IPSEC_API_SAD_FLAG_USE_ESN))
66 crypt_key = mk_scapy_crypt_key(p)
67 p.tun_dst = tun_if.remote_ip
68 p.tun_src = tun_if.local_ip
69 p.scapy_tun_sa = SecurityAssociation(
70 encryption_type, spi=p.vpp_tun_spi,
71 crypt_algo=p.crypt_algo,
73 auth_algo=p.auth_algo, auth_key=p.auth_key,
75 nat_t_header=p.nat_header)
76 p.vpp_tun_sa = SecurityAssociation(
77 encryption_type, spi=p.scapy_tun_spi,
78 crypt_algo=p.crypt_algo,
80 auth_algo=p.auth_algo, auth_key=p.auth_key,
82 nat_t_header=p.nat_header)
85 class TemplateIpsec4TunIfEsp(TemplateIpsec):
86 """ IPsec tunnel interface tests """
92 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
95 def tearDownClass(cls):
96 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
99 super(TemplateIpsec4TunIfEsp, self).setUp()
101 self.tun_if = self.pg0
105 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
106 p.scapy_tun_spi, p.crypt_algo_vpp_id,
107 p.crypt_key, p.crypt_key,
108 p.auth_algo_vpp_id, p.auth_key,
110 p.tun_if.add_vpp_config()
112 p.tun_if.config_ip4()
113 p.tun_if.config_ip6()
114 config_tun_params(p, self.encryption_type, p.tun_if)
116 r = VppIpRoute(self, p.remote_tun_if_host, 32,
117 [VppRoutePath(p.tun_if.remote_ip4,
120 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
121 [VppRoutePath(p.tun_if.remote_ip6,
123 proto=DpoProto.DPO_PROTO_IP6)])
127 super(TemplateIpsec4TunIfEsp, self).tearDown()
130 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
131 """ IPsec UDP tunnel interface tests """
133 tun4_encrypt_node_name = "esp4-encrypt-tun"
134 tun4_decrypt_node_name = "esp4-decrypt-tun"
135 encryption_type = ESP
139 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
142 def tearDownClass(cls):
143 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
145 def verify_encrypted(self, p, sa, rxs):
148 # ensure the UDP ports are correct before we decrypt
150 self.assertTrue(rx.haslayer(UDP))
151 self.assert_equal(rx[UDP].sport, 4500)
152 self.assert_equal(rx[UDP].dport, 4500)
154 pkt = sa.decrypt(rx[IP])
155 if not pkt.haslayer(IP):
156 pkt = IP(pkt[Raw].load)
158 self.assert_packet_checksums_valid(pkt)
159 self.assert_equal(pkt[IP].dst, "1.1.1.1")
160 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
161 except (IndexError, AssertionError):
162 self.logger.debug(ppp("Unexpected packet:", rx))
164 self.logger.debug(ppp("Decrypted packet:", pkt))
170 super(TemplateIpsec4TunIfEspUdp, self).setUp()
173 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
174 IPSEC_API_SAD_FLAG_UDP_ENCAP)
175 p.nat_header = UDP(sport=5454, dport=4500)
177 def config_network(self):
179 self.tun_if = self.pg0
181 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
182 p.scapy_tun_spi, p.crypt_algo_vpp_id,
183 p.crypt_key, p.crypt_key,
184 p.auth_algo_vpp_id, p.auth_key,
185 p.auth_key, udp_encap=True)
186 p.tun_if.add_vpp_config()
188 p.tun_if.config_ip4()
189 p.tun_if.config_ip6()
190 config_tun_params(p, self.encryption_type, p.tun_if)
192 r = VppIpRoute(self, p.remote_tun_if_host, 32,
193 [VppRoutePath(p.tun_if.remote_ip4,
196 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
197 [VppRoutePath(p.tun_if.remote_ip6,
199 proto=DpoProto.DPO_PROTO_IP6)])
203 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
206 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
207 """ Ipsec ESP - TUN tests """
208 tun4_encrypt_node_name = "esp4-encrypt-tun"
209 tun4_decrypt_node_name = "esp4-decrypt-tun"
211 def test_tun_basic64(self):
212 """ ipsec 6o4 tunnel basic test """
213 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
215 self.verify_tun_64(self.params[socket.AF_INET], count=1)
217 def test_tun_burst64(self):
218 """ ipsec 6o4 tunnel basic test """
219 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
221 self.verify_tun_64(self.params[socket.AF_INET], count=257)
223 def test_tun_basic_frag44(self):
224 """ ipsec 4o4 tunnel frag basic test """
225 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
229 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
231 self.verify_tun_44(self.params[socket.AF_INET],
232 count=1, payload_size=1800, n_rx=2)
233 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
237 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
238 """ Ipsec ESP UDP tests """
240 tun4_input_node = "ipsec4-tun-input"
243 super(TemplateIpsec4TunIfEspUdp, self).setUp()
244 self.config_network()
246 def test_keepalive(self):
247 """ IPSEC NAT Keepalive """
248 self.verify_keepalive(self.ipv4_params)
251 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
252 """ Ipsec ESP UDP GCM tests """
254 tun4_input_node = "ipsec4-tun-input"
257 super(TemplateIpsec4TunIfEspUdp, self).setUp()
259 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
260 IPSEC_API_INTEG_ALG_NONE)
261 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
262 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
263 p.crypt_algo = "AES-GCM"
265 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
267 self.config_network()
270 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
271 """ Ipsec ESP - TCP tests """
275 class TemplateIpsec6TunIfEsp(TemplateIpsec):
276 """ IPsec tunnel interface tests """
278 encryption_type = ESP
281 super(TemplateIpsec6TunIfEsp, self).setUp()
283 self.tun_if = self.pg0
286 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
287 p.scapy_tun_spi, p.crypt_algo_vpp_id,
288 p.crypt_key, p.crypt_key,
289 p.auth_algo_vpp_id, p.auth_key,
290 p.auth_key, is_ip6=True)
291 p.tun_if.add_vpp_config()
293 p.tun_if.config_ip6()
294 p.tun_if.config_ip4()
295 config_tun_params(p, self.encryption_type, p.tun_if)
297 r = VppIpRoute(self, p.remote_tun_if_host, 128,
298 [VppRoutePath(p.tun_if.remote_ip6,
300 proto=DpoProto.DPO_PROTO_IP6)])
302 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
303 [VppRoutePath(p.tun_if.remote_ip4,
308 super(TemplateIpsec6TunIfEsp, self).tearDown()
311 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
313 """ Ipsec ESP - TUN tests """
314 tun6_encrypt_node_name = "esp6-encrypt-tun"
315 tun6_decrypt_node_name = "esp6-decrypt-tun"
317 def test_tun_basic46(self):
318 """ ipsec 4o6 tunnel basic test """
319 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
320 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
322 def test_tun_burst46(self):
323 """ ipsec 4o6 tunnel burst test """
324 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
325 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
328 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
329 IpsecTun6HandoffTests):
330 """ Ipsec ESP 6 Handoff tests """
331 tun6_encrypt_node_name = "esp6-encrypt-tun"
332 tun6_decrypt_node_name = "esp6-decrypt-tun"
335 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
336 IpsecTun4HandoffTests):
337 """ Ipsec ESP 4 Handoff tests """
338 tun4_encrypt_node_name = "esp4-encrypt-tun"
339 tun4_decrypt_node_name = "esp4-decrypt-tun"
342 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
343 """ IPsec IPv4 Multi Tunnel interface """
345 encryption_type = ESP
346 tun4_encrypt_node_name = "esp4-encrypt-tun"
347 tun4_decrypt_node_name = "esp4-decrypt-tun"
350 super(TestIpsec4MultiTunIfEsp, self).setUp()
352 self.tun_if = self.pg0
354 self.multi_params = []
355 self.pg0.generate_remote_hosts(10)
356 self.pg0.configure_ipv4_neighbors()
359 p = copy.copy(self.ipv4_params)
361 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
362 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
363 p.scapy_tun_spi = p.scapy_tun_spi + ii
364 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
365 p.vpp_tun_spi = p.vpp_tun_spi + ii
367 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
368 p.scapy_tra_spi = p.scapy_tra_spi + ii
369 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
370 p.vpp_tra_spi = p.vpp_tra_spi + ii
371 p.tun_dst = self.pg0.remote_hosts[ii].ip4
373 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
376 p.crypt_key, p.crypt_key,
377 p.auth_algo_vpp_id, p.auth_key,
380 p.tun_if.add_vpp_config()
382 p.tun_if.config_ip4()
383 config_tun_params(p, self.encryption_type, p.tun_if)
384 self.multi_params.append(p)
386 VppIpRoute(self, p.remote_tun_if_host, 32,
387 [VppRoutePath(p.tun_if.remote_ip4,
388 0xffffffff)]).add_vpp_config()
391 super(TestIpsec4MultiTunIfEsp, self).tearDown()
393 def test_tun_44(self):
394 """Multiple IPSEC tunnel interfaces """
395 for p in self.multi_params:
396 self.verify_tun_44(p, count=127)
397 c = p.tun_if.get_rx_stats()
398 self.assertEqual(c['packets'], 127)
399 c = p.tun_if.get_tx_stats()
400 self.assertEqual(c['packets'], 127)
402 def test_tun_rr_44(self):
403 """ Round-robin packets acrros multiple interface """
405 for p in self.multi_params:
406 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
407 src=p.remote_tun_if_host,
408 dst=self.pg1.remote_ip4)
409 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
411 for rx, p in zip(rxs, self.multi_params):
412 self.verify_decrypted(p, [rx])
415 for p in self.multi_params:
416 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
417 dst=p.remote_tun_if_host)
418 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
420 for rx, p in zip(rxs, self.multi_params):
421 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
424 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
425 """ IPsec IPv4 Tunnel interface all Algos """
427 encryption_type = ESP
428 tun4_encrypt_node_name = "esp4-encrypt-tun"
429 tun4_decrypt_node_name = "esp4-decrypt-tun"
431 def config_network(self, p):
433 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
436 p.crypt_key, p.crypt_key,
437 p.auth_algo_vpp_id, p.auth_key,
440 p.tun_if.add_vpp_config()
442 p.tun_if.config_ip4()
443 config_tun_params(p, self.encryption_type, p.tun_if)
444 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
445 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
447 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
448 [VppRoutePath(p.tun_if.remote_ip4,
450 p.route.add_vpp_config()
452 def unconfig_network(self, p):
453 p.tun_if.unconfig_ip4()
454 p.tun_if.remove_vpp_config()
455 p.route.remove_vpp_config()
458 super(TestIpsec4TunIfEspAll, self).setUp()
460 self.tun_if = self.pg0
463 super(TestIpsec4TunIfEspAll, self).tearDown()
467 # change the key and the SPI
469 p.crypt_key = b'X' + p.crypt_key[1:]
471 p.scapy_tun_sa_id += 1
474 p.tun_if.local_spi = p.vpp_tun_spi
475 p.tun_if.remote_spi = p.scapy_tun_spi
477 config_tun_params(p, self.encryption_type, p.tun_if)
479 p.tun_sa_in = VppIpsecSA(self,
486 self.vpp_esp_protocol,
489 p.tun_sa_out = VppIpsecSA(self,
496 self.vpp_esp_protocol,
499 p.tun_sa_in.add_vpp_config()
500 p.tun_sa_out.add_vpp_config()
502 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
503 sa_id=p.tun_sa_in.id,
505 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
506 sa_id=p.tun_sa_out.id,
508 self.logger.info(self.vapi.cli("sh ipsec sa"))
510 def test_tun_44(self):
511 """IPSEC tunnel all algos """
513 # foreach VPP crypto engine
514 engines = ["ia32", "ipsecmb", "openssl"]
516 # foreach crypto algorithm
517 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
518 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
519 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
520 IPSEC_API_INTEG_ALG_NONE),
521 'scapy-crypto': "AES-GCM",
522 'scapy-integ': "NULL",
523 'key': b"JPjyOWBeVEQiMe7h",
525 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
526 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
527 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
528 IPSEC_API_INTEG_ALG_NONE),
529 'scapy-crypto': "AES-GCM",
530 'scapy-integ': "NULL",
531 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
533 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
534 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
535 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
536 IPSEC_API_INTEG_ALG_NONE),
537 'scapy-crypto': "AES-GCM",
538 'scapy-integ': "NULL",
539 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
541 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
542 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
543 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
544 IPSEC_API_INTEG_ALG_SHA1_96),
545 'scapy-crypto': "AES-CBC",
546 'scapy-integ': "HMAC-SHA1-96",
548 'key': b"JPjyOWBeVEQiMe7h"},
549 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
550 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
551 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
552 IPSEC_API_INTEG_ALG_SHA_512_256),
553 'scapy-crypto': "AES-CBC",
554 'scapy-integ': "SHA2-512-256",
556 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
557 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
558 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
559 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
560 IPSEC_API_INTEG_ALG_SHA_256_128),
561 'scapy-crypto': "AES-CBC",
562 'scapy-integ': "SHA2-256-128",
564 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
565 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
566 IPSEC_API_CRYPTO_ALG_NONE),
567 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
568 IPSEC_API_INTEG_ALG_SHA1_96),
569 'scapy-crypto': "NULL",
570 'scapy-integ': "HMAC-SHA1-96",
572 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
574 for engine in engines:
575 self.vapi.cli("set crypto handler all %s" % engine)
578 # loop through each of the algorithms
581 # with self.subTest(algo=algo['scapy']):
583 p = copy.copy(self.ipv4_params)
584 p.auth_algo_vpp_id = algo['vpp-integ']
585 p.crypt_algo_vpp_id = algo['vpp-crypto']
586 p.crypt_algo = algo['scapy-crypto']
587 p.auth_algo = algo['scapy-integ']
588 p.crypt_key = algo['key']
589 p.salt = algo['salt']
591 self.config_network(p)
593 self.verify_tun_44(p, count=127)
594 c = p.tun_if.get_rx_stats()
595 self.assertEqual(c['packets'], 127)
596 c = p.tun_if.get_tx_stats()
597 self.assertEqual(c['packets'], 127)
603 self.verify_tun_44(p, count=127)
605 self.unconfig_network(p)
606 p.tun_sa_out.remove_vpp_config()
607 p.tun_sa_in.remove_vpp_config()
610 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
611 """ IPsec IPv4 Tunnel interface no Algos """
613 encryption_type = ESP
614 tun4_encrypt_node_name = "esp4-encrypt-tun"
615 tun4_decrypt_node_name = "esp4-decrypt-tun"
617 def config_network(self, p):
619 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
620 IPSEC_API_INTEG_ALG_NONE)
624 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
625 IPSEC_API_CRYPTO_ALG_NONE)
626 p.crypt_algo = 'NULL'
629 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
632 p.crypt_key, p.crypt_key,
633 p.auth_algo_vpp_id, p.auth_key,
636 p.tun_if.add_vpp_config()
638 p.tun_if.config_ip4()
639 config_tun_params(p, self.encryption_type, p.tun_if)
640 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
641 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
643 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
644 [VppRoutePath(p.tun_if.remote_ip4,
646 p.route.add_vpp_config()
648 def unconfig_network(self, p):
649 p.tun_if.unconfig_ip4()
650 p.tun_if.remove_vpp_config()
651 p.route.remove_vpp_config()
654 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
656 self.tun_if = self.pg0
659 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
661 def test_tun_44(self):
662 """ IPSec SA with NULL algos """
665 self.config_network(p)
667 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
668 dst=p.remote_tun_if_host)
669 self.send_and_assert_no_replies(self.pg1, tx)
671 self.unconfig_network(p)
674 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
675 """ IPsec IPv6 Multi Tunnel interface """
677 encryption_type = ESP
678 tun6_encrypt_node_name = "esp6-encrypt-tun"
679 tun6_decrypt_node_name = "esp6-decrypt-tun"
682 super(TestIpsec6MultiTunIfEsp, self).setUp()
684 self.tun_if = self.pg0
686 self.multi_params = []
687 self.pg0.generate_remote_hosts(10)
688 self.pg0.configure_ipv6_neighbors()
691 p = copy.copy(self.ipv6_params)
693 p.remote_tun_if_host = "1111::%d" % (ii + 1)
694 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
695 p.scapy_tun_spi = p.scapy_tun_spi + ii
696 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
697 p.vpp_tun_spi = p.vpp_tun_spi + ii
699 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
700 p.scapy_tra_spi = p.scapy_tra_spi + ii
701 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
702 p.vpp_tra_spi = p.vpp_tra_spi + ii
704 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
707 p.crypt_key, p.crypt_key,
708 p.auth_algo_vpp_id, p.auth_key,
709 p.auth_key, is_ip6=True,
710 dst=self.pg0.remote_hosts[ii].ip6)
711 p.tun_if.add_vpp_config()
713 p.tun_if.config_ip6()
714 config_tun_params(p, self.encryption_type, p.tun_if)
715 self.multi_params.append(p)
717 r = VppIpRoute(self, p.remote_tun_if_host, 128,
718 [VppRoutePath(p.tun_if.remote_ip6,
720 proto=DpoProto.DPO_PROTO_IP6)])
724 super(TestIpsec6MultiTunIfEsp, self).tearDown()
726 def test_tun_66(self):
727 """Multiple IPSEC tunnel interfaces """
728 for p in self.multi_params:
729 self.verify_tun_66(p, count=127)
730 c = p.tun_if.get_rx_stats()
731 self.assertEqual(c['packets'], 127)
732 c = p.tun_if.get_tx_stats()
733 self.assertEqual(c['packets'], 127)
736 class TestIpsecGreTebIfEsp(TemplateIpsec,
738 """ Ipsec GRE TEB ESP - TUN tests """
739 tun4_encrypt_node_name = "esp4-encrypt-tun"
740 tun4_decrypt_node_name = "esp4-decrypt-tun"
741 encryption_type = ESP
742 omac = "00:11:22:33:44:55"
744 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
746 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
747 sa.encrypt(IP(src=self.pg0.remote_ip4,
748 dst=self.pg0.local_ip4) /
750 Ether(dst=self.omac) /
751 IP(src="1.1.1.1", dst="1.1.1.2") /
752 UDP(sport=1144, dport=2233) /
753 Raw(b'X' * payload_size))
754 for i in range(count)]
756 def gen_pkts(self, sw_intf, src, dst, count=1,
758 return [Ether(dst=self.omac) /
759 IP(src="1.1.1.1", dst="1.1.1.2") /
760 UDP(sport=1144, dport=2233) /
761 Raw(b'X' * payload_size)
762 for i in range(count)]
764 def verify_decrypted(self, p, rxs):
766 self.assert_equal(rx[Ether].dst, self.omac)
767 self.assert_equal(rx[IP].dst, "1.1.1.2")
769 def verify_encrypted(self, p, sa, rxs):
772 pkt = sa.decrypt(rx[IP])
773 if not pkt.haslayer(IP):
774 pkt = IP(pkt[Raw].load)
775 self.assert_packet_checksums_valid(pkt)
776 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
777 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
778 self.assertTrue(pkt.haslayer(GRE))
780 self.assertEqual(e[Ether].dst, self.omac)
781 self.assertEqual(e[IP].dst, "1.1.1.2")
782 except (IndexError, AssertionError):
783 self.logger.debug(ppp("Unexpected packet:", rx))
785 self.logger.debug(ppp("Decrypted packet:", pkt))
791 super(TestIpsecGreTebIfEsp, self).setUp()
793 self.tun_if = self.pg0
797 bd1 = VppBridgeDomain(self, 1)
800 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
801 p.auth_algo_vpp_id, p.auth_key,
802 p.crypt_algo_vpp_id, p.crypt_key,
803 self.vpp_esp_protocol,
806 p.tun_sa_out.add_vpp_config()
808 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
809 p.auth_algo_vpp_id, p.auth_key,
810 p.crypt_algo_vpp_id, p.crypt_key,
811 self.vpp_esp_protocol,
814 p.tun_sa_in.add_vpp_config()
816 p.tun_if = VppGreInterface(self,
819 type=(VppEnum.vl_api_gre_tunnel_type_t.
820 GRE_API_TUNNEL_TYPE_TEB))
821 p.tun_if.add_vpp_config()
823 p.tun_protect = VppIpsecTunProtect(self,
828 p.tun_protect.add_vpp_config()
831 p.tun_if.config_ip4()
832 config_tun_params(p, self.encryption_type, p.tun_if)
834 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
835 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
837 self.vapi.cli("clear ipsec sa")
838 self.vapi.cli("sh adj")
839 self.vapi.cli("sh ipsec tun")
843 p.tun_if.unconfig_ip4()
844 super(TestIpsecGreTebIfEsp, self).tearDown()
847 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
849 """ Ipsec GRE TEB ESP - TUN tests """
850 tun4_encrypt_node_name = "esp4-encrypt-tun"
851 tun4_decrypt_node_name = "esp4-decrypt-tun"
852 encryption_type = ESP
853 omac = "00:11:22:33:44:55"
855 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
857 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
858 sa.encrypt(IP(src=self.pg0.remote_ip4,
859 dst=self.pg0.local_ip4) /
861 Ether(dst=self.omac) /
862 IP(src="1.1.1.1", dst="1.1.1.2") /
863 UDP(sport=1144, dport=2233) /
864 Raw(b'X' * payload_size))
865 for i in range(count)]
867 def gen_pkts(self, sw_intf, src, dst, count=1,
869 return [Ether(dst=self.omac) /
871 IP(src="1.1.1.1", dst="1.1.1.2") /
872 UDP(sport=1144, dport=2233) /
873 Raw(b'X' * payload_size)
874 for i in range(count)]
876 def verify_decrypted(self, p, rxs):
878 self.assert_equal(rx[Ether].dst, self.omac)
879 self.assert_equal(rx[Dot1Q].vlan, 11)
880 self.assert_equal(rx[IP].dst, "1.1.1.2")
882 def verify_encrypted(self, p, sa, rxs):
885 pkt = sa.decrypt(rx[IP])
886 if not pkt.haslayer(IP):
887 pkt = IP(pkt[Raw].load)
888 self.assert_packet_checksums_valid(pkt)
889 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
890 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
891 self.assertTrue(pkt.haslayer(GRE))
893 self.assertEqual(e[Ether].dst, self.omac)
894 self.assertFalse(e.haslayer(Dot1Q))
895 self.assertEqual(e[IP].dst, "1.1.1.2")
896 except (IndexError, AssertionError):
897 self.logger.debug(ppp("Unexpected packet:", rx))
899 self.logger.debug(ppp("Decrypted packet:", pkt))
905 super(TestIpsecGreTebVlanIfEsp, self).setUp()
907 self.tun_if = self.pg0
911 bd1 = VppBridgeDomain(self, 1)
914 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
915 self.vapi.l2_interface_vlan_tag_rewrite(
916 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
918 self.pg1_11.admin_up()
920 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
921 p.auth_algo_vpp_id, p.auth_key,
922 p.crypt_algo_vpp_id, p.crypt_key,
923 self.vpp_esp_protocol,
926 p.tun_sa_out.add_vpp_config()
928 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
929 p.auth_algo_vpp_id, p.auth_key,
930 p.crypt_algo_vpp_id, p.crypt_key,
931 self.vpp_esp_protocol,
934 p.tun_sa_in.add_vpp_config()
936 p.tun_if = VppGreInterface(self,
939 type=(VppEnum.vl_api_gre_tunnel_type_t.
940 GRE_API_TUNNEL_TYPE_TEB))
941 p.tun_if.add_vpp_config()
943 p.tun_protect = VppIpsecTunProtect(self,
948 p.tun_protect.add_vpp_config()
951 p.tun_if.config_ip4()
952 config_tun_params(p, self.encryption_type, p.tun_if)
954 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
955 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
957 self.vapi.cli("clear ipsec sa")
961 p.tun_if.unconfig_ip4()
962 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
963 self.pg1_11.admin_down()
964 self.pg1_11.remove_vpp_config()
967 class TestIpsecGreTebIfEspTra(TemplateIpsec,
969 """ Ipsec GRE TEB ESP - Tra tests """
970 tun4_encrypt_node_name = "esp4-encrypt-tun"
971 tun4_decrypt_node_name = "esp4-decrypt-tun"
972 encryption_type = ESP
973 omac = "00:11:22:33:44:55"
975 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
977 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
978 sa.encrypt(IP(src=self.pg0.remote_ip4,
979 dst=self.pg0.local_ip4) /
981 Ether(dst=self.omac) /
982 IP(src="1.1.1.1", dst="1.1.1.2") /
983 UDP(sport=1144, dport=2233) /
984 Raw(b'X' * payload_size))
985 for i in range(count)]
987 def gen_pkts(self, sw_intf, src, dst, count=1,
989 return [Ether(dst=self.omac) /
990 IP(src="1.1.1.1", dst="1.1.1.2") /
991 UDP(sport=1144, dport=2233) /
992 Raw(b'X' * payload_size)
993 for i in range(count)]
995 def verify_decrypted(self, p, rxs):
997 self.assert_equal(rx[Ether].dst, self.omac)
998 self.assert_equal(rx[IP].dst, "1.1.1.2")
1000 def verify_encrypted(self, p, sa, rxs):
1003 pkt = sa.decrypt(rx[IP])
1004 if not pkt.haslayer(IP):
1005 pkt = IP(pkt[Raw].load)
1006 self.assert_packet_checksums_valid(pkt)
1007 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1008 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1009 self.assertTrue(pkt.haslayer(GRE))
1011 self.assertEqual(e[Ether].dst, self.omac)
1012 self.assertEqual(e[IP].dst, "1.1.1.2")
1013 except (IndexError, AssertionError):
1014 self.logger.debug(ppp("Unexpected packet:", rx))
1016 self.logger.debug(ppp("Decrypted packet:", pkt))
1022 super(TestIpsecGreTebIfEspTra, self).setUp()
1024 self.tun_if = self.pg0
1026 p = self.ipv4_params
1028 bd1 = VppBridgeDomain(self, 1)
1029 bd1.add_vpp_config()
1031 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1032 p.auth_algo_vpp_id, p.auth_key,
1033 p.crypt_algo_vpp_id, p.crypt_key,
1034 self.vpp_esp_protocol)
1035 p.tun_sa_out.add_vpp_config()
1037 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1038 p.auth_algo_vpp_id, p.auth_key,
1039 p.crypt_algo_vpp_id, p.crypt_key,
1040 self.vpp_esp_protocol)
1041 p.tun_sa_in.add_vpp_config()
1043 p.tun_if = VppGreInterface(self,
1045 self.pg0.remote_ip4,
1046 type=(VppEnum.vl_api_gre_tunnel_type_t.
1047 GRE_API_TUNNEL_TYPE_TEB))
1048 p.tun_if.add_vpp_config()
1050 p.tun_protect = VppIpsecTunProtect(self,
1055 p.tun_protect.add_vpp_config()
1058 p.tun_if.config_ip4()
1059 config_tra_params(p, self.encryption_type, p.tun_if)
1061 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1062 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1064 self.vapi.cli("clear ipsec sa")
1067 p = self.ipv4_params
1068 p.tun_if.unconfig_ip4()
1069 super(TestIpsecGreTebIfEspTra, self).tearDown()
1072 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1074 """ Ipsec GRE TEB UDP ESP - Tra tests """
1075 tun4_encrypt_node_name = "esp4-encrypt-tun"
1076 tun4_decrypt_node_name = "esp4-decrypt-tun"
1077 encryption_type = ESP
1078 omac = "00:11:22:33:44:55"
1080 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1082 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1083 sa.encrypt(IP(src=self.pg0.remote_ip4,
1084 dst=self.pg0.local_ip4) /
1086 Ether(dst=self.omac) /
1087 IP(src="1.1.1.1", dst="1.1.1.2") /
1088 UDP(sport=1144, dport=2233) /
1089 Raw(b'X' * payload_size))
1090 for i in range(count)]
1092 def gen_pkts(self, sw_intf, src, dst, count=1,
1094 return [Ether(dst=self.omac) /
1095 IP(src="1.1.1.1", dst="1.1.1.2") /
1096 UDP(sport=1144, dport=2233) /
1097 Raw(b'X' * payload_size)
1098 for i in range(count)]
1100 def verify_decrypted(self, p, rxs):
1102 self.assert_equal(rx[Ether].dst, self.omac)
1103 self.assert_equal(rx[IP].dst, "1.1.1.2")
1105 def verify_encrypted(self, p, sa, rxs):
1107 self.assertTrue(rx.haslayer(UDP))
1108 self.assertEqual(rx[UDP].dport, 4545)
1109 self.assertEqual(rx[UDP].sport, 5454)
1111 pkt = sa.decrypt(rx[IP])
1112 if not pkt.haslayer(IP):
1113 pkt = IP(pkt[Raw].load)
1114 self.assert_packet_checksums_valid(pkt)
1115 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1116 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1117 self.assertTrue(pkt.haslayer(GRE))
1119 self.assertEqual(e[Ether].dst, self.omac)
1120 self.assertEqual(e[IP].dst, "1.1.1.2")
1121 except (IndexError, AssertionError):
1122 self.logger.debug(ppp("Unexpected packet:", rx))
1124 self.logger.debug(ppp("Decrypted packet:", pkt))
1130 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1132 self.tun_if = self.pg0
1134 p = self.ipv4_params
1135 p = self.ipv4_params
1136 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1137 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1138 p.nat_header = UDP(sport=5454, dport=4545)
1140 bd1 = VppBridgeDomain(self, 1)
1141 bd1.add_vpp_config()
1143 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1144 p.auth_algo_vpp_id, p.auth_key,
1145 p.crypt_algo_vpp_id, p.crypt_key,
1146 self.vpp_esp_protocol,
1150 p.tun_sa_out.add_vpp_config()
1152 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1153 p.auth_algo_vpp_id, p.auth_key,
1154 p.crypt_algo_vpp_id, p.crypt_key,
1155 self.vpp_esp_protocol,
1157 VppEnum.vl_api_ipsec_sad_flags_t.
1158 IPSEC_API_SAD_FLAG_IS_INBOUND),
1161 p.tun_sa_in.add_vpp_config()
1163 p.tun_if = VppGreInterface(self,
1165 self.pg0.remote_ip4,
1166 type=(VppEnum.vl_api_gre_tunnel_type_t.
1167 GRE_API_TUNNEL_TYPE_TEB))
1168 p.tun_if.add_vpp_config()
1170 p.tun_protect = VppIpsecTunProtect(self,
1175 p.tun_protect.add_vpp_config()
1178 p.tun_if.config_ip4()
1179 config_tra_params(p, self.encryption_type, p.tun_if)
1181 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1182 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1184 self.vapi.cli("clear ipsec sa")
1185 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1188 p = self.ipv4_params
1189 p.tun_if.unconfig_ip4()
1190 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1193 class TestIpsecGreIfEsp(TemplateIpsec,
1195 """ Ipsec GRE ESP - TUN tests """
1196 tun4_encrypt_node_name = "esp4-encrypt-tun"
1197 tun4_decrypt_node_name = "esp4-decrypt-tun"
1198 encryption_type = ESP
1200 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1202 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1203 sa.encrypt(IP(src=self.pg0.remote_ip4,
1204 dst=self.pg0.local_ip4) /
1206 IP(src=self.pg1.local_ip4,
1207 dst=self.pg1.remote_ip4) /
1208 UDP(sport=1144, dport=2233) /
1209 Raw(b'X' * payload_size))
1210 for i in range(count)]
1212 def gen_pkts(self, sw_intf, src, dst, count=1,
1214 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1215 IP(src="1.1.1.1", dst="1.1.1.2") /
1216 UDP(sport=1144, dport=2233) /
1217 Raw(b'X' * payload_size)
1218 for i in range(count)]
1220 def verify_decrypted(self, p, rxs):
1222 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1223 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1225 def verify_encrypted(self, p, sa, rxs):
1228 pkt = sa.decrypt(rx[IP])
1229 if not pkt.haslayer(IP):
1230 pkt = IP(pkt[Raw].load)
1231 self.assert_packet_checksums_valid(pkt)
1232 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1233 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1234 self.assertTrue(pkt.haslayer(GRE))
1236 self.assertEqual(e[IP].dst, "1.1.1.2")
1237 except (IndexError, AssertionError):
1238 self.logger.debug(ppp("Unexpected packet:", rx))
1240 self.logger.debug(ppp("Decrypted packet:", pkt))
1246 super(TestIpsecGreIfEsp, self).setUp()
1248 self.tun_if = self.pg0
1250 p = self.ipv4_params
1252 bd1 = VppBridgeDomain(self, 1)
1253 bd1.add_vpp_config()
1255 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1256 p.auth_algo_vpp_id, p.auth_key,
1257 p.crypt_algo_vpp_id, p.crypt_key,
1258 self.vpp_esp_protocol,
1260 self.pg0.remote_ip4)
1261 p.tun_sa_out.add_vpp_config()
1263 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1264 p.auth_algo_vpp_id, p.auth_key,
1265 p.crypt_algo_vpp_id, p.crypt_key,
1266 self.vpp_esp_protocol,
1267 self.pg0.remote_ip4,
1269 p.tun_sa_in.add_vpp_config()
1271 p.tun_if = VppGreInterface(self,
1273 self.pg0.remote_ip4)
1274 p.tun_if.add_vpp_config()
1276 p.tun_protect = VppIpsecTunProtect(self,
1280 p.tun_protect.add_vpp_config()
1283 p.tun_if.config_ip4()
1284 config_tun_params(p, self.encryption_type, p.tun_if)
1286 VppIpRoute(self, "1.1.1.2", 32,
1287 [VppRoutePath(p.tun_if.remote_ip4,
1288 0xffffffff)]).add_vpp_config()
1291 p = self.ipv4_params
1292 p.tun_if.unconfig_ip4()
1293 super(TestIpsecGreIfEsp, self).tearDown()
1296 class TestIpsecGreIfEspTra(TemplateIpsec,
1298 """ Ipsec GRE ESP - TRA tests """
1299 tun4_encrypt_node_name = "esp4-encrypt-tun"
1300 tun4_decrypt_node_name = "esp4-decrypt-tun"
1301 encryption_type = ESP
1303 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1305 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1306 sa.encrypt(IP(src=self.pg0.remote_ip4,
1307 dst=self.pg0.local_ip4) /
1309 IP(src=self.pg1.local_ip4,
1310 dst=self.pg1.remote_ip4) /
1311 UDP(sport=1144, dport=2233) /
1312 Raw(b'X' * payload_size))
1313 for i in range(count)]
1315 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1317 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1318 sa.encrypt(IP(src=self.pg0.remote_ip4,
1319 dst=self.pg0.local_ip4) /
1321 UDP(sport=1144, dport=2233) /
1322 Raw(b'X' * payload_size))
1323 for i in range(count)]
1325 def gen_pkts(self, sw_intf, src, dst, count=1,
1327 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1328 IP(src="1.1.1.1", dst="1.1.1.2") /
1329 UDP(sport=1144, dport=2233) /
1330 Raw(b'X' * payload_size)
1331 for i in range(count)]
1333 def verify_decrypted(self, p, rxs):
1335 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1336 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1338 def verify_encrypted(self, p, sa, rxs):
1341 pkt = sa.decrypt(rx[IP])
1342 if not pkt.haslayer(IP):
1343 pkt = IP(pkt[Raw].load)
1344 self.assert_packet_checksums_valid(pkt)
1345 self.assertTrue(pkt.haslayer(GRE))
1347 self.assertEqual(e[IP].dst, "1.1.1.2")
1348 except (IndexError, AssertionError):
1349 self.logger.debug(ppp("Unexpected packet:", rx))
1351 self.logger.debug(ppp("Decrypted packet:", pkt))
1357 super(TestIpsecGreIfEspTra, self).setUp()
1359 self.tun_if = self.pg0
1361 p = self.ipv4_params
1363 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1364 p.auth_algo_vpp_id, p.auth_key,
1365 p.crypt_algo_vpp_id, p.crypt_key,
1366 self.vpp_esp_protocol)
1367 p.tun_sa_out.add_vpp_config()
1369 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1370 p.auth_algo_vpp_id, p.auth_key,
1371 p.crypt_algo_vpp_id, p.crypt_key,
1372 self.vpp_esp_protocol)
1373 p.tun_sa_in.add_vpp_config()
1375 p.tun_if = VppGreInterface(self,
1377 self.pg0.remote_ip4)
1378 p.tun_if.add_vpp_config()
1380 p.tun_protect = VppIpsecTunProtect(self,
1384 p.tun_protect.add_vpp_config()
1387 p.tun_if.config_ip4()
1388 config_tra_params(p, self.encryption_type, p.tun_if)
1390 VppIpRoute(self, "1.1.1.2", 32,
1391 [VppRoutePath(p.tun_if.remote_ip4,
1392 0xffffffff)]).add_vpp_config()
1395 p = self.ipv4_params
1396 p.tun_if.unconfig_ip4()
1397 super(TestIpsecGreIfEspTra, self).tearDown()
1399 def test_gre_non_ip(self):
1400 p = self.ipv4_params
1401 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1402 src=p.remote_tun_if_host,
1403 dst=self.pg1.remote_ip6)
1404 self.send_and_assert_no_replies(self.tun_if, tx)
1405 node_name = ('/err/%s/unsupported payload' %
1406 self.tun4_decrypt_node_name)
1407 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1410 class TestIpsecGre6IfEspTra(TemplateIpsec,
1412 """ Ipsec GRE ESP - TRA tests """
1413 tun6_encrypt_node_name = "esp6-encrypt-tun"
1414 tun6_decrypt_node_name = "esp6-decrypt-tun"
1415 encryption_type = ESP
1417 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1419 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1420 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1421 dst=self.pg0.local_ip6) /
1423 IPv6(src=self.pg1.local_ip6,
1424 dst=self.pg1.remote_ip6) /
1425 UDP(sport=1144, dport=2233) /
1426 Raw(b'X' * payload_size))
1427 for i in range(count)]
1429 def gen_pkts6(self, sw_intf, src, dst, count=1,
1431 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1432 IPv6(src="1::1", dst="1::2") /
1433 UDP(sport=1144, dport=2233) /
1434 Raw(b'X' * payload_size)
1435 for i in range(count)]
1437 def verify_decrypted6(self, p, rxs):
1439 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1440 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1442 def verify_encrypted6(self, p, sa, rxs):
1445 pkt = sa.decrypt(rx[IPv6])
1446 if not pkt.haslayer(IPv6):
1447 pkt = IPv6(pkt[Raw].load)
1448 self.assert_packet_checksums_valid(pkt)
1449 self.assertTrue(pkt.haslayer(GRE))
1451 self.assertEqual(e[IPv6].dst, "1::2")
1452 except (IndexError, AssertionError):
1453 self.logger.debug(ppp("Unexpected packet:", rx))
1455 self.logger.debug(ppp("Decrypted packet:", pkt))
1461 super(TestIpsecGre6IfEspTra, self).setUp()
1463 self.tun_if = self.pg0
1465 p = self.ipv6_params
1467 bd1 = VppBridgeDomain(self, 1)
1468 bd1.add_vpp_config()
1470 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1471 p.auth_algo_vpp_id, p.auth_key,
1472 p.crypt_algo_vpp_id, p.crypt_key,
1473 self.vpp_esp_protocol)
1474 p.tun_sa_out.add_vpp_config()
1476 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1477 p.auth_algo_vpp_id, p.auth_key,
1478 p.crypt_algo_vpp_id, p.crypt_key,
1479 self.vpp_esp_protocol)
1480 p.tun_sa_in.add_vpp_config()
1482 p.tun_if = VppGreInterface(self,
1484 self.pg0.remote_ip6)
1485 p.tun_if.add_vpp_config()
1487 p.tun_protect = VppIpsecTunProtect(self,
1491 p.tun_protect.add_vpp_config()
1494 p.tun_if.config_ip6()
1495 config_tra_params(p, self.encryption_type, p.tun_if)
1497 r = VppIpRoute(self, "1::2", 128,
1498 [VppRoutePath(p.tun_if.remote_ip6,
1500 proto=DpoProto.DPO_PROTO_IP6)])
1504 p = self.ipv6_params
1505 p.tun_if.unconfig_ip6()
1506 super(TestIpsecGre6IfEspTra, self).tearDown()
1509 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1510 """ Ipsec mGRE ESP v4 TRA tests """
1511 tun4_encrypt_node_name = "esp4-encrypt-tun"
1512 tun4_decrypt_node_name = "esp4-decrypt-tun"
1513 encryption_type = ESP
1515 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1517 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1518 sa.encrypt(IP(src=p.tun_dst,
1519 dst=self.pg0.local_ip4) /
1521 IP(src=self.pg1.local_ip4,
1522 dst=self.pg1.remote_ip4) /
1523 UDP(sport=1144, dport=2233) /
1524 Raw(b'X' * payload_size))
1525 for i in range(count)]
1527 def gen_pkts(self, sw_intf, src, dst, count=1,
1529 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1530 IP(src="1.1.1.1", dst=dst) /
1531 UDP(sport=1144, dport=2233) /
1532 Raw(b'X' * payload_size)
1533 for i in range(count)]
1535 def verify_decrypted(self, p, rxs):
1537 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1538 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1540 def verify_encrypted(self, p, sa, rxs):
1543 pkt = sa.decrypt(rx[IP])
1544 if not pkt.haslayer(IP):
1545 pkt = IP(pkt[Raw].load)
1546 self.assert_packet_checksums_valid(pkt)
1547 self.assertTrue(pkt.haslayer(GRE))
1549 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1550 except (IndexError, AssertionError):
1551 self.logger.debug(ppp("Unexpected packet:", rx))
1553 self.logger.debug(ppp("Decrypted packet:", pkt))
1559 super(TestIpsecMGreIfEspTra4, self).setUp()
1562 self.tun_if = self.pg0
1563 p = self.ipv4_params
1564 p.tun_if = VppGreInterface(self,
1567 mode=(VppEnum.vl_api_tunnel_mode_t.
1568 TUNNEL_API_MODE_MP))
1569 p.tun_if.add_vpp_config()
1571 p.tun_if.config_ip4()
1572 p.tun_if.generate_remote_hosts(N_NHS)
1573 self.pg0.generate_remote_hosts(N_NHS)
1574 self.pg0.configure_ipv4_neighbors()
1576 # setup some SAs for several next-hops on the interface
1577 self.multi_params = []
1579 for ii in range(N_NHS):
1580 p = copy.copy(self.ipv4_params)
1582 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1583 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1584 p.scapy_tun_spi = p.scapy_tun_spi + ii
1585 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1586 p.vpp_tun_spi = p.vpp_tun_spi + ii
1588 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1589 p.scapy_tra_spi = p.scapy_tra_spi + ii
1590 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1591 p.vpp_tra_spi = p.vpp_tra_spi + ii
1592 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1593 p.auth_algo_vpp_id, p.auth_key,
1594 p.crypt_algo_vpp_id, p.crypt_key,
1595 self.vpp_esp_protocol)
1596 p.tun_sa_out.add_vpp_config()
1598 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1599 p.auth_algo_vpp_id, p.auth_key,
1600 p.crypt_algo_vpp_id, p.crypt_key,
1601 self.vpp_esp_protocol)
1602 p.tun_sa_in.add_vpp_config()
1604 p.tun_protect = VppIpsecTunProtect(
1609 nh=p.tun_if.remote_hosts[ii].ip4)
1610 p.tun_protect.add_vpp_config()
1611 config_tra_params(p, self.encryption_type, p.tun_if)
1612 self.multi_params.append(p)
1614 VppIpRoute(self, p.remote_tun_if_host, 32,
1615 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1616 p.tun_if.sw_if_index)]).add_vpp_config()
1618 # in this v4 variant add the teibs after the protect
1619 p.teib = VppTeib(self, p.tun_if,
1620 p.tun_if.remote_hosts[ii].ip4,
1621 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1622 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1623 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1626 p = self.ipv4_params
1627 p.tun_if.unconfig_ip4()
1628 super(TestIpsecMGreIfEspTra4, self).tearDown()
1630 def test_tun_44(self):
1633 for p in self.multi_params:
1634 self.verify_tun_44(p, count=N_PKTS)
1635 p.teib.remove_vpp_config()
1636 self.verify_tun_dropped_44(p, count=N_PKTS)
1637 p.teib.add_vpp_config()
1638 self.verify_tun_44(p, count=N_PKTS)
1641 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1642 """ Ipsec mGRE ESP v6 TRA tests """
1643 tun6_encrypt_node_name = "esp6-encrypt-tun"
1644 tun6_decrypt_node_name = "esp6-decrypt-tun"
1645 encryption_type = ESP
1647 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1649 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1650 sa.encrypt(IPv6(src=p.tun_dst,
1651 dst=self.pg0.local_ip6) /
1653 IPv6(src=self.pg1.local_ip6,
1654 dst=self.pg1.remote_ip6) /
1655 UDP(sport=1144, dport=2233) /
1656 Raw(b'X' * payload_size))
1657 for i in range(count)]
1659 def gen_pkts6(self, sw_intf, src, dst, count=1,
1661 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1662 IPv6(src="1::1", dst=dst) /
1663 UDP(sport=1144, dport=2233) /
1664 Raw(b'X' * payload_size)
1665 for i in range(count)]
1667 def verify_decrypted6(self, p, rxs):
1669 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1670 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1672 def verify_encrypted6(self, p, sa, rxs):
1675 pkt = sa.decrypt(rx[IPv6])
1676 if not pkt.haslayer(IPv6):
1677 pkt = IPv6(pkt[Raw].load)
1678 self.assert_packet_checksums_valid(pkt)
1679 self.assertTrue(pkt.haslayer(GRE))
1681 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1682 except (IndexError, AssertionError):
1683 self.logger.debug(ppp("Unexpected packet:", rx))
1685 self.logger.debug(ppp("Decrypted packet:", pkt))
1691 super(TestIpsecMGreIfEspTra6, self).setUp()
1693 self.vapi.cli("set logging class ipsec level debug")
1696 self.tun_if = self.pg0
1697 p = self.ipv6_params
1698 p.tun_if = VppGreInterface(self,
1701 mode=(VppEnum.vl_api_tunnel_mode_t.
1702 TUNNEL_API_MODE_MP))
1703 p.tun_if.add_vpp_config()
1705 p.tun_if.config_ip6()
1706 p.tun_if.generate_remote_hosts(N_NHS)
1707 self.pg0.generate_remote_hosts(N_NHS)
1708 self.pg0.configure_ipv6_neighbors()
1710 # setup some SAs for several next-hops on the interface
1711 self.multi_params = []
1713 for ii in range(N_NHS):
1714 p = copy.copy(self.ipv6_params)
1716 p.remote_tun_if_host = "1::%d" % (ii + 1)
1717 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1718 p.scapy_tun_spi = p.scapy_tun_spi + ii
1719 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1720 p.vpp_tun_spi = p.vpp_tun_spi + ii
1722 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1723 p.scapy_tra_spi = p.scapy_tra_spi + ii
1724 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1725 p.vpp_tra_spi = p.vpp_tra_spi + ii
1726 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1727 p.auth_algo_vpp_id, p.auth_key,
1728 p.crypt_algo_vpp_id, p.crypt_key,
1729 self.vpp_esp_protocol)
1730 p.tun_sa_out.add_vpp_config()
1732 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1733 p.auth_algo_vpp_id, p.auth_key,
1734 p.crypt_algo_vpp_id, p.crypt_key,
1735 self.vpp_esp_protocol)
1736 p.tun_sa_in.add_vpp_config()
1738 # in this v6 variant add the teibs first then the protection
1739 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1740 VppTeib(self, p.tun_if,
1741 p.tun_if.remote_hosts[ii].ip6,
1742 p.tun_dst).add_vpp_config()
1744 p.tun_protect = VppIpsecTunProtect(
1749 nh=p.tun_if.remote_hosts[ii].ip6)
1750 p.tun_protect.add_vpp_config()
1751 config_tra_params(p, self.encryption_type, p.tun_if)
1752 self.multi_params.append(p)
1754 VppIpRoute(self, p.remote_tun_if_host, 128,
1755 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1756 p.tun_if.sw_if_index)]).add_vpp_config()
1757 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1759 self.logger.info(self.vapi.cli("sh log"))
1760 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1761 self.logger.info(self.vapi.cli("sh adj 41"))
1764 p = self.ipv6_params
1765 p.tun_if.unconfig_ip6()
1766 super(TestIpsecMGreIfEspTra6, self).tearDown()
1768 def test_tun_66(self):
1770 for p in self.multi_params:
1771 self.verify_tun_66(p, count=63)
1774 class TemplateIpsec4TunProtect(object):
1775 """ IPsec IPv4 Tunnel protect """
1777 encryption_type = ESP
1778 tun4_encrypt_node_name = "esp4-encrypt-tun"
1779 tun4_decrypt_node_name = "esp4-decrypt-tun"
1780 tun4_input_node = "ipsec4-tun-input"
1782 def config_sa_tra(self, p):
1783 config_tun_params(p, self.encryption_type, p.tun_if)
1785 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1786 p.auth_algo_vpp_id, p.auth_key,
1787 p.crypt_algo_vpp_id, p.crypt_key,
1788 self.vpp_esp_protocol,
1790 p.tun_sa_out.add_vpp_config()
1792 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1793 p.auth_algo_vpp_id, p.auth_key,
1794 p.crypt_algo_vpp_id, p.crypt_key,
1795 self.vpp_esp_protocol,
1797 p.tun_sa_in.add_vpp_config()
1799 def config_sa_tun(self, p):
1800 config_tun_params(p, self.encryption_type, p.tun_if)
1802 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1803 p.auth_algo_vpp_id, p.auth_key,
1804 p.crypt_algo_vpp_id, p.crypt_key,
1805 self.vpp_esp_protocol,
1806 self.tun_if.local_addr[p.addr_type],
1807 self.tun_if.remote_addr[p.addr_type],
1809 p.tun_sa_out.add_vpp_config()
1811 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1812 p.auth_algo_vpp_id, p.auth_key,
1813 p.crypt_algo_vpp_id, p.crypt_key,
1814 self.vpp_esp_protocol,
1815 self.tun_if.remote_addr[p.addr_type],
1816 self.tun_if.local_addr[p.addr_type],
1818 p.tun_sa_in.add_vpp_config()
1820 def config_protect(self, p):
1821 p.tun_protect = VppIpsecTunProtect(self,
1825 p.tun_protect.add_vpp_config()
1827 def config_network(self, p):
1828 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1830 self.pg0.remote_ip4)
1831 p.tun_if.add_vpp_config()
1833 p.tun_if.config_ip4()
1834 p.tun_if.config_ip6()
1836 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1837 [VppRoutePath(p.tun_if.remote_ip4,
1839 p.route.add_vpp_config()
1840 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1841 [VppRoutePath(p.tun_if.remote_ip6,
1843 proto=DpoProto.DPO_PROTO_IP6)])
1846 def unconfig_network(self, p):
1847 p.route.remove_vpp_config()
1848 p.tun_if.remove_vpp_config()
1850 def unconfig_protect(self, p):
1851 p.tun_protect.remove_vpp_config()
1853 def unconfig_sa(self, p):
1854 p.tun_sa_out.remove_vpp_config()
1855 p.tun_sa_in.remove_vpp_config()
1858 class TestIpsec4TunProtect(TemplateIpsec,
1859 TemplateIpsec4TunProtect,
1861 """ IPsec IPv4 Tunnel protect - transport mode"""
1864 super(TestIpsec4TunProtect, self).setUp()
1866 self.tun_if = self.pg0
1869 super(TestIpsec4TunProtect, self).tearDown()
1871 def test_tun_44(self):
1872 """IPSEC tunnel protect"""
1874 p = self.ipv4_params
1876 self.config_network(p)
1877 self.config_sa_tra(p)
1878 self.config_protect(p)
1880 self.verify_tun_44(p, count=127)
1881 c = p.tun_if.get_rx_stats()
1882 self.assertEqual(c['packets'], 127)
1883 c = p.tun_if.get_tx_stats()
1884 self.assertEqual(c['packets'], 127)
1886 self.vapi.cli("clear ipsec sa")
1887 self.verify_tun_64(p, count=127)
1888 c = p.tun_if.get_rx_stats()
1889 self.assertEqual(c['packets'], 254)
1890 c = p.tun_if.get_tx_stats()
1891 self.assertEqual(c['packets'], 254)
1893 # rekey - create new SAs and update the tunnel protection
1895 np.crypt_key = b'X' + p.crypt_key[1:]
1896 np.scapy_tun_spi += 100
1897 np.scapy_tun_sa_id += 1
1898 np.vpp_tun_spi += 100
1899 np.vpp_tun_sa_id += 1
1900 np.tun_if.local_spi = p.vpp_tun_spi
1901 np.tun_if.remote_spi = p.scapy_tun_spi
1903 self.config_sa_tra(np)
1904 self.config_protect(np)
1907 self.verify_tun_44(np, count=127)
1908 c = p.tun_if.get_rx_stats()
1909 self.assertEqual(c['packets'], 381)
1910 c = p.tun_if.get_tx_stats()
1911 self.assertEqual(c['packets'], 381)
1914 self.unconfig_protect(np)
1915 self.unconfig_sa(np)
1916 self.unconfig_network(p)
1919 class TestIpsec4TunProtectUdp(TemplateIpsec,
1920 TemplateIpsec4TunProtect,
1922 """ IPsec IPv4 Tunnel protect - transport mode"""
1925 super(TestIpsec4TunProtectUdp, self).setUp()
1927 self.tun_if = self.pg0
1929 p = self.ipv4_params
1930 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1931 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1932 p.nat_header = UDP(sport=4500, dport=4500)
1933 self.config_network(p)
1934 self.config_sa_tra(p)
1935 self.config_protect(p)
1938 p = self.ipv4_params
1939 self.unconfig_protect(p)
1941 self.unconfig_network(p)
1942 super(TestIpsec4TunProtectUdp, self).tearDown()
1944 def verify_encrypted(self, p, sa, rxs):
1945 # ensure encrypted packets are recieved with the default UDP ports
1947 self.assertEqual(rx[UDP].sport, 4500)
1948 self.assertEqual(rx[UDP].dport, 4500)
1949 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1951 def test_tun_44(self):
1952 """IPSEC UDP tunnel protect"""
1954 p = self.ipv4_params
1956 self.verify_tun_44(p, count=127)
1957 c = p.tun_if.get_rx_stats()
1958 self.assertEqual(c['packets'], 127)
1959 c = p.tun_if.get_tx_stats()
1960 self.assertEqual(c['packets'], 127)
1962 def test_keepalive(self):
1963 """ IPSEC NAT Keepalive """
1964 self.verify_keepalive(self.ipv4_params)
1967 class TestIpsec4TunProtectTun(TemplateIpsec,
1968 TemplateIpsec4TunProtect,
1970 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1972 encryption_type = ESP
1973 tun4_encrypt_node_name = "esp4-encrypt-tun"
1974 tun4_decrypt_node_name = "esp4-decrypt-tun"
1977 super(TestIpsec4TunProtectTun, self).setUp()
1979 self.tun_if = self.pg0
1982 super(TestIpsec4TunProtectTun, self).tearDown()
1984 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1986 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1987 sa.encrypt(IP(src=sw_intf.remote_ip4,
1988 dst=sw_intf.local_ip4) /
1989 IP(src=src, dst=dst) /
1990 UDP(sport=1144, dport=2233) /
1991 Raw(b'X' * payload_size))
1992 for i in range(count)]
1994 def gen_pkts(self, sw_intf, src, dst, count=1,
1996 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1997 IP(src=src, dst=dst) /
1998 UDP(sport=1144, dport=2233) /
1999 Raw(b'X' * payload_size)
2000 for i in range(count)]
2002 def verify_decrypted(self, p, rxs):
2004 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2005 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2006 self.assert_packet_checksums_valid(rx)
2008 def verify_encrypted(self, p, sa, rxs):
2011 pkt = sa.decrypt(rx[IP])
2012 if not pkt.haslayer(IP):
2013 pkt = IP(pkt[Raw].load)
2014 self.assert_packet_checksums_valid(pkt)
2015 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2016 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2017 inner = pkt[IP].payload
2018 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2020 except (IndexError, AssertionError):
2021 self.logger.debug(ppp("Unexpected packet:", rx))
2023 self.logger.debug(ppp("Decrypted packet:", pkt))
2028 def test_tun_44(self):
2029 """IPSEC tunnel protect """
2031 p = self.ipv4_params
2033 self.config_network(p)
2034 self.config_sa_tun(p)
2035 self.config_protect(p)
2037 # also add an output features on the tunnel and physical interface
2038 # so we test they still work
2039 r_all = AclRule(True,
2040 src_prefix="0.0.0.0/0",
2041 dst_prefix="0.0.0.0/0",
2043 a = VppAcl(self, [r_all]).add_vpp_config()
2045 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2046 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2048 self.verify_tun_44(p, count=127)
2050 c = p.tun_if.get_rx_stats()
2051 self.assertEqual(c['packets'], 127)
2052 c = p.tun_if.get_tx_stats()
2053 self.assertEqual(c['packets'], 127)
2055 # rekey - create new SAs and update the tunnel protection
2057 np.crypt_key = b'X' + p.crypt_key[1:]
2058 np.scapy_tun_spi += 100
2059 np.scapy_tun_sa_id += 1
2060 np.vpp_tun_spi += 100
2061 np.vpp_tun_sa_id += 1
2062 np.tun_if.local_spi = p.vpp_tun_spi
2063 np.tun_if.remote_spi = p.scapy_tun_spi
2065 self.config_sa_tun(np)
2066 self.config_protect(np)
2069 self.verify_tun_44(np, count=127)
2070 c = p.tun_if.get_rx_stats()
2071 self.assertEqual(c['packets'], 254)
2072 c = p.tun_if.get_tx_stats()
2073 self.assertEqual(c['packets'], 254)
2076 self.unconfig_protect(np)
2077 self.unconfig_sa(np)
2078 self.unconfig_network(p)
2081 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2082 TemplateIpsec4TunProtect,
2084 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2086 encryption_type = ESP
2087 tun4_encrypt_node_name = "esp4-encrypt-tun"
2088 tun4_decrypt_node_name = "esp4-decrypt-tun"
2091 super(TestIpsec4TunProtectTunDrop, self).setUp()
2093 self.tun_if = self.pg0
2096 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2098 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2100 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2101 sa.encrypt(IP(src=sw_intf.remote_ip4,
2103 IP(src=src, dst=dst) /
2104 UDP(sport=1144, dport=2233) /
2105 Raw(b'X' * payload_size))
2106 for i in range(count)]
2108 def test_tun_drop_44(self):
2109 """IPSEC tunnel protect bogus tunnel header """
2111 p = self.ipv4_params
2113 self.config_network(p)
2114 self.config_sa_tun(p)
2115 self.config_protect(p)
2117 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2118 src=p.remote_tun_if_host,
2119 dst=self.pg1.remote_ip4,
2121 self.send_and_assert_no_replies(self.tun_if, tx)
2124 self.unconfig_protect(p)
2126 self.unconfig_network(p)
2129 class TemplateIpsec6TunProtect(object):
2130 """ IPsec IPv6 Tunnel protect """
2132 def config_sa_tra(self, p):
2133 config_tun_params(p, self.encryption_type, p.tun_if)
2135 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2136 p.auth_algo_vpp_id, p.auth_key,
2137 p.crypt_algo_vpp_id, p.crypt_key,
2138 self.vpp_esp_protocol)
2139 p.tun_sa_out.add_vpp_config()
2141 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2142 p.auth_algo_vpp_id, p.auth_key,
2143 p.crypt_algo_vpp_id, p.crypt_key,
2144 self.vpp_esp_protocol)
2145 p.tun_sa_in.add_vpp_config()
2147 def config_sa_tun(self, p):
2148 config_tun_params(p, self.encryption_type, p.tun_if)
2150 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2151 p.auth_algo_vpp_id, p.auth_key,
2152 p.crypt_algo_vpp_id, p.crypt_key,
2153 self.vpp_esp_protocol,
2154 self.tun_if.local_addr[p.addr_type],
2155 self.tun_if.remote_addr[p.addr_type])
2156 p.tun_sa_out.add_vpp_config()
2158 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2159 p.auth_algo_vpp_id, p.auth_key,
2160 p.crypt_algo_vpp_id, p.crypt_key,
2161 self.vpp_esp_protocol,
2162 self.tun_if.remote_addr[p.addr_type],
2163 self.tun_if.local_addr[p.addr_type])
2164 p.tun_sa_in.add_vpp_config()
2166 def config_protect(self, p):
2167 p.tun_protect = VppIpsecTunProtect(self,
2171 p.tun_protect.add_vpp_config()
2173 def config_network(self, p):
2174 p.tun_if = VppIpIpTunInterface(self, self.pg0,
2176 self.pg0.remote_ip6)
2177 p.tun_if.add_vpp_config()
2179 p.tun_if.config_ip6()
2180 p.tun_if.config_ip4()
2182 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2183 [VppRoutePath(p.tun_if.remote_ip6,
2185 proto=DpoProto.DPO_PROTO_IP6)])
2186 p.route.add_vpp_config()
2187 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2188 [VppRoutePath(p.tun_if.remote_ip4,
2192 def unconfig_network(self, p):
2193 p.route.remove_vpp_config()
2194 p.tun_if.remove_vpp_config()
2196 def unconfig_protect(self, p):
2197 p.tun_protect.remove_vpp_config()
2199 def unconfig_sa(self, p):
2200 p.tun_sa_out.remove_vpp_config()
2201 p.tun_sa_in.remove_vpp_config()
2204 class TestIpsec6TunProtect(TemplateIpsec,
2205 TemplateIpsec6TunProtect,
2207 """ IPsec IPv6 Tunnel protect - transport mode"""
2209 encryption_type = ESP
2210 tun6_encrypt_node_name = "esp6-encrypt-tun"
2211 tun6_decrypt_node_name = "esp6-decrypt-tun"
2214 super(TestIpsec6TunProtect, self).setUp()
2216 self.tun_if = self.pg0
2219 super(TestIpsec6TunProtect, self).tearDown()
2221 def test_tun_66(self):
2222 """IPSEC tunnel protect 6o6"""
2224 p = self.ipv6_params
2226 self.config_network(p)
2227 self.config_sa_tra(p)
2228 self.config_protect(p)
2230 self.verify_tun_66(p, count=127)
2231 c = p.tun_if.get_rx_stats()
2232 self.assertEqual(c['packets'], 127)
2233 c = p.tun_if.get_tx_stats()
2234 self.assertEqual(c['packets'], 127)
2236 # rekey - create new SAs and update the tunnel protection
2238 np.crypt_key = b'X' + p.crypt_key[1:]
2239 np.scapy_tun_spi += 100
2240 np.scapy_tun_sa_id += 1
2241 np.vpp_tun_spi += 100
2242 np.vpp_tun_sa_id += 1
2243 np.tun_if.local_spi = p.vpp_tun_spi
2244 np.tun_if.remote_spi = p.scapy_tun_spi
2246 self.config_sa_tra(np)
2247 self.config_protect(np)
2250 self.verify_tun_66(np, count=127)
2251 c = p.tun_if.get_rx_stats()
2252 self.assertEqual(c['packets'], 254)
2253 c = p.tun_if.get_tx_stats()
2254 self.assertEqual(c['packets'], 254)
2256 # bounce the interface state
2257 p.tun_if.admin_down()
2258 self.verify_drop_tun_66(np, count=127)
2259 node = ('/err/ipsec6-tun-input/%s' %
2260 'ipsec packets received on disabled interface')
2261 self.assertEqual(127, self.statistics.get_err_counter(node))
2263 self.verify_tun_66(np, count=127)
2266 # 1) add two input SAs [old, new]
2267 # 2) swap output SA to [new]
2268 # 3) use only [new] input SA
2270 np3.crypt_key = b'Z' + p.crypt_key[1:]
2271 np3.scapy_tun_spi += 100
2272 np3.scapy_tun_sa_id += 1
2273 np3.vpp_tun_spi += 100
2274 np3.vpp_tun_sa_id += 1
2275 np3.tun_if.local_spi = p.vpp_tun_spi
2276 np3.tun_if.remote_spi = p.scapy_tun_spi
2278 self.config_sa_tra(np3)
2281 p.tun_protect.update_vpp_config(np.tun_sa_out,
2282 [np.tun_sa_in, np3.tun_sa_in])
2283 self.verify_tun_66(np, np, count=127)
2284 self.verify_tun_66(np3, np, count=127)
2287 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2288 [np.tun_sa_in, np3.tun_sa_in])
2289 self.verify_tun_66(np, np3, count=127)
2290 self.verify_tun_66(np3, np3, count=127)
2293 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2295 self.verify_tun_66(np3, np3, count=127)
2296 self.verify_drop_tun_66(np, count=127)
2298 c = p.tun_if.get_rx_stats()
2299 self.assertEqual(c['packets'], 127*9)
2300 c = p.tun_if.get_tx_stats()
2301 self.assertEqual(c['packets'], 127*8)
2302 self.unconfig_sa(np)
2305 self.unconfig_protect(np3)
2306 self.unconfig_sa(np3)
2307 self.unconfig_network(p)
2309 def test_tun_46(self):
2310 """IPSEC tunnel protect 4o6"""
2312 p = self.ipv6_params
2314 self.config_network(p)
2315 self.config_sa_tra(p)
2316 self.config_protect(p)
2318 self.verify_tun_46(p, count=127)
2319 c = p.tun_if.get_rx_stats()
2320 self.assertEqual(c['packets'], 127)
2321 c = p.tun_if.get_tx_stats()
2322 self.assertEqual(c['packets'], 127)
2325 self.unconfig_protect(p)
2327 self.unconfig_network(p)
2330 class TestIpsec6TunProtectTun(TemplateIpsec,
2331 TemplateIpsec6TunProtect,
2333 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2335 encryption_type = ESP
2336 tun6_encrypt_node_name = "esp6-encrypt-tun"
2337 tun6_decrypt_node_name = "esp6-decrypt-tun"
2340 super(TestIpsec6TunProtectTun, self).setUp()
2342 self.tun_if = self.pg0
2345 super(TestIpsec6TunProtectTun, self).tearDown()
2347 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2349 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2350 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2351 dst=sw_intf.local_ip6) /
2352 IPv6(src=src, dst=dst) /
2353 UDP(sport=1166, dport=2233) /
2354 Raw(b'X' * payload_size))
2355 for i in range(count)]
2357 def gen_pkts6(self, sw_intf, src, dst, count=1,
2359 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2360 IPv6(src=src, dst=dst) /
2361 UDP(sport=1166, dport=2233) /
2362 Raw(b'X' * payload_size)
2363 for i in range(count)]
2365 def verify_decrypted6(self, p, rxs):
2367 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2368 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2369 self.assert_packet_checksums_valid(rx)
2371 def verify_encrypted6(self, p, sa, rxs):
2374 pkt = sa.decrypt(rx[IPv6])
2375 if not pkt.haslayer(IPv6):
2376 pkt = IPv6(pkt[Raw].load)
2377 self.assert_packet_checksums_valid(pkt)
2378 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2379 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2380 inner = pkt[IPv6].payload
2381 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2383 except (IndexError, AssertionError):
2384 self.logger.debug(ppp("Unexpected packet:", rx))
2386 self.logger.debug(ppp("Decrypted packet:", pkt))
2391 def test_tun_66(self):
2392 """IPSEC tunnel protect """
2394 p = self.ipv6_params
2396 self.config_network(p)
2397 self.config_sa_tun(p)
2398 self.config_protect(p)
2400 self.verify_tun_66(p, count=127)
2402 c = p.tun_if.get_rx_stats()
2403 self.assertEqual(c['packets'], 127)
2404 c = p.tun_if.get_tx_stats()
2405 self.assertEqual(c['packets'], 127)
2407 # rekey - create new SAs and update the tunnel protection
2409 np.crypt_key = b'X' + p.crypt_key[1:]
2410 np.scapy_tun_spi += 100
2411 np.scapy_tun_sa_id += 1
2412 np.vpp_tun_spi += 100
2413 np.vpp_tun_sa_id += 1
2414 np.tun_if.local_spi = p.vpp_tun_spi
2415 np.tun_if.remote_spi = p.scapy_tun_spi
2417 self.config_sa_tun(np)
2418 self.config_protect(np)
2421 self.verify_tun_66(np, count=127)
2422 c = p.tun_if.get_rx_stats()
2423 self.assertEqual(c['packets'], 254)
2424 c = p.tun_if.get_tx_stats()
2425 self.assertEqual(c['packets'], 254)
2428 self.unconfig_protect(np)
2429 self.unconfig_sa(np)
2430 self.unconfig_network(p)
2433 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2434 TemplateIpsec6TunProtect,
2436 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2438 encryption_type = ESP
2439 tun6_encrypt_node_name = "esp6-encrypt-tun"
2440 tun6_decrypt_node_name = "esp6-decrypt-tun"
2443 super(TestIpsec6TunProtectTunDrop, self).setUp()
2445 self.tun_if = self.pg0
2448 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2450 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2452 # the IP destination of the revelaed packet does not match
2453 # that assigned to the tunnel
2454 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2455 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2457 IPv6(src=src, dst=dst) /
2458 UDP(sport=1144, dport=2233) /
2459 Raw(b'X' * payload_size))
2460 for i in range(count)]
2462 def test_tun_drop_66(self):
2463 """IPSEC 6 tunnel protect bogus tunnel header """
2465 p = self.ipv6_params
2467 self.config_network(p)
2468 self.config_sa_tun(p)
2469 self.config_protect(p)
2471 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2472 src=p.remote_tun_if_host,
2473 dst=self.pg1.remote_ip6,
2475 self.send_and_assert_no_replies(self.tun_if, tx)
2477 self.unconfig_protect(p)
2479 self.unconfig_network(p)
2482 class TemplateIpsecItf4(object):
2483 """ IPsec Interface IPv4 """
2485 encryption_type = ESP
2486 tun4_encrypt_node_name = "esp4-encrypt-tun"
2487 tun4_decrypt_node_name = "esp4-decrypt-tun"
2488 tun4_input_node = "ipsec4-tun-input"
2490 def config_sa_tun(self, p, src, dst):
2491 config_tun_params(p, self.encryption_type, None, src, dst)
2493 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2494 p.auth_algo_vpp_id, p.auth_key,
2495 p.crypt_algo_vpp_id, p.crypt_key,
2496 self.vpp_esp_protocol,
2499 p.tun_sa_out.add_vpp_config()
2501 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2502 p.auth_algo_vpp_id, p.auth_key,
2503 p.crypt_algo_vpp_id, p.crypt_key,
2504 self.vpp_esp_protocol,
2507 p.tun_sa_in.add_vpp_config()
2509 def config_protect(self, p):
2510 p.tun_protect = VppIpsecTunProtect(self,
2514 p.tun_protect.add_vpp_config()
2516 def config_network(self, p, instance=0xffffffff):
2517 p.tun_if = VppIpsecInterface(self, instance=instance)
2519 p.tun_if.add_vpp_config()
2521 p.tun_if.config_ip4()
2522 p.tun_if.config_ip6()
2524 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2525 [VppRoutePath(p.tun_if.remote_ip4,
2527 p.route.add_vpp_config()
2528 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2529 [VppRoutePath(p.tun_if.remote_ip6,
2531 proto=DpoProto.DPO_PROTO_IP6)])
2534 def unconfig_network(self, p):
2535 p.route.remove_vpp_config()
2536 p.tun_if.remove_vpp_config()
2538 def unconfig_protect(self, p):
2539 p.tun_protect.remove_vpp_config()
2541 def unconfig_sa(self, p):
2542 p.tun_sa_out.remove_vpp_config()
2543 p.tun_sa_in.remove_vpp_config()
2546 class TestIpsecItf4(TemplateIpsec,
2549 """ IPsec Interface IPv4 """
2552 super(TestIpsecItf4, self).setUp()
2554 self.tun_if = self.pg0
2557 super(TestIpsecItf4, self).tearDown()
2559 def test_tun_instance_44(self):
2560 p = self.ipv4_params
2561 self.config_network(p, instance=3)
2563 with self.assertRaises(CliFailedCommandError):
2564 self.vapi.cli("show interface ipsec0")
2566 output = self.vapi.cli("show interface ipsec3")
2567 self.assertTrue("unknown" not in output)
2569 self.unconfig_network(p)
2571 def test_tun_44(self):
2572 """IPSEC interface IPv4"""
2575 p = self.ipv4_params
2577 self.config_network(p)
2578 self.config_sa_tun(p,
2580 self.pg0.remote_ip4)
2581 self.config_protect(p)
2583 self.verify_tun_44(p, count=n_pkts)
2584 c = p.tun_if.get_rx_stats()
2585 self.assertEqual(c['packets'], n_pkts)
2586 c = p.tun_if.get_tx_stats()
2587 self.assertEqual(c['packets'], n_pkts)
2589 p.tun_if.admin_down()
2590 self.verify_tun_dropped_44(p, count=n_pkts)
2592 self.verify_tun_44(p, count=n_pkts)
2594 c = p.tun_if.get_rx_stats()
2595 self.assertEqual(c['packets'], 3*n_pkts)
2596 c = p.tun_if.get_tx_stats()
2597 self.assertEqual(c['packets'], 2*n_pkts)
2599 # it's a v6 packet when its encrypted
2600 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2602 self.verify_tun_64(p, count=n_pkts)
2603 c = p.tun_if.get_rx_stats()
2604 self.assertEqual(c['packets'], 4*n_pkts)
2605 c = p.tun_if.get_tx_stats()
2606 self.assertEqual(c['packets'], 3*n_pkts)
2608 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2610 self.vapi.cli("clear interfaces")
2612 # rekey - create new SAs and update the tunnel protection
2614 np.crypt_key = b'X' + p.crypt_key[1:]
2615 np.scapy_tun_spi += 100
2616 np.scapy_tun_sa_id += 1
2617 np.vpp_tun_spi += 100
2618 np.vpp_tun_sa_id += 1
2619 np.tun_if.local_spi = p.vpp_tun_spi
2620 np.tun_if.remote_spi = p.scapy_tun_spi
2622 self.config_sa_tun(np,
2624 self.pg0.remote_ip4)
2625 self.config_protect(np)
2628 self.verify_tun_44(np, count=n_pkts)
2629 c = p.tun_if.get_rx_stats()
2630 self.assertEqual(c['packets'], n_pkts)
2631 c = p.tun_if.get_tx_stats()
2632 self.assertEqual(c['packets'], n_pkts)
2635 self.unconfig_protect(np)
2636 self.unconfig_sa(np)
2637 self.unconfig_network(p)
2639 def test_tun_44_null(self):
2640 """IPSEC interface IPv4 NULL auth/crypto"""
2643 p = copy.copy(self.ipv4_params)
2645 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2646 IPSEC_API_INTEG_ALG_NONE)
2647 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2648 IPSEC_API_CRYPTO_ALG_NONE)
2649 p.crypt_algo = "NULL"
2650 p.auth_algo = "NULL"
2652 self.config_network(p)
2653 self.config_sa_tun(p,
2655 self.pg0.remote_ip4)
2656 self.config_protect(p)
2658 self.verify_tun_44(p, count=n_pkts)
2661 self.unconfig_protect(p)
2663 self.unconfig_network(p)
2666 class TemplateIpsecItf6(object):
2667 """ IPsec Interface IPv6 """
2669 encryption_type = ESP
2670 tun6_encrypt_node_name = "esp6-encrypt-tun"
2671 tun6_decrypt_node_name = "esp6-decrypt-tun"
2672 tun6_input_node = "ipsec6-tun-input"
2674 def config_sa_tun(self, p, src, dst):
2675 config_tun_params(p, self.encryption_type, None, src, dst)
2677 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2678 p.auth_algo_vpp_id, p.auth_key,
2679 p.crypt_algo_vpp_id, p.crypt_key,
2680 self.vpp_esp_protocol,
2683 p.tun_sa_out.add_vpp_config()
2685 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2686 p.auth_algo_vpp_id, p.auth_key,
2687 p.crypt_algo_vpp_id, p.crypt_key,
2688 self.vpp_esp_protocol,
2691 p.tun_sa_in.add_vpp_config()
2693 def config_protect(self, p):
2694 p.tun_protect = VppIpsecTunProtect(self,
2698 p.tun_protect.add_vpp_config()
2700 def config_network(self, p):
2701 p.tun_if = VppIpsecInterface(self)
2703 p.tun_if.add_vpp_config()
2705 p.tun_if.config_ip4()
2706 p.tun_if.config_ip6()
2708 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2709 [VppRoutePath(p.tun_if.remote_ip4,
2713 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2714 [VppRoutePath(p.tun_if.remote_ip6,
2716 proto=DpoProto.DPO_PROTO_IP6)])
2717 p.route.add_vpp_config()
2719 def unconfig_network(self, p):
2720 p.route.remove_vpp_config()
2721 p.tun_if.remove_vpp_config()
2723 def unconfig_protect(self, p):
2724 p.tun_protect.remove_vpp_config()
2726 def unconfig_sa(self, p):
2727 p.tun_sa_out.remove_vpp_config()
2728 p.tun_sa_in.remove_vpp_config()
2731 class TestIpsecItf6(TemplateIpsec,
2734 """ IPsec Interface IPv6 """
2737 super(TestIpsecItf6, self).setUp()
2739 self.tun_if = self.pg0
2742 super(TestIpsecItf6, self).tearDown()
2744 def test_tun_44(self):
2745 """IPSEC interface IPv6"""
2748 p = self.ipv6_params
2750 self.config_network(p)
2751 self.config_sa_tun(p,
2753 self.pg0.remote_ip6)
2754 self.config_protect(p)
2756 self.verify_tun_66(p, count=n_pkts)
2757 c = p.tun_if.get_rx_stats()
2758 self.assertEqual(c['packets'], n_pkts)
2759 c = p.tun_if.get_tx_stats()
2760 self.assertEqual(c['packets'], n_pkts)
2762 p.tun_if.admin_down()
2763 self.verify_drop_tun_66(p, count=n_pkts)
2765 self.verify_tun_66(p, count=n_pkts)
2767 c = p.tun_if.get_rx_stats()
2768 self.assertEqual(c['packets'], 3*n_pkts)
2769 c = p.tun_if.get_tx_stats()
2770 self.assertEqual(c['packets'], 2*n_pkts)
2772 # it's a v4 packet when its encrypted
2773 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2775 self.verify_tun_46(p, count=n_pkts)
2776 c = p.tun_if.get_rx_stats()
2777 self.assertEqual(c['packets'], 4*n_pkts)
2778 c = p.tun_if.get_tx_stats()
2779 self.assertEqual(c['packets'], 3*n_pkts)
2781 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2783 self.vapi.cli("clear interfaces")
2785 # rekey - create new SAs and update the tunnel protection
2787 np.crypt_key = b'X' + p.crypt_key[1:]
2788 np.scapy_tun_spi += 100
2789 np.scapy_tun_sa_id += 1
2790 np.vpp_tun_spi += 100
2791 np.vpp_tun_sa_id += 1
2792 np.tun_if.local_spi = p.vpp_tun_spi
2793 np.tun_if.remote_spi = p.scapy_tun_spi
2795 self.config_sa_tun(np,
2797 self.pg0.remote_ip6)
2798 self.config_protect(np)
2801 self.verify_tun_66(np, count=n_pkts)
2802 c = p.tun_if.get_rx_stats()
2803 self.assertEqual(c['packets'], n_pkts)
2804 c = p.tun_if.get_tx_stats()
2805 self.assertEqual(c['packets'], n_pkts)
2808 self.unconfig_protect(np)
2809 self.unconfig_sa(np)
2810 self.unconfig_network(p)
2813 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2814 """ Ipsec P2MP ESP v4 tests """
2815 tun4_encrypt_node_name = "esp4-encrypt-tun"
2816 tun4_decrypt_node_name = "esp4-decrypt-tun"
2817 encryption_type = ESP
2819 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2821 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2822 sa.encrypt(IP(src=self.pg1.local_ip4,
2823 dst=self.pg1.remote_ip4) /
2824 UDP(sport=1144, dport=2233) /
2825 Raw(b'X' * payload_size))
2826 for i in range(count)]
2828 def gen_pkts(self, sw_intf, src, dst, count=1,
2830 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2831 IP(src="1.1.1.1", dst=dst) /
2832 UDP(sport=1144, dport=2233) /
2833 Raw(b'X' * payload_size)
2834 for i in range(count)]
2836 def verify_decrypted(self, p, rxs):
2838 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2839 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2841 def verify_encrypted(self, p, sa, rxs):
2844 pkt = sa.decrypt(rx[IP])
2845 if not pkt.haslayer(IP):
2846 pkt = IP(pkt[Raw].load)
2847 self.assert_packet_checksums_valid(pkt)
2849 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2850 except (IndexError, AssertionError):
2851 self.logger.debug(ppp("Unexpected packet:", rx))
2853 self.logger.debug(ppp("Decrypted packet:", pkt))
2859 super(TestIpsecMIfEsp4, self).setUp()
2862 self.tun_if = self.pg0
2863 p = self.ipv4_params
2864 p.tun_if = VppIpsecInterface(self,
2865 mode=(VppEnum.vl_api_tunnel_mode_t.
2866 TUNNEL_API_MODE_MP))
2867 p.tun_if.add_vpp_config()
2869 p.tun_if.config_ip4()
2870 p.tun_if.generate_remote_hosts(N_NHS)
2871 self.pg0.generate_remote_hosts(N_NHS)
2872 self.pg0.configure_ipv4_neighbors()
2874 # setup some SAs for several next-hops on the interface
2875 self.multi_params = []
2877 for ii in range(N_NHS):
2878 p = copy.copy(self.ipv4_params)
2880 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2881 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2882 p.scapy_tun_spi = p.scapy_tun_spi + ii
2883 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2884 p.vpp_tun_spi = p.vpp_tun_spi + ii
2886 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2887 p.scapy_tra_spi = p.scapy_tra_spi + ii
2888 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2889 p.vpp_tra_spi = p.vpp_tra_spi + ii
2890 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2891 p.auth_algo_vpp_id, p.auth_key,
2892 p.crypt_algo_vpp_id, p.crypt_key,
2893 self.vpp_esp_protocol,
2895 self.pg0.remote_hosts[ii].ip4)
2896 p.tun_sa_out.add_vpp_config()
2898 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2899 p.auth_algo_vpp_id, p.auth_key,
2900 p.crypt_algo_vpp_id, p.crypt_key,
2901 self.vpp_esp_protocol,
2902 self.pg0.remote_hosts[ii].ip4,
2904 p.tun_sa_in.add_vpp_config()
2906 p.tun_protect = VppIpsecTunProtect(
2911 nh=p.tun_if.remote_hosts[ii].ip4)
2912 p.tun_protect.add_vpp_config()
2913 config_tun_params(p, self.encryption_type, None,
2915 self.pg0.remote_hosts[ii].ip4)
2916 self.multi_params.append(p)
2918 VppIpRoute(self, p.remote_tun_if_host, 32,
2919 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2920 p.tun_if.sw_if_index)]).add_vpp_config()
2922 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2925 p = self.ipv4_params
2926 p.tun_if.unconfig_ip4()
2927 super(TestIpsecMIfEsp4, self).tearDown()
2929 def test_tun_44(self):
2932 for p in self.multi_params:
2933 self.verify_tun_44(p, count=N_PKTS)
2936 if __name__ == '__main__':
2937 unittest.main(testRunner=VppTestRunner)