5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import VppTestRunner
12 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
13 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
14 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
18 VppMplsTable, VppMplsRoute, FibPathProto
19 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
20 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
21 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
22 from vpp_teib import VppTeib
24 from vpp_papi import VppEnum
25 from vpp_papi_provider import CliFailedCommandError
26 from vpp_acl import AclRule, VppAcl, VppAclInterface
29 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
30 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
31 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
32 IPSEC_API_SAD_FLAG_USE_ESN))
33 crypt_key = mk_scapy_crypt_key(p)
35 p.tun_dst = tun_if.remote_ip
36 p.tun_src = tun_if.local_ip
41 p.scapy_tun_sa = SecurityAssociation(
42 encryption_type, spi=p.vpp_tun_spi,
43 crypt_algo=p.crypt_algo,
45 auth_algo=p.auth_algo, auth_key=p.auth_key,
46 tunnel_header=ip_class_by_addr_type[p.addr_type](
49 nat_t_header=p.nat_header,
51 p.vpp_tun_sa = SecurityAssociation(
52 encryption_type, spi=p.scapy_tun_spi,
53 crypt_algo=p.crypt_algo,
55 auth_algo=p.auth_algo, auth_key=p.auth_key,
56 tunnel_header=ip_class_by_addr_type[p.addr_type](
59 nat_t_header=p.nat_header,
63 def config_tra_params(p, encryption_type, tun_if):
64 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
65 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
66 IPSEC_API_SAD_FLAG_USE_ESN))
67 crypt_key = mk_scapy_crypt_key(p)
68 p.tun_dst = tun_if.remote_ip
69 p.tun_src = tun_if.local_ip
70 p.scapy_tun_sa = SecurityAssociation(
71 encryption_type, spi=p.vpp_tun_spi,
72 crypt_algo=p.crypt_algo,
74 auth_algo=p.auth_algo, auth_key=p.auth_key,
76 nat_t_header=p.nat_header)
77 p.vpp_tun_sa = SecurityAssociation(
78 encryption_type, spi=p.scapy_tun_spi,
79 crypt_algo=p.crypt_algo,
81 auth_algo=p.auth_algo, auth_key=p.auth_key,
83 nat_t_header=p.nat_header)
86 class TemplateIpsec4TunProtect(object):
87 """ IPsec IPv4 Tunnel protect """
90 tun4_encrypt_node_name = "esp4-encrypt-tun"
91 tun4_decrypt_node_name = "esp4-decrypt-tun"
92 tun4_input_node = "ipsec4-tun-input"
94 def config_sa_tra(self, p):
95 config_tun_params(p, self.encryption_type, p.tun_if)
97 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
98 p.auth_algo_vpp_id, p.auth_key,
99 p.crypt_algo_vpp_id, p.crypt_key,
100 self.vpp_esp_protocol,
102 p.tun_sa_out.add_vpp_config()
104 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
105 p.auth_algo_vpp_id, p.auth_key,
106 p.crypt_algo_vpp_id, p.crypt_key,
107 self.vpp_esp_protocol,
109 p.tun_sa_in.add_vpp_config()
111 def config_sa_tun(self, p):
112 config_tun_params(p, self.encryption_type, p.tun_if)
114 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
115 p.auth_algo_vpp_id, p.auth_key,
116 p.crypt_algo_vpp_id, p.crypt_key,
117 self.vpp_esp_protocol,
118 self.tun_if.local_addr[p.addr_type],
119 self.tun_if.remote_addr[p.addr_type],
121 p.tun_sa_out.add_vpp_config()
123 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
124 p.auth_algo_vpp_id, p.auth_key,
125 p.crypt_algo_vpp_id, p.crypt_key,
126 self.vpp_esp_protocol,
127 self.tun_if.remote_addr[p.addr_type],
128 self.tun_if.local_addr[p.addr_type],
130 p.tun_sa_in.add_vpp_config()
132 def config_protect(self, p):
133 p.tun_protect = VppIpsecTunProtect(self,
137 p.tun_protect.add_vpp_config()
139 def config_network(self, p):
140 if hasattr(p, 'tun_dst'):
143 tun_dst = self.pg0.remote_ip4
144 p.tun_if = VppIpIpTunInterface(self, self.pg0,
147 p.tun_if.add_vpp_config()
149 p.tun_if.config_ip4()
150 p.tun_if.config_ip6()
152 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
153 [VppRoutePath(p.tun_if.remote_ip4,
155 p.route.add_vpp_config()
156 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
157 [VppRoutePath(p.tun_if.remote_ip6,
159 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """ Undo the network config held on p: route first, then tunnel """
    # the route is withdrawn before the tunnel interface is removed
    route = p.route
    route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """ Remove the tunnel-protect object stored on p """
    protect = p.tun_protect
    protect.remove_vpp_config()
def unconfig_sa(self, p):
    """ Remove both SAs stored on p: tun_sa_out, then tun_sa_in """
    sa_out = p.tun_sa_out
    sa_out.remove_vpp_config()
    sa_in = p.tun_sa_in
    sa_in.remove_vpp_config()
174 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
176 """ IPsec tunnel interface tests """
178 encryption_type = ESP
182 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
185 def tearDownClass(cls):
186 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
189 super(TemplateIpsec4TunIfEsp, self).setUp()
191 self.tun_if = self.pg0
195 self.config_network(p)
196 self.config_sa_tra(p)
197 self.config_protect(p)
200 super(TemplateIpsec4TunIfEsp, self).tearDown()
203 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
205 """ IPsec UDP tunnel interface tests """
207 tun4_encrypt_node_name = "esp4-encrypt-tun"
208 tun4_decrypt_node_name = "esp4-decrypt-tun"
209 encryption_type = ESP
213 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
216 def tearDownClass(cls):
217 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
219 def verify_encrypted(self, p, sa, rxs):
222 # ensure the UDP ports are correct before we decrypt
224 self.assertTrue(rx.haslayer(UDP))
225 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
226 self.assert_equal(rx[UDP].dport, 4500)
228 pkt = sa.decrypt(rx[IP])
229 if not pkt.haslayer(IP):
230 pkt = IP(pkt[Raw].load)
232 self.assert_packet_checksums_valid(pkt)
233 self.assert_equal(pkt[IP].dst, "1.1.1.1")
234 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
235 except (IndexError, AssertionError):
236 self.logger.debug(ppp("Unexpected packet:", rx))
238 self.logger.debug(ppp("Decrypted packet:", pkt))
243 def config_sa_tra(self, p):
244 config_tun_params(p, self.encryption_type, p.tun_if)
246 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
247 p.auth_algo_vpp_id, p.auth_key,
248 p.crypt_algo_vpp_id, p.crypt_key,
249 self.vpp_esp_protocol,
251 udp_src=p.nat_header.sport,
252 udp_dst=p.nat_header.dport)
253 p.tun_sa_out.add_vpp_config()
255 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
256 p.auth_algo_vpp_id, p.auth_key,
257 p.crypt_algo_vpp_id, p.crypt_key,
258 self.vpp_esp_protocol,
260 udp_src=p.nat_header.sport,
261 udp_dst=p.nat_header.dport)
262 p.tun_sa_in.add_vpp_config()
265 super(TemplateIpsec4TunIfEspUdp, self).setUp()
268 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
269 IPSEC_API_SAD_FLAG_UDP_ENCAP)
270 p.nat_header = UDP(sport=5454, dport=4500)
272 self.tun_if = self.pg0
274 self.config_network(p)
275 self.config_sa_tra(p)
276 self.config_protect(p)
279 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
282 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
283 """ Ipsec ESP - TUN tests """
284 tun4_encrypt_node_name = "esp4-encrypt-tun"
285 tun4_decrypt_node_name = "esp4-decrypt-tun"
287 def test_tun_basic64(self):
288 """ ipsec 6o4 tunnel basic test """
289 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
291 self.verify_tun_64(self.params[socket.AF_INET], count=1)
293 def test_tun_burst64(self):
294 """ ipsec 6o4 tunnel burst test """
295 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
297 self.verify_tun_64(self.params[socket.AF_INET], count=257)
299 def test_tun_basic_frag44(self):
300 """ ipsec 4o4 tunnel frag basic test """
301 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
305 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
307 self.verify_tun_44(self.params[socket.AF_INET],
308 count=1, payload_size=1800, n_rx=2)
309 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
313 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
314 """ Ipsec ESP UDP tests """
316 tun4_input_node = "ipsec4-tun-input"
319 super(TestIpsec4TunIfEspUdp, self).setUp()
321 def test_keepalive(self):
322 """ IPSEC NAT Keepalive """
323 self.verify_keepalive(self.ipv4_params)
326 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
327 """ Ipsec ESP UDP GCM tests """
329 tun4_input_node = "ipsec4-tun-input"
332 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
334 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
335 IPSEC_API_INTEG_ALG_NONE)
336 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
337 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
338 p.crypt_algo = "AES-GCM"
340 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
344 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
345 """ Ipsec ESP - TCP tests """
349 class TemplateIpsec6TunProtect(object):
350 """ IPsec IPv6 Tunnel protect """
352 def config_sa_tra(self, p):
353 config_tun_params(p, self.encryption_type, p.tun_if)
355 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
356 p.auth_algo_vpp_id, p.auth_key,
357 p.crypt_algo_vpp_id, p.crypt_key,
358 self.vpp_esp_protocol)
359 p.tun_sa_out.add_vpp_config()
361 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
362 p.auth_algo_vpp_id, p.auth_key,
363 p.crypt_algo_vpp_id, p.crypt_key,
364 self.vpp_esp_protocol)
365 p.tun_sa_in.add_vpp_config()
367 def config_sa_tun(self, p):
368 config_tun_params(p, self.encryption_type, p.tun_if)
370 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
371 p.auth_algo_vpp_id, p.auth_key,
372 p.crypt_algo_vpp_id, p.crypt_key,
373 self.vpp_esp_protocol,
374 self.tun_if.local_addr[p.addr_type],
375 self.tun_if.remote_addr[p.addr_type])
376 p.tun_sa_out.add_vpp_config()
378 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
379 p.auth_algo_vpp_id, p.auth_key,
380 p.crypt_algo_vpp_id, p.crypt_key,
381 self.vpp_esp_protocol,
382 self.tun_if.remote_addr[p.addr_type],
383 self.tun_if.local_addr[p.addr_type])
384 p.tun_sa_in.add_vpp_config()
386 def config_protect(self, p):
387 p.tun_protect = VppIpsecTunProtect(self,
391 p.tun_protect.add_vpp_config()
393 def config_network(self, p):
394 if hasattr(p, 'tun_dst'):
397 tun_dst = self.pg0.remote_ip6
398 p.tun_if = VppIpIpTunInterface(self, self.pg0,
401 p.tun_if.add_vpp_config()
403 p.tun_if.config_ip6()
404 p.tun_if.config_ip4()
406 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
407 [VppRoutePath(p.tun_if.remote_ip6,
409 proto=DpoProto.DPO_PROTO_IP6)])
410 p.route.add_vpp_config()
411 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
412 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """ Undo the network config held on p: route first, then tunnel """
    # the route is withdrawn before the tunnel interface is removed
    route = p.route
    route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """ Remove the tunnel-protect object stored on p """
    protect = p.tun_protect
    protect.remove_vpp_config()
def unconfig_sa(self, p):
    """ Remove both SAs stored on p: tun_sa_out, then tun_sa_in """
    sa_out = p.tun_sa_out
    sa_out.remove_vpp_config()
    sa_in = p.tun_sa_in
    sa_in.remove_vpp_config()
428 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
430 """ IPsec tunnel interface tests """
432 encryption_type = ESP
435 super(TemplateIpsec6TunIfEsp, self).setUp()
437 self.tun_if = self.pg0
440 self.config_network(p)
441 self.config_sa_tra(p)
442 self.config_protect(p)
445 super(TemplateIpsec6TunIfEsp, self).tearDown()
448 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
450 """ Ipsec ESP - TUN tests """
451 tun6_encrypt_node_name = "esp6-encrypt-tun"
452 tun6_decrypt_node_name = "esp6-decrypt-tun"
454 def test_tun_basic46(self):
455 """ ipsec 4o6 tunnel basic test """
456 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
457 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
459 def test_tun_burst46(self):
460 """ ipsec 4o6 tunnel burst test """
461 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
462 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
class TestIpsec6TunIfEspHandoff(
        TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # Node names exposed as class attributes; presumably read by the
    # inherited handoff test cases (not visible in this chunk).
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
class TestIpsec4TunIfEspHandoff(
        TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # Node names exposed as class attributes; presumably read by the
    # inherited handoff test cases (not visible in this chunk).
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
479 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
482 """ IPsec IPv4 Multi Tunnel interface """
484 encryption_type = ESP
485 tun4_encrypt_node_name = "esp4-encrypt-tun"
486 tun4_decrypt_node_name = "esp4-decrypt-tun"
489 super(TestIpsec4MultiTunIfEsp, self).setUp()
491 self.tun_if = self.pg0
493 self.multi_params = []
494 self.pg0.generate_remote_hosts(10)
495 self.pg0.configure_ipv4_neighbors()
498 p = copy.copy(self.ipv4_params)
500 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
501 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
502 p.scapy_tun_spi = p.scapy_tun_spi + ii
503 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
504 p.vpp_tun_spi = p.vpp_tun_spi + ii
506 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
507 p.scapy_tra_spi = p.scapy_tra_spi + ii
508 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
509 p.vpp_tra_spi = p.vpp_tra_spi + ii
510 p.tun_dst = self.pg0.remote_hosts[ii].ip4
512 self.multi_params.append(p)
513 self.config_network(p)
514 self.config_sa_tra(p)
515 self.config_protect(p)
518 super(TestIpsec4MultiTunIfEsp, self).tearDown()
520 def test_tun_44(self):
521 """Multiple IPSEC tunnel interfaces """
522 for p in self.multi_params:
523 self.verify_tun_44(p, count=127)
524 c = p.tun_if.get_rx_stats()
525 self.assertEqual(c['packets'], 127)
526 c = p.tun_if.get_tx_stats()
527 self.assertEqual(c['packets'], 127)
529 def test_tun_rr_44(self):
530 """ Round-robin packets across multiple interfaces """
532 for p in self.multi_params:
533 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
534 src=p.remote_tun_if_host,
535 dst=self.pg1.remote_ip4)
536 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
538 for rx, p in zip(rxs, self.multi_params):
539 self.verify_decrypted(p, [rx])
542 for p in self.multi_params:
543 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
544 dst=p.remote_tun_if_host)
545 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
547 for rx, p in zip(rxs, self.multi_params):
548 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
551 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
554 """ IPsec IPv4 Tunnel interface all Algos """
556 encryption_type = ESP
557 tun4_encrypt_node_name = "esp4-encrypt-tun"
558 tun4_decrypt_node_name = "esp4-decrypt-tun"
561 super(TestIpsec4TunIfEspAll, self).setUp()
563 self.tun_if = self.pg0
566 self.config_network(p)
567 self.config_sa_tra(p)
568 self.config_protect(p)
572 self.unconfig_protect(p)
573 self.unconfig_network(p)
576 super(TestIpsec4TunIfEspAll, self).tearDown()
580 # change the key and the SPI
583 p.crypt_key = b'X' + p.crypt_key[1:]
585 p.scapy_tun_sa_id += 1
588 p.tun_if.local_spi = p.vpp_tun_spi
589 p.tun_if.remote_spi = p.scapy_tun_spi
591 config_tun_params(p, self.encryption_type, p.tun_if)
593 p.tun_sa_out = VppIpsecSA(self,
600 self.vpp_esp_protocol,
603 p.tun_sa_in = VppIpsecSA(self,
610 self.vpp_esp_protocol,
613 p.tun_sa_in.add_vpp_config()
614 p.tun_sa_out.add_vpp_config()
616 self.config_protect(p)
617 np.tun_sa_out.remove_vpp_config()
618 np.tun_sa_in.remove_vpp_config()
619 self.logger.info(self.vapi.cli("sh ipsec sa"))
621 def test_tun_44(self):
622 """IPSEC tunnel all algos """
624 # foreach VPP crypto engine
625 engines = ["ia32", "ipsecmb", "openssl"]
627 # foreach crypto algorithm
628 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
629 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
630 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
631 IPSEC_API_INTEG_ALG_NONE),
632 'scapy-crypto': "AES-GCM",
633 'scapy-integ': "NULL",
634 'key': b"JPjyOWBeVEQiMe7h",
636 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
637 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
638 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
639 IPSEC_API_INTEG_ALG_NONE),
640 'scapy-crypto': "AES-GCM",
641 'scapy-integ': "NULL",
642 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
644 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
645 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
646 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
647 IPSEC_API_INTEG_ALG_NONE),
648 'scapy-crypto': "AES-GCM",
649 'scapy-integ': "NULL",
650 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
652 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
653 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
654 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
655 IPSEC_API_INTEG_ALG_SHA1_96),
656 'scapy-crypto': "AES-CBC",
657 'scapy-integ': "HMAC-SHA1-96",
659 'key': b"JPjyOWBeVEQiMe7h"},
660 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
661 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
662 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
663 IPSEC_API_INTEG_ALG_SHA_512_256),
664 'scapy-crypto': "AES-CBC",
665 'scapy-integ': "SHA2-512-256",
667 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
668 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
669 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
670 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
671 IPSEC_API_INTEG_ALG_SHA_256_128),
672 'scapy-crypto': "AES-CBC",
673 'scapy-integ': "SHA2-256-128",
675 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
676 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
677 IPSEC_API_CRYPTO_ALG_NONE),
678 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
679 IPSEC_API_INTEG_ALG_SHA1_96),
680 'scapy-crypto': "NULL",
681 'scapy-integ': "HMAC-SHA1-96",
683 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
685 for engine in engines:
686 self.vapi.cli("set crypto handler all %s" % engine)
689 # loop through each of the algorithms
692 # with self.subTest(algo=algo['scapy']):
695 p.auth_algo_vpp_id = algo['vpp-integ']
696 p.crypt_algo_vpp_id = algo['vpp-crypto']
697 p.crypt_algo = algo['scapy-crypto']
698 p.auth_algo = algo['scapy-integ']
699 p.crypt_key = algo['key']
700 p.salt = algo['salt']
706 self.verify_tun_44(p, count=127)
709 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
712 """ IPsec IPv4 Tunnel interface no Algos """
714 encryption_type = ESP
715 tun4_encrypt_node_name = "esp4-encrypt-tun"
716 tun4_decrypt_node_name = "esp4-decrypt-tun"
719 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
721 self.tun_if = self.pg0
723 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
724 IPSEC_API_INTEG_ALG_NONE)
728 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
729 IPSEC_API_CRYPTO_ALG_NONE)
730 p.crypt_algo = 'NULL'
734 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
736 def test_tun_44(self):
737 """ IPSec SA with NULL algos """
740 self.config_network(p)
741 self.config_sa_tra(p)
742 self.config_protect(p)
744 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
745 dst=p.remote_tun_if_host)
746 self.send_and_assert_no_replies(self.pg1, tx)
748 self.unconfig_protect(p)
750 self.unconfig_network(p)
753 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
756 """ IPsec IPv6 Multi Tunnel interface """
758 encryption_type = ESP
759 tun6_encrypt_node_name = "esp6-encrypt-tun"
760 tun6_decrypt_node_name = "esp6-decrypt-tun"
763 super(TestIpsec6MultiTunIfEsp, self).setUp()
765 self.tun_if = self.pg0
767 self.multi_params = []
768 self.pg0.generate_remote_hosts(10)
769 self.pg0.configure_ipv6_neighbors()
772 p = copy.copy(self.ipv6_params)
774 p.remote_tun_if_host = "1111::%d" % (ii + 1)
775 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
776 p.scapy_tun_spi = p.scapy_tun_spi + ii
777 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
778 p.vpp_tun_spi = p.vpp_tun_spi + ii
780 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
781 p.scapy_tra_spi = p.scapy_tra_spi + ii
782 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
783 p.vpp_tra_spi = p.vpp_tra_spi + ii
784 p.tun_dst = self.pg0.remote_hosts[ii].ip6
786 self.multi_params.append(p)
787 self.config_network(p)
788 self.config_sa_tra(p)
789 self.config_protect(p)
792 super(TestIpsec6MultiTunIfEsp, self).tearDown()
794 def test_tun_66(self):
795 """Multiple IPSEC tunnel interfaces """
796 for p in self.multi_params:
797 self.verify_tun_66(p, count=127)
798 c = p.tun_if.get_rx_stats()
799 self.assertEqual(c['packets'], 127)
800 c = p.tun_if.get_tx_stats()
801 self.assertEqual(c['packets'], 127)
804 class TestIpsecGreTebIfEsp(TemplateIpsec,
806 """ Ipsec GRE TEB ESP - TUN tests """
807 tun4_encrypt_node_name = "esp4-encrypt-tun"
808 tun4_decrypt_node_name = "esp4-decrypt-tun"
809 encryption_type = ESP
810 omac = "00:11:22:33:44:55"
812 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
814 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
815 sa.encrypt(IP(src=self.pg0.remote_ip4,
816 dst=self.pg0.local_ip4) /
818 Ether(dst=self.omac) /
819 IP(src="1.1.1.1", dst="1.1.1.2") /
820 UDP(sport=1144, dport=2233) /
821 Raw(b'X' * payload_size))
822 for i in range(count)]
824 def gen_pkts(self, sw_intf, src, dst, count=1,
826 return [Ether(dst=self.omac) /
827 IP(src="1.1.1.1", dst="1.1.1.2") /
828 UDP(sport=1144, dport=2233) /
829 Raw(b'X' * payload_size)
830 for i in range(count)]
832 def verify_decrypted(self, p, rxs):
834 self.assert_equal(rx[Ether].dst, self.omac)
835 self.assert_equal(rx[IP].dst, "1.1.1.2")
837 def verify_encrypted(self, p, sa, rxs):
840 pkt = sa.decrypt(rx[IP])
841 if not pkt.haslayer(IP):
842 pkt = IP(pkt[Raw].load)
843 self.assert_packet_checksums_valid(pkt)
844 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
845 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
846 self.assertTrue(pkt.haslayer(GRE))
848 self.assertEqual(e[Ether].dst, self.omac)
849 self.assertEqual(e[IP].dst, "1.1.1.2")
850 except (IndexError, AssertionError):
851 self.logger.debug(ppp("Unexpected packet:", rx))
853 self.logger.debug(ppp("Decrypted packet:", pkt))
859 super(TestIpsecGreTebIfEsp, self).setUp()
861 self.tun_if = self.pg0
865 bd1 = VppBridgeDomain(self, 1)
868 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
869 p.auth_algo_vpp_id, p.auth_key,
870 p.crypt_algo_vpp_id, p.crypt_key,
871 self.vpp_esp_protocol,
874 p.tun_sa_out.add_vpp_config()
876 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
877 p.auth_algo_vpp_id, p.auth_key,
878 p.crypt_algo_vpp_id, p.crypt_key,
879 self.vpp_esp_protocol,
882 p.tun_sa_in.add_vpp_config()
884 p.tun_if = VppGreInterface(self,
887 type=(VppEnum.vl_api_gre_tunnel_type_t.
888 GRE_API_TUNNEL_TYPE_TEB))
889 p.tun_if.add_vpp_config()
891 p.tun_protect = VppIpsecTunProtect(self,
896 p.tun_protect.add_vpp_config()
899 p.tun_if.config_ip4()
900 config_tun_params(p, self.encryption_type, p.tun_if)
902 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
903 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
905 self.vapi.cli("clear ipsec sa")
906 self.vapi.cli("sh adj")
907 self.vapi.cli("sh ipsec tun")
911 p.tun_if.unconfig_ip4()
912 super(TestIpsecGreTebIfEsp, self).tearDown()
915 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
917 """ Ipsec GRE TEB ESP - TUN tests """
918 tun4_encrypt_node_name = "esp4-encrypt-tun"
919 tun4_decrypt_node_name = "esp4-decrypt-tun"
920 encryption_type = ESP
921 omac = "00:11:22:33:44:55"
923 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
925 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
926 sa.encrypt(IP(src=self.pg0.remote_ip4,
927 dst=self.pg0.local_ip4) /
929 Ether(dst=self.omac) /
930 IP(src="1.1.1.1", dst="1.1.1.2") /
931 UDP(sport=1144, dport=2233) /
932 Raw(b'X' * payload_size))
933 for i in range(count)]
935 def gen_pkts(self, sw_intf, src, dst, count=1,
937 return [Ether(dst=self.omac) /
939 IP(src="1.1.1.1", dst="1.1.1.2") /
940 UDP(sport=1144, dport=2233) /
941 Raw(b'X' * payload_size)
942 for i in range(count)]
944 def verify_decrypted(self, p, rxs):
946 self.assert_equal(rx[Ether].dst, self.omac)
947 self.assert_equal(rx[Dot1Q].vlan, 11)
948 self.assert_equal(rx[IP].dst, "1.1.1.2")
950 def verify_encrypted(self, p, sa, rxs):
953 pkt = sa.decrypt(rx[IP])
954 if not pkt.haslayer(IP):
955 pkt = IP(pkt[Raw].load)
956 self.assert_packet_checksums_valid(pkt)
957 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
958 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
959 self.assertTrue(pkt.haslayer(GRE))
961 self.assertEqual(e[Ether].dst, self.omac)
962 self.assertFalse(e.haslayer(Dot1Q))
963 self.assertEqual(e[IP].dst, "1.1.1.2")
964 except (IndexError, AssertionError):
965 self.logger.debug(ppp("Unexpected packet:", rx))
967 self.logger.debug(ppp("Decrypted packet:", pkt))
973 super(TestIpsecGreTebVlanIfEsp, self).setUp()
975 self.tun_if = self.pg0
979 bd1 = VppBridgeDomain(self, 1)
982 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
983 self.vapi.l2_interface_vlan_tag_rewrite(
984 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
986 self.pg1_11.admin_up()
988 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
989 p.auth_algo_vpp_id, p.auth_key,
990 p.crypt_algo_vpp_id, p.crypt_key,
991 self.vpp_esp_protocol,
994 p.tun_sa_out.add_vpp_config()
996 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
997 p.auth_algo_vpp_id, p.auth_key,
998 p.crypt_algo_vpp_id, p.crypt_key,
999 self.vpp_esp_protocol,
1000 self.pg0.remote_ip4,
1002 p.tun_sa_in.add_vpp_config()
1004 p.tun_if = VppGreInterface(self,
1006 self.pg0.remote_ip4,
1007 type=(VppEnum.vl_api_gre_tunnel_type_t.
1008 GRE_API_TUNNEL_TYPE_TEB))
1009 p.tun_if.add_vpp_config()
1011 p.tun_protect = VppIpsecTunProtect(self,
1016 p.tun_protect.add_vpp_config()
1019 p.tun_if.config_ip4()
1020 config_tun_params(p, self.encryption_type, p.tun_if)
1022 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1023 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1025 self.vapi.cli("clear ipsec sa")
1028 p = self.ipv4_params
1029 p.tun_if.unconfig_ip4()
1030 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1031 self.pg1_11.admin_down()
1032 self.pg1_11.remove_vpp_config()
1035 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1037 """ Ipsec GRE TEB ESP - Tra tests """
1038 tun4_encrypt_node_name = "esp4-encrypt-tun"
1039 tun4_decrypt_node_name = "esp4-decrypt-tun"
1040 encryption_type = ESP
1041 omac = "00:11:22:33:44:55"
1043 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1045 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1046 sa.encrypt(IP(src=self.pg0.remote_ip4,
1047 dst=self.pg0.local_ip4) /
1049 Ether(dst=self.omac) /
1050 IP(src="1.1.1.1", dst="1.1.1.2") /
1051 UDP(sport=1144, dport=2233) /
1052 Raw(b'X' * payload_size))
1053 for i in range(count)]
1055 def gen_pkts(self, sw_intf, src, dst, count=1,
1057 return [Ether(dst=self.omac) /
1058 IP(src="1.1.1.1", dst="1.1.1.2") /
1059 UDP(sport=1144, dport=2233) /
1060 Raw(b'X' * payload_size)
1061 for i in range(count)]
1063 def verify_decrypted(self, p, rxs):
1065 self.assert_equal(rx[Ether].dst, self.omac)
1066 self.assert_equal(rx[IP].dst, "1.1.1.2")
1068 def verify_encrypted(self, p, sa, rxs):
1071 pkt = sa.decrypt(rx[IP])
1072 if not pkt.haslayer(IP):
1073 pkt = IP(pkt[Raw].load)
1074 self.assert_packet_checksums_valid(pkt)
1075 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1076 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1077 self.assertTrue(pkt.haslayer(GRE))
1079 self.assertEqual(e[Ether].dst, self.omac)
1080 self.assertEqual(e[IP].dst, "1.1.1.2")
1081 except (IndexError, AssertionError):
1082 self.logger.debug(ppp("Unexpected packet:", rx))
1084 self.logger.debug(ppp("Decrypted packet:", pkt))
1090 super(TestIpsecGreTebIfEspTra, self).setUp()
1092 self.tun_if = self.pg0
1094 p = self.ipv4_params
1096 bd1 = VppBridgeDomain(self, 1)
1097 bd1.add_vpp_config()
1099 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1100 p.auth_algo_vpp_id, p.auth_key,
1101 p.crypt_algo_vpp_id, p.crypt_key,
1102 self.vpp_esp_protocol)
1103 p.tun_sa_out.add_vpp_config()
1105 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1106 p.auth_algo_vpp_id, p.auth_key,
1107 p.crypt_algo_vpp_id, p.crypt_key,
1108 self.vpp_esp_protocol)
1109 p.tun_sa_in.add_vpp_config()
1111 p.tun_if = VppGreInterface(self,
1113 self.pg0.remote_ip4,
1114 type=(VppEnum.vl_api_gre_tunnel_type_t.
1115 GRE_API_TUNNEL_TYPE_TEB))
1116 p.tun_if.add_vpp_config()
1118 p.tun_protect = VppIpsecTunProtect(self,
1123 p.tun_protect.add_vpp_config()
1126 p.tun_if.config_ip4()
1127 config_tra_params(p, self.encryption_type, p.tun_if)
1129 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1130 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1132 self.vapi.cli("clear ipsec sa")
1135 p = self.ipv4_params
1136 p.tun_if.unconfig_ip4()
1137 super(TestIpsecGreTebIfEspTra, self).tearDown()
1140 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1142 """ Ipsec GRE TEB UDP ESP - Tra tests """
1143 tun4_encrypt_node_name = "esp4-encrypt-tun"
1144 tun4_decrypt_node_name = "esp4-decrypt-tun"
1145 encryption_type = ESP
1146 omac = "00:11:22:33:44:55"
1148 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1150 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1151 sa.encrypt(IP(src=self.pg0.remote_ip4,
1152 dst=self.pg0.local_ip4) /
1154 Ether(dst=self.omac) /
1155 IP(src="1.1.1.1", dst="1.1.1.2") /
1156 UDP(sport=1144, dport=2233) /
1157 Raw(b'X' * payload_size))
1158 for i in range(count)]
1160 def gen_pkts(self, sw_intf, src, dst, count=1,
1162 return [Ether(dst=self.omac) /
1163 IP(src="1.1.1.1", dst="1.1.1.2") /
1164 UDP(sport=1144, dport=2233) /
1165 Raw(b'X' * payload_size)
1166 for i in range(count)]
1168 def verify_decrypted(self, p, rxs):
1170 self.assert_equal(rx[Ether].dst, self.omac)
1171 self.assert_equal(rx[IP].dst, "1.1.1.2")
1173 def verify_encrypted(self, p, sa, rxs):
1175 self.assertTrue(rx.haslayer(UDP))
1176 self.assertEqual(rx[UDP].dport, 4545)
1177 self.assertEqual(rx[UDP].sport, 5454)
1179 pkt = sa.decrypt(rx[IP])
1180 if not pkt.haslayer(IP):
1181 pkt = IP(pkt[Raw].load)
1182 self.assert_packet_checksums_valid(pkt)
1183 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1184 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1185 self.assertTrue(pkt.haslayer(GRE))
1187 self.assertEqual(e[Ether].dst, self.omac)
1188 self.assertEqual(e[IP].dst, "1.1.1.2")
1189 except (IndexError, AssertionError):
1190 self.logger.debug(ppp("Unexpected packet:", rx))
1192 self.logger.debug(ppp("Decrypted packet:", pkt))
1198 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1200 self.tun_if = self.pg0
1202 p = self.ipv4_params
1203 p = self.ipv4_params
1204 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1205 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1206 p.nat_header = UDP(sport=5454, dport=4545)
1208 bd1 = VppBridgeDomain(self, 1)
1209 bd1.add_vpp_config()
1211 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1212 p.auth_algo_vpp_id, p.auth_key,
1213 p.crypt_algo_vpp_id, p.crypt_key,
1214 self.vpp_esp_protocol,
1218 p.tun_sa_out.add_vpp_config()
1220 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1221 p.auth_algo_vpp_id, p.auth_key,
1222 p.crypt_algo_vpp_id, p.crypt_key,
1223 self.vpp_esp_protocol,
1225 VppEnum.vl_api_ipsec_sad_flags_t.
1226 IPSEC_API_SAD_FLAG_IS_INBOUND),
1229 p.tun_sa_in.add_vpp_config()
1231 p.tun_if = VppGreInterface(self,
1233 self.pg0.remote_ip4,
1234 type=(VppEnum.vl_api_gre_tunnel_type_t.
1235 GRE_API_TUNNEL_TYPE_TEB))
1236 p.tun_if.add_vpp_config()
1238 p.tun_protect = VppIpsecTunProtect(self,
1243 p.tun_protect.add_vpp_config()
1246 p.tun_if.config_ip4()
1247 config_tra_params(p, self.encryption_type, p.tun_if)
1249 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1250 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1252 self.vapi.cli("clear ipsec sa")
1253 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1256 p = self.ipv4_params
1257 p.tun_if.unconfig_ip4()
1258 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1261 class TestIpsecGreIfEsp(TemplateIpsec,
1263 """ Ipsec GRE ESP - TUN tests """
1264 tun4_encrypt_node_name = "esp4-encrypt-tun"
1265 tun4_decrypt_node_name = "esp4-decrypt-tun"
1266 encryption_type = ESP
1268 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1270 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1271 sa.encrypt(IP(src=self.pg0.remote_ip4,
1272 dst=self.pg0.local_ip4) /
1274 IP(src=self.pg1.local_ip4,
1275 dst=self.pg1.remote_ip4) /
1276 UDP(sport=1144, dport=2233) /
1277 Raw(b'X' * payload_size))
1278 for i in range(count)]
1280 def gen_pkts(self, sw_intf, src, dst, count=1,
1282 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1283 IP(src="1.1.1.1", dst="1.1.1.2") /
1284 UDP(sport=1144, dport=2233) /
1285 Raw(b'X' * payload_size)
1286 for i in range(count)]
1288 def verify_decrypted(self, p, rxs):
1290 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1291 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1293 def verify_encrypted(self, p, sa, rxs):
1296 pkt = sa.decrypt(rx[IP])
1297 if not pkt.haslayer(IP):
1298 pkt = IP(pkt[Raw].load)
1299 self.assert_packet_checksums_valid(pkt)
1300 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1301 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1302 self.assertTrue(pkt.haslayer(GRE))
1304 self.assertEqual(e[IP].dst, "1.1.1.2")
1305 except (IndexError, AssertionError):
1306 self.logger.debug(ppp("Unexpected packet:", rx))
1308 self.logger.debug(ppp("Decrypted packet:", pkt))
1314 super(TestIpsecGreIfEsp, self).setUp()
1316 self.tun_if = self.pg0
1318 p = self.ipv4_params
1320 bd1 = VppBridgeDomain(self, 1)
1321 bd1.add_vpp_config()
1323 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1324 p.auth_algo_vpp_id, p.auth_key,
1325 p.crypt_algo_vpp_id, p.crypt_key,
1326 self.vpp_esp_protocol,
1328 self.pg0.remote_ip4)
1329 p.tun_sa_out.add_vpp_config()
1331 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1332 p.auth_algo_vpp_id, p.auth_key,
1333 p.crypt_algo_vpp_id, p.crypt_key,
1334 self.vpp_esp_protocol,
1335 self.pg0.remote_ip4,
1337 p.tun_sa_in.add_vpp_config()
1339 p.tun_if = VppGreInterface(self,
1341 self.pg0.remote_ip4)
1342 p.tun_if.add_vpp_config()
1344 p.tun_protect = VppIpsecTunProtect(self,
1348 p.tun_protect.add_vpp_config()
1351 p.tun_if.config_ip4()
1352 config_tun_params(p, self.encryption_type, p.tun_if)
1354 VppIpRoute(self, "1.1.1.2", 32,
1355 [VppRoutePath(p.tun_if.remote_ip4,
1356 0xffffffff)]).add_vpp_config()
1359 p = self.ipv4_params
1360 p.tun_if.unconfig_ip4()
1361 super(TestIpsecGreIfEsp, self).tearDown()
1364 class TestIpsecGreIfEspTra(TemplateIpsec,
1366 """ Ipsec GRE ESP - TRA tests """
1367 tun4_encrypt_node_name = "esp4-encrypt-tun"
1368 tun4_decrypt_node_name = "esp4-decrypt-tun"
1369 encryption_type = ESP
1371 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1373 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1374 sa.encrypt(IP(src=self.pg0.remote_ip4,
1375 dst=self.pg0.local_ip4) /
1377 IP(src=self.pg1.local_ip4,
1378 dst=self.pg1.remote_ip4) /
1379 UDP(sport=1144, dport=2233) /
1380 Raw(b'X' * payload_size))
1381 for i in range(count)]
1383 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1385 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1386 sa.encrypt(IP(src=self.pg0.remote_ip4,
1387 dst=self.pg0.local_ip4) /
1389 UDP(sport=1144, dport=2233) /
1390 Raw(b'X' * payload_size))
1391 for i in range(count)]
1393 def gen_pkts(self, sw_intf, src, dst, count=1,
1395 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1396 IP(src="1.1.1.1", dst="1.1.1.2") /
1397 UDP(sport=1144, dport=2233) /
1398 Raw(b'X' * payload_size)
1399 for i in range(count)]
1401 def verify_decrypted(self, p, rxs):
1403 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1404 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1406 def verify_encrypted(self, p, sa, rxs):
1409 pkt = sa.decrypt(rx[IP])
1410 if not pkt.haslayer(IP):
1411 pkt = IP(pkt[Raw].load)
1412 self.assert_packet_checksums_valid(pkt)
1413 self.assertTrue(pkt.haslayer(GRE))
1415 self.assertEqual(e[IP].dst, "1.1.1.2")
1416 except (IndexError, AssertionError):
1417 self.logger.debug(ppp("Unexpected packet:", rx))
1419 self.logger.debug(ppp("Decrypted packet:", pkt))
1425 super(TestIpsecGreIfEspTra, self).setUp()
1427 self.tun_if = self.pg0
1429 p = self.ipv4_params
1431 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1432 p.auth_algo_vpp_id, p.auth_key,
1433 p.crypt_algo_vpp_id, p.crypt_key,
1434 self.vpp_esp_protocol)
1435 p.tun_sa_out.add_vpp_config()
1437 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1438 p.auth_algo_vpp_id, p.auth_key,
1439 p.crypt_algo_vpp_id, p.crypt_key,
1440 self.vpp_esp_protocol)
1441 p.tun_sa_in.add_vpp_config()
1443 p.tun_if = VppGreInterface(self,
1445 self.pg0.remote_ip4)
1446 p.tun_if.add_vpp_config()
1448 p.tun_protect = VppIpsecTunProtect(self,
1452 p.tun_protect.add_vpp_config()
1455 p.tun_if.config_ip4()
1456 config_tra_params(p, self.encryption_type, p.tun_if)
1458 VppIpRoute(self, "1.1.1.2", 32,
1459 [VppRoutePath(p.tun_if.remote_ip4,
1460 0xffffffff)]).add_vpp_config()
1463 p = self.ipv4_params
1464 p.tun_if.unconfig_ip4()
1465 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    """Packets from gen_encrypt_non_ip_pkts get no reply and are counted

    Sends the packets built by gen_encrypt_non_ip_pkts on the tunnel
    interface, expects nothing back, and checks that the decrypt node's
    'unsupported payload' error counter reads 1.
    """
    params = self.ipv4_params
    encrypted = self.gen_encrypt_non_ip_pkts(
        params.scapy_tun_sa,
        self.tun_if,
        src=params.remote_tun_if_host,
        dst=self.pg1.remote_ip6)
    self.send_and_assert_no_replies(self.tun_if, encrypted)

    # error counters are addressed by /err/<node name>/<error string>
    counter_path = '/err/{}/unsupported payload'.format(
        self.tun4_decrypt_node_name)
    self.assertEqual(1, self.statistics.get_err_counter(counter_path))
1478 class TestIpsecGre6IfEspTra(TemplateIpsec,
1480 """ Ipsec GRE ESP - TRA tests """
1481 tun6_encrypt_node_name = "esp6-encrypt-tun"
1482 tun6_decrypt_node_name = "esp6-decrypt-tun"
1483 encryption_type = ESP
1485 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1487 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1488 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1489 dst=self.pg0.local_ip6) /
1491 IPv6(src=self.pg1.local_ip6,
1492 dst=self.pg1.remote_ip6) /
1493 UDP(sport=1144, dport=2233) /
1494 Raw(b'X' * payload_size))
1495 for i in range(count)]
1497 def gen_pkts6(self, sw_intf, src, dst, count=1,
1499 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1500 IPv6(src="1::1", dst="1::2") /
1501 UDP(sport=1144, dport=2233) /
1502 Raw(b'X' * payload_size)
1503 for i in range(count)]
1505 def verify_decrypted6(self, p, rxs):
1507 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1508 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1510 def verify_encrypted6(self, p, sa, rxs):
1513 pkt = sa.decrypt(rx[IPv6])
1514 if not pkt.haslayer(IPv6):
1515 pkt = IPv6(pkt[Raw].load)
1516 self.assert_packet_checksums_valid(pkt)
1517 self.assertTrue(pkt.haslayer(GRE))
1519 self.assertEqual(e[IPv6].dst, "1::2")
1520 except (IndexError, AssertionError):
1521 self.logger.debug(ppp("Unexpected packet:", rx))
1523 self.logger.debug(ppp("Decrypted packet:", pkt))
1529 super(TestIpsecGre6IfEspTra, self).setUp()
1531 self.tun_if = self.pg0
1533 p = self.ipv6_params
1535 bd1 = VppBridgeDomain(self, 1)
1536 bd1.add_vpp_config()
1538 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1539 p.auth_algo_vpp_id, p.auth_key,
1540 p.crypt_algo_vpp_id, p.crypt_key,
1541 self.vpp_esp_protocol)
1542 p.tun_sa_out.add_vpp_config()
1544 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1545 p.auth_algo_vpp_id, p.auth_key,
1546 p.crypt_algo_vpp_id, p.crypt_key,
1547 self.vpp_esp_protocol)
1548 p.tun_sa_in.add_vpp_config()
1550 p.tun_if = VppGreInterface(self,
1552 self.pg0.remote_ip6)
1553 p.tun_if.add_vpp_config()
1555 p.tun_protect = VppIpsecTunProtect(self,
1559 p.tun_protect.add_vpp_config()
1562 p.tun_if.config_ip6()
1563 config_tra_params(p, self.encryption_type, p.tun_if)
1565 r = VppIpRoute(self, "1::2", 128,
1566 [VppRoutePath(p.tun_if.remote_ip6,
1568 proto=DpoProto.DPO_PROTO_IP6)])
1572 p = self.ipv6_params
1573 p.tun_if.unconfig_ip6()
1574 super(TestIpsecGre6IfEspTra, self).tearDown()
1577 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1578 """ Ipsec mGRE ESP v4 TRA tests """
1579 tun4_encrypt_node_name = "esp4-encrypt-tun"
1580 tun4_decrypt_node_name = "esp4-decrypt-tun"
1581 encryption_type = ESP
1583 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1585 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1586 sa.encrypt(IP(src=p.tun_dst,
1587 dst=self.pg0.local_ip4) /
1589 IP(src=self.pg1.local_ip4,
1590 dst=self.pg1.remote_ip4) /
1591 UDP(sport=1144, dport=2233) /
1592 Raw(b'X' * payload_size))
1593 for i in range(count)]
1595 def gen_pkts(self, sw_intf, src, dst, count=1,
1597 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1598 IP(src="1.1.1.1", dst=dst) /
1599 UDP(sport=1144, dport=2233) /
1600 Raw(b'X' * payload_size)
1601 for i in range(count)]
1603 def verify_decrypted(self, p, rxs):
1605 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1606 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1608 def verify_encrypted(self, p, sa, rxs):
1611 pkt = sa.decrypt(rx[IP])
1612 if not pkt.haslayer(IP):
1613 pkt = IP(pkt[Raw].load)
1614 self.assert_packet_checksums_valid(pkt)
1615 self.assertTrue(pkt.haslayer(GRE))
1617 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1618 except (IndexError, AssertionError):
1619 self.logger.debug(ppp("Unexpected packet:", rx))
1621 self.logger.debug(ppp("Decrypted packet:", pkt))
1627 super(TestIpsecMGreIfEspTra4, self).setUp()
1630 self.tun_if = self.pg0
1631 p = self.ipv4_params
1632 p.tun_if = VppGreInterface(self,
1635 mode=(VppEnum.vl_api_tunnel_mode_t.
1636 TUNNEL_API_MODE_MP))
1637 p.tun_if.add_vpp_config()
1639 p.tun_if.config_ip4()
1640 p.tun_if.generate_remote_hosts(N_NHS)
1641 self.pg0.generate_remote_hosts(N_NHS)
1642 self.pg0.configure_ipv4_neighbors()
1644 # setup some SAs for several next-hops on the interface
1645 self.multi_params = []
1647 for ii in range(N_NHS):
1648 p = copy.copy(self.ipv4_params)
1650 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1651 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1652 p.scapy_tun_spi = p.scapy_tun_spi + ii
1653 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1654 p.vpp_tun_spi = p.vpp_tun_spi + ii
1656 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1657 p.scapy_tra_spi = p.scapy_tra_spi + ii
1658 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1659 p.vpp_tra_spi = p.vpp_tra_spi + ii
1660 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1661 p.auth_algo_vpp_id, p.auth_key,
1662 p.crypt_algo_vpp_id, p.crypt_key,
1663 self.vpp_esp_protocol)
1664 p.tun_sa_out.add_vpp_config()
1666 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1667 p.auth_algo_vpp_id, p.auth_key,
1668 p.crypt_algo_vpp_id, p.crypt_key,
1669 self.vpp_esp_protocol)
1670 p.tun_sa_in.add_vpp_config()
1672 p.tun_protect = VppIpsecTunProtect(
1677 nh=p.tun_if.remote_hosts[ii].ip4)
1678 p.tun_protect.add_vpp_config()
1679 config_tra_params(p, self.encryption_type, p.tun_if)
1680 self.multi_params.append(p)
1682 VppIpRoute(self, p.remote_tun_if_host, 32,
1683 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1684 p.tun_if.sw_if_index)]).add_vpp_config()
1686 # in this v4 variant add the teibs after the protect
1687 p.teib = VppTeib(self, p.tun_if,
1688 p.tun_if.remote_hosts[ii].ip4,
1689 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1690 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1691 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1694 p = self.ipv4_params
1695 p.tun_if.unconfig_ip4()
1696 super(TestIpsecMGreIfEspTra4, self).tearDown()
1698 def test_tun_44(self):
1701 for p in self.multi_params:
1702 self.verify_tun_44(p, count=N_PKTS)
1703 p.teib.remove_vpp_config()
1704 self.verify_tun_dropped_44(p, count=N_PKTS)
1705 p.teib.add_vpp_config()
1706 self.verify_tun_44(p, count=N_PKTS)
1709 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1710 """ Ipsec mGRE ESP v6 TRA tests """
1711 tun6_encrypt_node_name = "esp6-encrypt-tun"
1712 tun6_decrypt_node_name = "esp6-decrypt-tun"
1713 encryption_type = ESP
1715 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1717 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1718 sa.encrypt(IPv6(src=p.tun_dst,
1719 dst=self.pg0.local_ip6) /
1721 IPv6(src=self.pg1.local_ip6,
1722 dst=self.pg1.remote_ip6) /
1723 UDP(sport=1144, dport=2233) /
1724 Raw(b'X' * payload_size))
1725 for i in range(count)]
1727 def gen_pkts6(self, sw_intf, src, dst, count=1,
1729 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1730 IPv6(src="1::1", dst=dst) /
1731 UDP(sport=1144, dport=2233) /
1732 Raw(b'X' * payload_size)
1733 for i in range(count)]
1735 def verify_decrypted6(self, p, rxs):
1737 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1738 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1740 def verify_encrypted6(self, p, sa, rxs):
1743 pkt = sa.decrypt(rx[IPv6])
1744 if not pkt.haslayer(IPv6):
1745 pkt = IPv6(pkt[Raw].load)
1746 self.assert_packet_checksums_valid(pkt)
1747 self.assertTrue(pkt.haslayer(GRE))
1749 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1750 except (IndexError, AssertionError):
1751 self.logger.debug(ppp("Unexpected packet:", rx))
1753 self.logger.debug(ppp("Decrypted packet:", pkt))
1759 super(TestIpsecMGreIfEspTra6, self).setUp()
1761 self.vapi.cli("set logging class ipsec level debug")
1764 self.tun_if = self.pg0
1765 p = self.ipv6_params
1766 p.tun_if = VppGreInterface(self,
1769 mode=(VppEnum.vl_api_tunnel_mode_t.
1770 TUNNEL_API_MODE_MP))
1771 p.tun_if.add_vpp_config()
1773 p.tun_if.config_ip6()
1774 p.tun_if.generate_remote_hosts(N_NHS)
1775 self.pg0.generate_remote_hosts(N_NHS)
1776 self.pg0.configure_ipv6_neighbors()
1778 # setup some SAs for several next-hops on the interface
1779 self.multi_params = []
1781 for ii in range(N_NHS):
1782 p = copy.copy(self.ipv6_params)
1784 p.remote_tun_if_host = "1::%d" % (ii + 1)
1785 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1786 p.scapy_tun_spi = p.scapy_tun_spi + ii
1787 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1788 p.vpp_tun_spi = p.vpp_tun_spi + ii
1790 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1791 p.scapy_tra_spi = p.scapy_tra_spi + ii
1792 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1793 p.vpp_tra_spi = p.vpp_tra_spi + ii
1794 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1795 p.auth_algo_vpp_id, p.auth_key,
1796 p.crypt_algo_vpp_id, p.crypt_key,
1797 self.vpp_esp_protocol)
1798 p.tun_sa_out.add_vpp_config()
1800 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1801 p.auth_algo_vpp_id, p.auth_key,
1802 p.crypt_algo_vpp_id, p.crypt_key,
1803 self.vpp_esp_protocol)
1804 p.tun_sa_in.add_vpp_config()
1806 # in this v6 variant add the teibs first then the protection
1807 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1808 VppTeib(self, p.tun_if,
1809 p.tun_if.remote_hosts[ii].ip6,
1810 p.tun_dst).add_vpp_config()
1812 p.tun_protect = VppIpsecTunProtect(
1817 nh=p.tun_if.remote_hosts[ii].ip6)
1818 p.tun_protect.add_vpp_config()
1819 config_tra_params(p, self.encryption_type, p.tun_if)
1820 self.multi_params.append(p)
1822 VppIpRoute(self, p.remote_tun_if_host, 128,
1823 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1824 p.tun_if.sw_if_index)]).add_vpp_config()
1825 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1827 self.logger.info(self.vapi.cli("sh log"))
1828 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1829 self.logger.info(self.vapi.cli("sh adj 41"))
1832 p = self.ipv6_params
1833 p.tun_if.unconfig_ip6()
1834 super(TestIpsecMGreIfEspTra6, self).tearDown()
1836 def test_tun_66(self):
1838 for p in self.multi_params:
1839 self.verify_tun_66(p, count=63)
1842 class TestIpsec4TunProtect(TemplateIpsec,
1843 TemplateIpsec4TunProtect,
1845 """ IPsec IPv4 Tunnel protect - transport mode"""
1848 super(TestIpsec4TunProtect, self).setUp()
1850 self.tun_if = self.pg0
1853 super(TestIpsec4TunProtect, self).tearDown()
1855 def test_tun_44(self):
1856 """IPSEC tunnel protect"""
1858 p = self.ipv4_params
1860 self.config_network(p)
1861 self.config_sa_tra(p)
1862 self.config_protect(p)
1864 self.verify_tun_44(p, count=127)
1865 c = p.tun_if.get_rx_stats()
1866 self.assertEqual(c['packets'], 127)
1867 c = p.tun_if.get_tx_stats()
1868 self.assertEqual(c['packets'], 127)
1870 self.vapi.cli("clear ipsec sa")
1871 self.verify_tun_64(p, count=127)
1872 c = p.tun_if.get_rx_stats()
1873 self.assertEqual(c['packets'], 254)
1874 c = p.tun_if.get_tx_stats()
1875 self.assertEqual(c['packets'], 254)
1877 # rekey - create new SAs and update the tunnel protection
1879 np.crypt_key = b'X' + p.crypt_key[1:]
1880 np.scapy_tun_spi += 100
1881 np.scapy_tun_sa_id += 1
1882 np.vpp_tun_spi += 100
1883 np.vpp_tun_sa_id += 1
1884 np.tun_if.local_spi = p.vpp_tun_spi
1885 np.tun_if.remote_spi = p.scapy_tun_spi
1887 self.config_sa_tra(np)
1888 self.config_protect(np)
1891 self.verify_tun_44(np, count=127)
1892 c = p.tun_if.get_rx_stats()
1893 self.assertEqual(c['packets'], 381)
1894 c = p.tun_if.get_tx_stats()
1895 self.assertEqual(c['packets'], 381)
1898 self.unconfig_protect(np)
1899 self.unconfig_sa(np)
1900 self.unconfig_network(p)
1903 class TestIpsec4TunProtectUdp(TemplateIpsec,
1904 TemplateIpsec4TunProtect,
1906 """ IPsec IPv4 Tunnel protect - transport mode"""
1909 super(TestIpsec4TunProtectUdp, self).setUp()
1911 self.tun_if = self.pg0
1913 p = self.ipv4_params
1914 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1915 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1916 p.nat_header = UDP(sport=4500, dport=4500)
1917 self.config_network(p)
1918 self.config_sa_tra(p)
1919 self.config_protect(p)
1922 p = self.ipv4_params
1923 self.unconfig_protect(p)
1925 self.unconfig_network(p)
1926 super(TestIpsec4TunProtectUdp, self).tearDown()
1928 def verify_encrypted(self, p, sa, rxs):
1929 # ensure encrypted packets are received with the default UDP ports
1931 self.assertEqual(rx[UDP].sport, 4500)
1932 self.assertEqual(rx[UDP].dport, 4500)
1933 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
def test_tun_44(self):
    """IPSEC UDP tunnel protect"""
    params = self.ipv4_params
    n_pkts = 127

    self.verify_tun_44(params, count=n_pkts)

    # the tunnel interface must have counted every packet in each direction
    rx_stats = params.tun_if.get_rx_stats()
    self.assertEqual(rx_stats['packets'], n_pkts)
    tx_stats = params.tun_if.get_tx_stats()
    self.assertEqual(tx_stats['packets'], n_pkts)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # run the keepalive check against this test case's IPv4 parameter set
    params = self.ipv4_params
    self.verify_keepalive(params)
1951 class TestIpsec4TunProtectTun(TemplateIpsec,
1952 TemplateIpsec4TunProtect,
1954 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1956 encryption_type = ESP
1957 tun4_encrypt_node_name = "esp4-encrypt-tun"
1958 tun4_decrypt_node_name = "esp4-decrypt-tun"
1961 super(TestIpsec4TunProtectTun, self).setUp()
1963 self.tun_if = self.pg0
1966 super(TestIpsec4TunProtectTun, self).tearDown()
1968 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1970 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1971 sa.encrypt(IP(src=sw_intf.remote_ip4,
1972 dst=sw_intf.local_ip4) /
1973 IP(src=src, dst=dst) /
1974 UDP(sport=1144, dport=2233) /
1975 Raw(b'X' * payload_size))
1976 for i in range(count)]
1978 def gen_pkts(self, sw_intf, src, dst, count=1,
1980 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1981 IP(src=src, dst=dst) /
1982 UDP(sport=1144, dport=2233) /
1983 Raw(b'X' * payload_size)
1984 for i in range(count)]
1986 def verify_decrypted(self, p, rxs):
1988 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1989 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1990 self.assert_packet_checksums_valid(rx)
1992 def verify_encrypted(self, p, sa, rxs):
1995 pkt = sa.decrypt(rx[IP])
1996 if not pkt.haslayer(IP):
1997 pkt = IP(pkt[Raw].load)
1998 self.assert_packet_checksums_valid(pkt)
1999 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2000 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2001 inner = pkt[IP].payload
2002 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2004 except (IndexError, AssertionError):
2005 self.logger.debug(ppp("Unexpected packet:", rx))
2007 self.logger.debug(ppp("Decrypted packet:", pkt))
2012 def test_tun_44(self):
2013 """IPSEC tunnel protect """
2015 p = self.ipv4_params
2017 self.config_network(p)
2018 self.config_sa_tun(p)
2019 self.config_protect(p)
2021 # also add an output feature on the tunnel and physical interface
2022 # so we test they still work
2023 r_all = AclRule(True,
2024 src_prefix="0.0.0.0/0",
2025 dst_prefix="0.0.0.0/0",
2027 a = VppAcl(self, [r_all]).add_vpp_config()
2029 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2030 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2032 self.verify_tun_44(p, count=127)
2034 c = p.tun_if.get_rx_stats()
2035 self.assertEqual(c['packets'], 127)
2036 c = p.tun_if.get_tx_stats()
2037 self.assertEqual(c['packets'], 127)
2039 # rekey - create new SAs and update the tunnel protection
2041 np.crypt_key = b'X' + p.crypt_key[1:]
2042 np.scapy_tun_spi += 100
2043 np.scapy_tun_sa_id += 1
2044 np.vpp_tun_spi += 100
2045 np.vpp_tun_sa_id += 1
2046 np.tun_if.local_spi = p.vpp_tun_spi
2047 np.tun_if.remote_spi = p.scapy_tun_spi
2049 self.config_sa_tun(np)
2050 self.config_protect(np)
2053 self.verify_tun_44(np, count=127)
2054 c = p.tun_if.get_rx_stats()
2055 self.assertEqual(c['packets'], 254)
2056 c = p.tun_if.get_tx_stats()
2057 self.assertEqual(c['packets'], 254)
2060 self.unconfig_protect(np)
2061 self.unconfig_sa(np)
2062 self.unconfig_network(p)
2065 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2066 TemplateIpsec4TunProtect,
2068 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2070 encryption_type = ESP
2071 tun4_encrypt_node_name = "esp4-encrypt-tun"
2072 tun4_decrypt_node_name = "esp4-decrypt-tun"
2075 super(TestIpsec4TunProtectTunDrop, self).setUp()
2077 self.tun_if = self.pg0
2080 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2082 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2084 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2085 sa.encrypt(IP(src=sw_intf.remote_ip4,
2087 IP(src=src, dst=dst) /
2088 UDP(sport=1144, dport=2233) /
2089 Raw(b'X' * payload_size))
2090 for i in range(count)]
2092 def test_tun_drop_44(self):
2093 """IPSEC tunnel protect bogus tunnel header """
2095 p = self.ipv4_params
2097 self.config_network(p)
2098 self.config_sa_tun(p)
2099 self.config_protect(p)
2101 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2102 src=p.remote_tun_if_host,
2103 dst=self.pg1.remote_ip4,
2105 self.send_and_assert_no_replies(self.tun_if, tx)
2108 self.unconfig_protect(p)
2110 self.unconfig_network(p)
2113 class TestIpsec6TunProtect(TemplateIpsec,
2114 TemplateIpsec6TunProtect,
2116 """ IPsec IPv6 Tunnel protect - transport mode"""
2118 encryption_type = ESP
2119 tun6_encrypt_node_name = "esp6-encrypt-tun"
2120 tun6_decrypt_node_name = "esp6-decrypt-tun"
2123 super(TestIpsec6TunProtect, self).setUp()
2125 self.tun_if = self.pg0
2128 super(TestIpsec6TunProtect, self).tearDown()
2130 def test_tun_66(self):
2131 """IPSEC tunnel protect 6o6"""
2133 p = self.ipv6_params
2135 self.config_network(p)
2136 self.config_sa_tra(p)
2137 self.config_protect(p)
2139 self.verify_tun_66(p, count=127)
2140 c = p.tun_if.get_rx_stats()
2141 self.assertEqual(c['packets'], 127)
2142 c = p.tun_if.get_tx_stats()
2143 self.assertEqual(c['packets'], 127)
2145 # rekey - create new SAs and update the tunnel protection
2147 np.crypt_key = b'X' + p.crypt_key[1:]
2148 np.scapy_tun_spi += 100
2149 np.scapy_tun_sa_id += 1
2150 np.vpp_tun_spi += 100
2151 np.vpp_tun_sa_id += 1
2152 np.tun_if.local_spi = p.vpp_tun_spi
2153 np.tun_if.remote_spi = p.scapy_tun_spi
2155 self.config_sa_tra(np)
2156 self.config_protect(np)
2159 self.verify_tun_66(np, count=127)
2160 c = p.tun_if.get_rx_stats()
2161 self.assertEqual(c['packets'], 254)
2162 c = p.tun_if.get_tx_stats()
2163 self.assertEqual(c['packets'], 254)
2165 # bounce the interface state
2166 p.tun_if.admin_down()
2167 self.verify_drop_tun_66(np, count=127)
2168 node = ('/err/ipsec6-tun-input/%s' %
2169 'ipsec packets received on disabled interface')
2170 self.assertEqual(127, self.statistics.get_err_counter(node))
2172 self.verify_tun_66(np, count=127)
2175 # 1) add two input SAs [old, new]
2176 # 2) swap output SA to [new]
2177 # 3) use only [new] input SA
2179 np3.crypt_key = b'Z' + p.crypt_key[1:]
2180 np3.scapy_tun_spi += 100
2181 np3.scapy_tun_sa_id += 1
2182 np3.vpp_tun_spi += 100
2183 np3.vpp_tun_sa_id += 1
2184 np3.tun_if.local_spi = p.vpp_tun_spi
2185 np3.tun_if.remote_spi = p.scapy_tun_spi
2187 self.config_sa_tra(np3)
2190 p.tun_protect.update_vpp_config(np.tun_sa_out,
2191 [np.tun_sa_in, np3.tun_sa_in])
2192 self.verify_tun_66(np, np, count=127)
2193 self.verify_tun_66(np3, np, count=127)
2196 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2197 [np.tun_sa_in, np3.tun_sa_in])
2198 self.verify_tun_66(np, np3, count=127)
2199 self.verify_tun_66(np3, np3, count=127)
2202 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2204 self.verify_tun_66(np3, np3, count=127)
2205 self.verify_drop_tun_66(np, count=127)
2207 c = p.tun_if.get_rx_stats()
2208 self.assertEqual(c['packets'], 127*9)
2209 c = p.tun_if.get_tx_stats()
2210 self.assertEqual(c['packets'], 127*8)
2211 self.unconfig_sa(np)
2214 self.unconfig_protect(np3)
2215 self.unconfig_sa(np3)
2216 self.unconfig_network(p)
2218 def test_tun_46(self):
2219 """IPSEC tunnel protect 4o6"""
2221 p = self.ipv6_params
2223 self.config_network(p)
2224 self.config_sa_tra(p)
2225 self.config_protect(p)
2227 self.verify_tun_46(p, count=127)
2228 c = p.tun_if.get_rx_stats()
2229 self.assertEqual(c['packets'], 127)
2230 c = p.tun_if.get_tx_stats()
2231 self.assertEqual(c['packets'], 127)
2234 self.unconfig_protect(p)
2236 self.unconfig_network(p)
2239 class TestIpsec6TunProtectTun(TemplateIpsec,
2240 TemplateIpsec6TunProtect,
2242 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2244 encryption_type = ESP
2245 tun6_encrypt_node_name = "esp6-encrypt-tun"
2246 tun6_decrypt_node_name = "esp6-decrypt-tun"
2249 super(TestIpsec6TunProtectTun, self).setUp()
2251 self.tun_if = self.pg0
2254 super(TestIpsec6TunProtectTun, self).tearDown()
2256 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2258 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2259 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2260 dst=sw_intf.local_ip6) /
2261 IPv6(src=src, dst=dst) /
2262 UDP(sport=1166, dport=2233) /
2263 Raw(b'X' * payload_size))
2264 for i in range(count)]
2266 def gen_pkts6(self, sw_intf, src, dst, count=1,
2268 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2269 IPv6(src=src, dst=dst) /
2270 UDP(sport=1166, dport=2233) /
2271 Raw(b'X' * payload_size)
2272 for i in range(count)]
2274 def verify_decrypted6(self, p, rxs):
2276 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2277 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2278 self.assert_packet_checksums_valid(rx)
2280 def verify_encrypted6(self, p, sa, rxs):
2283 pkt = sa.decrypt(rx[IPv6])
2284 if not pkt.haslayer(IPv6):
2285 pkt = IPv6(pkt[Raw].load)
2286 self.assert_packet_checksums_valid(pkt)
2287 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2288 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2289 inner = pkt[IPv6].payload
2290 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2292 except (IndexError, AssertionError):
2293 self.logger.debug(ppp("Unexpected packet:", rx))
2295 self.logger.debug(ppp("Decrypted packet:", pkt))
2300 def test_tun_66(self):
2301 """IPSEC tunnel protect """
2303 p = self.ipv6_params
2305 self.config_network(p)
2306 self.config_sa_tun(p)
2307 self.config_protect(p)
2309 self.verify_tun_66(p, count=127)
2311 c = p.tun_if.get_rx_stats()
2312 self.assertEqual(c['packets'], 127)
2313 c = p.tun_if.get_tx_stats()
2314 self.assertEqual(c['packets'], 127)
2316 # rekey - create new SAs and update the tunnel protection
2318 np.crypt_key = b'X' + p.crypt_key[1:]
2319 np.scapy_tun_spi += 100
2320 np.scapy_tun_sa_id += 1
2321 np.vpp_tun_spi += 100
2322 np.vpp_tun_sa_id += 1
2323 np.tun_if.local_spi = p.vpp_tun_spi
2324 np.tun_if.remote_spi = p.scapy_tun_spi
2326 self.config_sa_tun(np)
2327 self.config_protect(np)
2330 self.verify_tun_66(np, count=127)
2331 c = p.tun_if.get_rx_stats()
2332 self.assertEqual(c['packets'], 254)
2333 c = p.tun_if.get_tx_stats()
2334 self.assertEqual(c['packets'], 254)
2337 self.unconfig_protect(np)
2338 self.unconfig_sa(np)
2339 self.unconfig_network(p)
2342 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2343 TemplateIpsec6TunProtect,
2345 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2347 encryption_type = ESP
2348 tun6_encrypt_node_name = "esp6-encrypt-tun"
2349 tun6_decrypt_node_name = "esp6-decrypt-tun"
2352 super(TestIpsec6TunProtectTunDrop, self).setUp()
2354 self.tun_if = self.pg0
2357 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2359 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2361 # the IP destination of the revealed packet does not match
2362 # that assigned to the tunnel
2363 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2364 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2366 IPv6(src=src, dst=dst) /
2367 UDP(sport=1144, dport=2233) /
2368 Raw(b'X' * payload_size))
2369 for i in range(count)]
2371 def test_tun_drop_66(self):
2372 """IPSEC 6 tunnel protect bogus tunnel header """
2374 p = self.ipv6_params
2376 self.config_network(p)
2377 self.config_sa_tun(p)
2378 self.config_protect(p)
2380 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2381 src=p.remote_tun_if_host,
2382 dst=self.pg1.remote_ip6,
2384 self.send_and_assert_no_replies(self.tun_if, tx)
2386 self.unconfig_protect(p)
2388 self.unconfig_network(p)
2391 class TemplateIpsecItf4(object):
2392 """ IPsec Interface IPv4 """
2394 encryption_type = ESP
2395 tun4_encrypt_node_name = "esp4-encrypt-tun"
2396 tun4_decrypt_node_name = "esp4-decrypt-tun"
2397 tun4_input_node = "ipsec4-tun-input"
2399 def config_sa_tun(self, p, src, dst):
2400 config_tun_params(p, self.encryption_type, None, src, dst)
2402 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2403 p.auth_algo_vpp_id, p.auth_key,
2404 p.crypt_algo_vpp_id, p.crypt_key,
2405 self.vpp_esp_protocol,
2408 p.tun_sa_out.add_vpp_config()
2410 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2411 p.auth_algo_vpp_id, p.auth_key,
2412 p.crypt_algo_vpp_id, p.crypt_key,
2413 self.vpp_esp_protocol,
2416 p.tun_sa_in.add_vpp_config()
2418 def config_protect(self, p):
2419 p.tun_protect = VppIpsecTunProtect(self,
2423 p.tun_protect.add_vpp_config()
2425 def config_network(self, p, instance=0xffffffff):
2426 p.tun_if = VppIpsecInterface(self, instance=instance)
2428 p.tun_if.add_vpp_config()
2430 p.tun_if.config_ip4()
2431 p.tun_if.config_ip6()
2433 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2434 [VppRoutePath(p.tun_if.remote_ip4,
2436 p.route.add_vpp_config()
2437 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2438 [VppRoutePath(p.tun_if.remote_ip6,
2440 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Call remove_vpp_config() on p.route, then on p.tun_if.

    :param p: parameter object holding the ``route`` and ``tun_if``
              objects to remove
    """
    # look each attribute up just before use so the order of attribute
    # access and removal is route first, tunnel interface second
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Call remove_vpp_config() on the object stored in p.tun_protect.

    :param p: parameter object holding ``tun_protect``
    """
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Call remove_vpp_config() on p.tun_sa_out, then on p.tun_sa_in.

    :param p: parameter object holding the ``tun_sa_out`` and
              ``tun_sa_in`` objects to remove
    """
    # outbound SA first, inbound SA second
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
2455 class TestIpsecItf4(TemplateIpsec,
2458 """ IPsec Interface IPv4 """
2461 super(TestIpsecItf4, self).setUp()
2463 self.tun_if = self.pg0
2466 super(TestIpsecItf4, self).tearDown()
def test_tun_instance_44(self):
    """IPSEC interface IPv4 - explicit instance number

    Configures the network with instance=3, then checks through the CLI
    that 'ipsec0' is rejected while 'ipsec3' is shown without the word
    "unknown" in the output.
    """
    p = self.ipv4_params
    self.config_network(p, instance=3)

    # instance 3 was requested, so asking for ipsec0 must fail
    with self.assertRaises(CliFailedCommandError):
        self.vapi.cli("show interface ipsec0")

    # assertNotIn prints the CLI output if the check fails, unlike
    # assertTrue on a pre-evaluated membership test
    output = self.vapi.cli("show interface ipsec3")
    self.assertNotIn("unknown", output)

    self.unconfig_network(p)
# End-to-end IPv4 test of an IPsec interface: bring the tunnel up, check
# traffic and interface counters, check behaviour while the interface is
# admin-down, send a v6-over-v4 batch, then rekey with new SAs and re-check.
# NOTE(review): `n_pkts` and `np` are used below but their assignments fall on
# lines the embedded numbering skips (2482-2483, 2522); several call arguments
# (2488, 2532) and statements (2500, 2535-2536) are likewise not visible.
2480 def test_tun_44(self):
2481 """IPSEC interface IPv4"""
2484 p = self.ipv4_params
2486 self.config_network(p)
2487 self.config_sa_tun(p,
2489 self.pg0.remote_ip4)
2490 self.config_protect(p)
# first batch: one batch counted in each direction
2492 self.verify_tun_44(p, count=n_pkts)
2493 c = p.tun_if.get_rx_stats()
2494 self.assertEqual(c['packets'], n_pkts)
2495 c = p.tun_if.get_tx_stats()
2496 self.assertEqual(c['packets'], n_pkts)
# with the interface admin-down the batch is expected to be dropped
2498 p.tun_if.admin_down()
2499 self.verify_tun_dropped_44(p, count=n_pkts)
2501 self.verify_tun_44(p, count=n_pkts)
# RX has advanced by two batches but TX by only one since the last check;
# presumably the dropped batch still counts on RX - TODO confirm
2503 c = p.tun_if.get_rx_stats()
2504 self.assertEqual(c['packets'], 3*n_pkts)
2505 c = p.tun_if.get_tx_stats()
2506 self.assertEqual(c['packets'], 2*n_pkts)
# the encrypt node name is overridden only for the verify_tun_64 batch and
# restored right after it
2508 # it's a v6 packet when its encrypted
2509 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2511 self.verify_tun_64(p, count=n_pkts)
2512 c = p.tun_if.get_rx_stats()
2513 self.assertEqual(c['packets'], 4*n_pkts)
2514 c = p.tun_if.get_tx_stats()
2515 self.assertEqual(c['packets'], 3*n_pkts)
2517 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
# counters are cleared so the post-rekey checks start again from zero
2519 self.vapi.cli("clear interfaces")
2521 # rekey - create new SAs and update the tunnel protection
2523 np.crypt_key = b'X' + p.crypt_key[1:]
2524 np.scapy_tun_spi += 100
2525 np.scapy_tun_sa_id += 1
2526 np.vpp_tun_spi += 100
2527 np.vpp_tun_sa_id += 1
# NOTE(review): the two SPI attributes below are taken from `p`, not from the
# bumped values on `np` - confirm this is intended
2528 np.tun_if.local_spi = p.vpp_tun_spi
2529 np.tun_if.remote_spi = p.scapy_tun_spi
2531 self.config_sa_tun(np,
2533 self.pg0.remote_ip4)
2534 self.config_protect(np)
2537 self.verify_tun_44(np, count=n_pkts)
2538 c = p.tun_if.get_rx_stats()
2539 self.assertEqual(c['packets'], n_pkts)
2540 c = p.tun_if.get_tx_stats()
2541 self.assertEqual(c['packets'], n_pkts)
# cleanup uses the rekeyed parameter set for protection and SAs, and the
# original one for the network objects
2544 self.unconfig_protect(np)
2545 self.unconfig_sa(np)
2546 self.unconfig_network(p)
# IPv4 IPsec interface test with both integrity and crypto set to NONE/NULL.
# NOTE(review): `n_pkts` is used but its assignment is on a line the embedded
# numbering skips (2550-2551); one config_sa_tun argument (2563) and the
# statements at 2568-2569 and 2571 are also not visible.
2548 def test_tun_44_null(self):
2549 """IPSEC interface IPv4 NULL auth/crypto"""
# work on a shallow copy so the overrides below do not touch self.ipv4_params
2552 p = copy.copy(self.ipv4_params)
2554 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2555 IPSEC_API_INTEG_ALG_NONE)
2556 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2557 IPSEC_API_CRYPTO_ALG_NONE)
# matching algorithm names for the scapy side
2558 p.crypt_algo = "NULL"
2559 p.auth_algo = "NULL"
2561 self.config_network(p)
2562 self.config_sa_tun(p,
2564 self.pg0.remote_ip4)
2565 self.config_protect(p)
2567 self.verify_tun_44(p, count=n_pkts)
2570 self.unconfig_protect(p)
2572 self.unconfig_network(p)
# Test class for MPLS over an IPv4 IPsec interface. It overrides the encrypt
# node name; the visible statements delegate to the parent setUp()/tearDown()
# and select pg0 as self.tun_if.
# NOTE(review): the embedded numbering skips 2576-2577 (remaining base
# classes), 2581-2582 and 2586-2587 (presumably the setUp/tearDown `def`
# lines) - those lines are not visible in this extract.
2575 class TestIpsecItf4MPLS(TemplateIpsec,
2578 """ IPsec Interface MPLSoIPv4 """
2580 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2583 super(TestIpsecItf4MPLS, self).setUp()
2585 self.tun_if = self.pg0
2588 super(TestIpsecItf4MPLS, self).tearDown()
# Build `count` identical frames: an Ethernet header (from sw_intf's remote
# MAC to its local MAC) around the result of sa.encrypt() applied to
# MPLS(label 44, ttl 3) / IP(src, dst) / UDP / Raw payload of 'X' bytes.
# The parameter `p` is not used in the visible lines.
# NOTE(review): the embedded numbering skips 2591, the rest of the signature;
# `payload_size` is presumably declared there - TODO confirm its default.
2590 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2592 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2593 sa.encrypt(MPLS(label=44, ttl=3) /
2594 IP(src=src, dst=dst) /
2595 UDP(sport=1166, dport=2233) /
2596 Raw(b'X' * payload_size))
2597 for i in range(count)]
# Check packets VPP encrypted: decrypt the IP layer with `sa`, re-parse the
# Raw payload as IP when the result has no IP layer, then verify checksums,
# MPLS label 44 and that the IP destination equals p.remote_tun_if_host.
# On IndexError/AssertionError the received and decrypted packets are logged.
# NOTE(review): the embedded numbering skips 2600-2601, 2610 and 2612-2614.
# The loop binding `rx`, the opening `try:` and whatever follows the debug
# logging are not visible in this extract.
2599 def verify_encrypted(self, p, sa, rxs):
2602 pkt = sa.decrypt(rx[IP])
2603 if not pkt.haslayer(IP):
2604 pkt = IP(pkt[Raw].load)
2605 self.assert_packet_checksums_valid(pkt)
2606 self.assert_equal(pkt[MPLS].label, 44)
2607 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2608 except (IndexError, AssertionError):
2609 self.logger.debug(ppp("Unexpected packet:", rx))
2611 self.logger.debug(ppp("Decrypted packet:", pkt))
# MPLS-over-IPv4 IPsec interface test: create MPLS table 0, add an MPLS route
# for label 44 towards pg1, re-point p.route through the tunnel with label 44
# imposed, enable MPLS on the tunnel, then configure SAs/protection and verify.
# NOTE(review): `n_pkts` is used but its assignment is on a skipped line
# (2618-2619); the opening of the VppMplsRoute path list (2629), one
# config_sa_tun argument (2638) and the statements at 2643-2644 and 2647 are
# also not visible.
2616 def test_tun_mpls_o_ip4(self):
2617 """IPSEC interface MPLS over IPv4"""
2620 p = self.ipv4_params
2623 tbl = VppMplsTable(self, 0)
2624 tbl.add_vpp_config()
2626 self.config_network(p)
2627 # deag MPLS routes from the tunnel
2628 r4 = VppMplsRoute(self, 44, 1,
2630 self.pg1.remote_ip4,
2631 self.pg1.sw_if_index)]).add_vpp_config()
# replace the path of the route created by config_network() with one through
# the tunnel interface that carries label 44
2632 p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2633 p.tun_if.sw_if_index,
2634 labels=[VppMplsLabel(44)])])
2635 p.tun_if.enable_mpls()
2637 self.config_sa_tun(p,
2639 self.pg0.remote_ip4)
2640 self.config_protect(p)
2642 self.verify_tun_44(p, count=n_pkts)
# MPLS is disabled on the tunnel before the protection and network are removed
2645 p.tun_if.disable_mpls()
2646 self.unconfig_protect(p)
2648 self.unconfig_network(p)
# Shared helpers for the IPv6 IPsec interface tests. The class attributes
# below select ESP as the encryption type and name the encrypt, decrypt and
# tunnel-input nodes.
2651 class TemplateIpsecItf6(object):
2652 """ IPsec Interface IPv6 """
2654 encryption_type = ESP
2655 tun6_encrypt_node_name = "esp6-encrypt-tun"
2656 tun6_decrypt_node_name = "esp6-decrypt-tun"
2657 tun6_input_node = "ipsec6-tun-input"
# Create and install the two SAs used by tunnel parameter set `p`.
# config_tun_params() runs first (no tunnel interface, explicit src/dst); then
# a VppIpsecSA built from the scapy-side SA id/SPI is stored as p.tun_sa_out
# and one built from the VPP-side SA id/SPI is stored as p.tun_sa_in. Both use
# the same auth/crypto algorithm ids and keys.
# NOTE(review): the embedded numbering skips 2666-2667 and 2674-2675, so the
# trailing arguments of both VppIpsecSA(...) calls are not visible in this
# extract - confirm against the full file before changing either call.
2659 def config_sa_tun(self, p, src, dst):
2660 config_tun_params(p, self.encryption_type, None, src, dst)
2662 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2663 p.auth_algo_vpp_id, p.auth_key,
2664 p.crypt_algo_vpp_id, p.crypt_key,
2665 self.vpp_esp_protocol,
2668 p.tun_sa_out.add_vpp_config()
2670 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2671 p.auth_algo_vpp_id, p.auth_key,
2672 p.crypt_algo_vpp_id, p.crypt_key,
2673 self.vpp_esp_protocol,
2676 p.tun_sa_in.add_vpp_config()
# Build a VppIpsecTunProtect object, keep it on p.tun_protect and install it.
# NOTE(review): the embedded numbering skips 2680-2682; the constructor
# arguments after `self` are not visible in this extract.
2678 def config_protect(self, p):
2679 p.tun_protect = VppIpsecTunProtect(self,
2683 p.tun_protect.add_vpp_config()
# Create the IPsec interface for `p` (stored as p.tun_if), install it and
# configure both IPv4 and IPv6 on it. Then build a /32 route to
# p.remote_tun_if_host4 via p.tun_if.remote_ip4 (held only in the local `r`)
# and a /128 route to p.remote_tun_if_host via p.tun_if.remote_ip6 (kept on
# p.route and installed).
# NOTE(review): the embedded numbering skips 2689, 2695-2697 and 2700. The
# remaining VppRoutePath arguments are not visible, and no add_vpp_config()
# call for `r` is visible - confirm against the full file.
2685 def config_network(self, p):
2686 p.tun_if = VppIpsecInterface(self)
2688 p.tun_if.add_vpp_config()
2690 p.tun_if.config_ip4()
2691 p.tun_if.config_ip6()
2693 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2694 [VppRoutePath(p.tun_if.remote_ip4,
2698 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2699 [VppRoutePath(p.tun_if.remote_ip6,
2701 proto=DpoProto.DPO_PROTO_IP6)])
2702 p.route.add_vpp_config()
def unconfig_network(self, p):
    """Withdraw the network objects recorded on ``p``.

    ``p.route`` is removed first, then ``p.tun_if``; each is removed by
    calling its ``remove_vpp_config()`` method.
    """
    # attributes are looked up one at a time so the removal order (and the
    # point at which a missing attribute would surface) is unchanged
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protection object stored on ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs recorded on ``p``.

    ``p.tun_sa_out`` is removed before ``p.tun_sa_in``.
    """
    # look each SA up just before removing it, preserving the original order
    for attr_name in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr_name).remove_vpp_config()
# Test class for the IPv6 IPsec interface cases. The visible statements
# delegate to the parent setUp()/tearDown() and select pg0 as self.tun_if.
# NOTE(review): the embedded numbering skips 2717-2718 (remaining base
# classes), 2720-2721 and 2725-2726 (presumably the setUp/tearDown `def`
# lines) - those lines are not visible in this extract.
2716 class TestIpsecItf6(TemplateIpsec,
2719 """ IPsec Interface IPv6 """
2722 super(TestIpsecItf6, self).setUp()
2724 self.tun_if = self.pg0
2727 super(TestIpsecItf6, self).tearDown()
# End-to-end IPv6 test of an IPsec interface (despite the "_44" in the method
# name, it uses ipv6_params and the verify_tun_66/46 helpers): bring the
# tunnel up, check traffic and counters, check behaviour while the interface
# is admin-down, send a v4-over-v6 batch, then rekey and re-check.
# NOTE(review): `n_pkts` and `np` are used below but their assignments fall on
# lines the embedded numbering skips (2731-2732, 2771); several call arguments
# (2737, 2781) and statements (2749, 2784-2785) are likewise not visible.
2729 def test_tun_44(self):
2730 """IPSEC interface IPv6"""
2733 p = self.ipv6_params
2735 self.config_network(p)
2736 self.config_sa_tun(p,
2738 self.pg0.remote_ip6)
2739 self.config_protect(p)
# first batch: one batch counted in each direction
2741 self.verify_tun_66(p, count=n_pkts)
2742 c = p.tun_if.get_rx_stats()
2743 self.assertEqual(c['packets'], n_pkts)
2744 c = p.tun_if.get_tx_stats()
2745 self.assertEqual(c['packets'], n_pkts)
# with the interface admin-down the batch is expected to be dropped
2747 p.tun_if.admin_down()
2748 self.verify_drop_tun_66(p, count=n_pkts)
2750 self.verify_tun_66(p, count=n_pkts)
# RX has advanced by two batches but TX by only one since the last check;
# presumably the dropped batch still counts on RX - TODO confirm
2752 c = p.tun_if.get_rx_stats()
2753 self.assertEqual(c['packets'], 3*n_pkts)
2754 c = p.tun_if.get_tx_stats()
2755 self.assertEqual(c['packets'], 2*n_pkts)
# the encrypt node name is overridden only for the verify_tun_46 batch and
# restored right after it
2757 # it's a v4 packet when its encrypted
2758 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2760 self.verify_tun_46(p, count=n_pkts)
2761 c = p.tun_if.get_rx_stats()
2762 self.assertEqual(c['packets'], 4*n_pkts)
2763 c = p.tun_if.get_tx_stats()
2764 self.assertEqual(c['packets'], 3*n_pkts)
2766 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
# counters are cleared so the post-rekey checks start again from zero
2768 self.vapi.cli("clear interfaces")
2770 # rekey - create new SAs and update the tunnel protection
2772 np.crypt_key = b'X' + p.crypt_key[1:]
2773 np.scapy_tun_spi += 100
2774 np.scapy_tun_sa_id += 1
2775 np.vpp_tun_spi += 100
2776 np.vpp_tun_sa_id += 1
# NOTE(review): the two SPI attributes below are taken from `p`, not from the
# bumped values on `np` - confirm this is intended
2777 np.tun_if.local_spi = p.vpp_tun_spi
2778 np.tun_if.remote_spi = p.scapy_tun_spi
2780 self.config_sa_tun(np,
2782 self.pg0.remote_ip6)
2783 self.config_protect(np)
2786 self.verify_tun_66(np, count=n_pkts)
2787 c = p.tun_if.get_rx_stats()
2788 self.assertEqual(c['packets'], n_pkts)
2789 c = p.tun_if.get_tx_stats()
2790 self.assertEqual(c['packets'], n_pkts)
# cleanup uses the rekeyed parameter set for protection and SAs, and the
# original one for the network objects
2793 self.unconfig_protect(np)
2794 self.unconfig_sa(np)
2795 self.unconfig_network(p)
# Point-to-multipoint ESP tests over IPv4. The class attributes name the
# encrypt/decrypt nodes and select ESP as the encryption type.
2798 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2799 """ Ipsec P2MP ESP v4 tests """
2800 tun4_encrypt_node_name = "esp4-encrypt-tun"
2801 tun4_decrypt_node_name = "esp4-decrypt-tun"
2802 encryption_type = ESP
# Build `count` identical frames: an Ethernet header (from sw_intf's remote
# MAC to its local MAC) around the result of sa.encrypt() applied to
# IP / UDP / Raw payload of 'X' bytes. The inner IP addresses come from pg1
# (local_ip4 -> remote_ip4); the `src`, `dst` and `p` parameters are not used
# in the visible lines.
# NOTE(review): the embedded numbering skips 2805, the rest of the signature;
# `payload_size` is presumably declared there - TODO confirm its default.
2804 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2806 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2807 sa.encrypt(IP(src=self.pg1.local_ip4,
2808 dst=self.pg1.remote_ip4) /
2809 UDP(sport=1144, dport=2233) /
2810 Raw(b'X' * payload_size))
2811 for i in range(count)]
# Build `count` identical plain-text frames: Ethernet (sw_intf's remote MAC to
# its local MAC) / IP / UDP / Raw payload of 'X' bytes. The IP source is
# hard-coded to "1.1.1.1"; the `src` parameter is not used in the visible
# lines, only `dst` is.
# NOTE(review): the embedded numbering skips 2814, the rest of the signature;
# `payload_size` is presumably declared there - TODO confirm its default.
2813 def gen_pkts(self, sw_intf, src, dst, count=1,
2815 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2816 IP(src="1.1.1.1", dst=dst) /
2817 UDP(sport=1144, dport=2233) /
2818 Raw(b'X' * payload_size)
2819 for i in range(count)]
# Check decrypted packets: the Ethernet destination must be pg1's remote MAC
# and the IP destination pg1's remote IPv4 address. `p` is not used in the
# visible lines.
# NOTE(review): the embedded numbering skips 2822; the loop that binds `rx`
# (presumably over `rxs`) is not visible in this extract.
2821 def verify_decrypted(self, p, rxs):
2823 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2824 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Check packets VPP encrypted: the outer IP TOS must equal the EF DSCP value
# shifted left by two bits; the packet is then decrypted with `sa`, re-parsed
# from the Raw payload as IP when the result has no IP layer, and its
# checksums and destination (p.remote_tun_if_host) are verified.
# On IndexError/AssertionError the received and decrypted packets are logged.
# NOTE(review): the embedded numbering skips 2827-2828, 2835, 2839 and
# 2841-2844. The loop binding `rx`, the opening `try:` and the assignment of
# `e` used below are not visible in this extract.
2826 def verify_encrypted(self, p, sa, rxs):
2829 self.assertEqual(rx[IP].tos,
2830 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
2831 pkt = sa.decrypt(rx[IP])
2832 if not pkt.haslayer(IP):
2833 pkt = IP(pkt[Raw].load)
2834 self.assert_packet_checksums_valid(pkt)
2836 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2837 except (IndexError, AssertionError):
2838 self.logger.debug(ppp("Unexpected packet:", rx))
2840 self.logger.debug(ppp("Decrypted packet:", pkt))
# Fixture body for the P2MP tests: create one multipoint-mode IPsec interface,
# then for each of N_NHS next-hops create a parameter-set copy with its own
# SA ids/SPIs, an SA pair, a tunnel protection keyed by next-hop, and a /32
# route to a per-next-hop remote host.
# NOTE(review): the enclosing `def` line and the definition of N_NHS fall on
# lines the embedded numbering skips (2845, 2847-2848); arguments at 2884,
# 2895, 2900-2903 and 2907 are likewise not visible.
2846 super(TestIpsecMIfEsp4, self).setUp()
2849 self.tun_if = self.pg0
2850 p = self.ipv4_params
2851 p.tun_if = VppIpsecInterface(self,
2852 mode=(VppEnum.vl_api_tunnel_mode_t.
2853 TUNNEL_API_MODE_MP))
2854 p.tun_if.add_vpp_config()
# IPv4 is configured, removed and configured again; presumably to exercise
# re-configuration - TODO confirm
2856 p.tun_if.config_ip4()
2857 p.tun_if.unconfig_ip4()
2858 p.tun_if.config_ip4()
2859 p.tun_if.generate_remote_hosts(N_NHS)
2860 self.pg0.generate_remote_hosts(N_NHS)
2861 self.pg0.configure_ipv4_neighbors()
2863 # setup some SAs for several next-hops on the interface
2864 self.multi_params = []
2866 for ii in range(N_NHS):
# shallow copy: each next-hop gets its own ids/SPIs while p.tun_if still
# refers to the single interface created above
2867 p = copy.copy(self.ipv4_params)
2869 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2870 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2871 p.scapy_tun_spi = p.scapy_tun_spi + ii
2872 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2873 p.vpp_tun_spi = p.vpp_tun_spi + ii
2875 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2876 p.scapy_tra_spi = p.scapy_tra_spi + ii
2877 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2878 p.vpp_tra_spi = p.vpp_tra_spi + ii
# both SAs are created with the EF DSCP value
2879 p.tun_sa_out = VppIpsecSA(
2880 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2881 p.auth_algo_vpp_id, p.auth_key,
2882 p.crypt_algo_vpp_id, p.crypt_key,
2883 self.vpp_esp_protocol,
2885 self.pg0.remote_hosts[ii].ip4,
2886 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2887 p.tun_sa_out.add_vpp_config()
2889 p.tun_sa_in = VppIpsecSA(
2890 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2891 p.auth_algo_vpp_id, p.auth_key,
2892 p.crypt_algo_vpp_id, p.crypt_key,
2893 self.vpp_esp_protocol,
2894 self.pg0.remote_hosts[ii].ip4,
2896 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2897 p.tun_sa_in.add_vpp_config()
# the protection is bound to this next-hop's address on the tunnel interface
2899 p.tun_protect = VppIpsecTunProtect(
2904 nh=p.tun_if.remote_hosts[ii].ip4)
2905 p.tun_protect.add_vpp_config()
2906 config_tun_params(p, self.encryption_type, None,
2908 self.pg0.remote_hosts[ii].ip4)
2909 self.multi_params.append(p)
2911 VppIpRoute(self, p.remote_tun_if_host, 32,
2912 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2913 p.tun_if.sw_if_index)]).add_vpp_config()
# p.tun_dst is assigned after config_tun_params() has run
2915 p.tun_dst = self.pg0.remote_hosts[ii].ip4
# Teardown body: remove the IPv4 configuration from the interface stored on
# self.ipv4_params, then delegate to the parent tearDown().
# NOTE(review): the enclosing `def` line (presumably 2917) is not visible.
2918 p = self.ipv4_params
2919 p.tun_if.unconfig_ip4()
2920 super(TestIpsecMIfEsp4, self).tearDown()
# Run the 4-in-4 tunnel verification once for every parameter set collected in
# self.multi_params, with N_PKTS packets each.
# NOTE(review): the embedded numbering skips 2923-2924; the definition of
# N_PKTS (and any docstring) is not visible in this extract.
2922 def test_tun_44(self):
2925 for p in self.multi_params:
2926 self.verify_tun_44(p, count=N_PKTS)
# Test class for MPLS over an IPv6 IPsec interface. It overrides the encrypt
# node name; the visible statements delegate to the parent setUp()/tearDown()
# and select pg0 as self.tun_if.
# NOTE(review): the embedded numbering skips 2930-2931 (remaining base
# classes), 2935-2936 and 2940-2941 (presumably the setUp/tearDown `def`
# lines) - those lines are not visible in this extract.
2929 class TestIpsecItf6MPLS(TemplateIpsec,
2932 """ IPsec Interface MPLSoIPv6 """
2934 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
2937 super(TestIpsecItf6MPLS, self).setUp()
2939 self.tun_if = self.pg0
2942 super(TestIpsecItf6MPLS, self).tearDown()
# Build `count` identical frames: an Ethernet header (from sw_intf's remote
# MAC to its local MAC) around the result of sa.encrypt() applied to
# MPLS(label 66, ttl 3) / IPv6(src, dst) / UDP / Raw payload of 'X' bytes.
# The parameter `p` is not used in the visible lines.
# NOTE(review): the embedded numbering skips 2945, the rest of the signature;
# `payload_size` is presumably declared there - TODO confirm its default.
2944 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2946 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2947 sa.encrypt(MPLS(label=66, ttl=3) /
2948 IPv6(src=src, dst=dst) /
2949 UDP(sport=1166, dport=2233) /
2950 Raw(b'X' * payload_size))
2951 for i in range(count)]
# Check packets VPP encrypted: decrypt the IPv6 layer with `sa`, then verify
# checksums, MPLS label 66 and that the IPv6 destination equals
# p.remote_tun_if_host. On IndexError/AssertionError the received and
# decrypted packets are logged.
# NOTE(review): the embedded numbering skips 2954-2955, 2964 and 2966-2968.
# The loop binding `rx`, the opening `try:` and whatever follows the debug
# logging are not visible in this extract.
2953 def verify_encrypted6(self, p, sa, rxs):
2956 pkt = sa.decrypt(rx[IPv6])
# NOTE(review): the guard tests for an IPv6 layer but the fallback re-parses
# the payload as IP (v4), while the checks below index pkt[IPv6] - looks like
# a copy from the IPv4 variant; confirm whether IPv6(...) was intended
2957 if not pkt.haslayer(IPv6):
2958 pkt = IP(pkt[Raw].load)
2959 self.assert_packet_checksums_valid(pkt)
2960 self.assert_equal(pkt[MPLS].label, 66)
2961 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
2962 except (IndexError, AssertionError):
2963 self.logger.debug(ppp("Unexpected packet:", rx))
2965 self.logger.debug(ppp("Decrypted packet:", pkt))
# MPLS-over-IPv6 IPsec interface test: create MPLS table 0, add an MPLS route
# for label 66 towards pg1 with an IPv6 EOS protocol, re-point p.route through
# the tunnel with label 66 imposed, enable MPLS on the tunnel, then configure
# SAs/protection and verify.
# NOTE(review): `n_pkts` and `f` are used but their assignments fall on lines
# the embedded numbering skips (2972-2973, 2975-2976); the opening of the
# VppMplsRoute path list (2983), one config_sa_tun argument (2993) and the
# statements at 2998-2999 and 3002 are also not visible.
2970 def test_tun_mpls_o_ip6(self):
2971 """IPSEC interface MPLS over IPv6"""
2974 p = self.ipv6_params
2977 tbl = VppMplsTable(self, 0)
2978 tbl.add_vpp_config()
2980 self.config_network(p)
2981 # deag MPLS routes from the tunnel
2982 r6 = VppMplsRoute(self, 66, 1,
2984 self.pg1.remote_ip6,
2985 self.pg1.sw_if_index)],
2986 eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
# replace the path of the route created by config_network() with one through
# the tunnel interface that carries label 66
2987 p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
2988 p.tun_if.sw_if_index,
2989 labels=[VppMplsLabel(66)])])
2990 p.tun_if.enable_mpls()
2992 self.config_sa_tun(p,
2994 self.pg0.remote_ip6)
2995 self.config_protect(p)
2997 self.verify_tun_66(p, count=n_pkts)
# MPLS is disabled on the tunnel before the protection and network are removed
3000 p.tun_if.disable_mpls()
3001 self.unconfig_protect(p)
3003 self.unconfig_network(p)
# Run this module's tests with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)