5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
15 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19 VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
30 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
31 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
32 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
33 IPSEC_API_SAD_FLAG_USE_ESN))
34 crypt_key = mk_scapy_crypt_key(p)
36 p.tun_dst = tun_if.remote_ip
37 p.tun_src = tun_if.local_ip
42 p.scapy_tun_sa = SecurityAssociation(
43 encryption_type, spi=p.vpp_tun_spi,
44 crypt_algo=p.crypt_algo,
46 auth_algo=p.auth_algo, auth_key=p.auth_key,
47 tunnel_header=ip_class_by_addr_type[p.addr_type](
50 nat_t_header=p.nat_header,
52 p.vpp_tun_sa = SecurityAssociation(
53 encryption_type, spi=p.scapy_tun_spi,
54 crypt_algo=p.crypt_algo,
56 auth_algo=p.auth_algo, auth_key=p.auth_key,
57 tunnel_header=ip_class_by_addr_type[p.addr_type](
60 nat_t_header=p.nat_header,
64 def config_tra_params(p, encryption_type, tun_if):
65 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
66 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
67 IPSEC_API_SAD_FLAG_USE_ESN))
68 crypt_key = mk_scapy_crypt_key(p)
69 p.tun_dst = tun_if.remote_ip
70 p.tun_src = tun_if.local_ip
71 p.scapy_tun_sa = SecurityAssociation(
72 encryption_type, spi=p.vpp_tun_spi,
73 crypt_algo=p.crypt_algo,
75 auth_algo=p.auth_algo, auth_key=p.auth_key,
77 nat_t_header=p.nat_header)
78 p.vpp_tun_sa = SecurityAssociation(
79 encryption_type, spi=p.scapy_tun_spi,
80 crypt_algo=p.crypt_algo,
82 auth_algo=p.auth_algo, auth_key=p.auth_key,
84 nat_t_header=p.nat_header)
87 class TemplateIpsec4TunProtect(object):
88 """ IPsec IPv4 Tunnel protect """
91 tun4_encrypt_node_name = "esp4-encrypt-tun"
92 tun4_decrypt_node_name = "esp4-decrypt-tun"
93 tun4_input_node = "ipsec4-tun-input"
95 def config_sa_tra(self, p):
96 config_tun_params(p, self.encryption_type, p.tun_if)
98 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
99 p.auth_algo_vpp_id, p.auth_key,
100 p.crypt_algo_vpp_id, p.crypt_key,
101 self.vpp_esp_protocol,
103 p.tun_sa_out.add_vpp_config()
105 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
106 p.auth_algo_vpp_id, p.auth_key,
107 p.crypt_algo_vpp_id, p.crypt_key,
108 self.vpp_esp_protocol,
110 p.tun_sa_in.add_vpp_config()
112 def config_sa_tun(self, p):
113 config_tun_params(p, self.encryption_type, p.tun_if)
115 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
116 p.auth_algo_vpp_id, p.auth_key,
117 p.crypt_algo_vpp_id, p.crypt_key,
118 self.vpp_esp_protocol,
119 self.tun_if.local_addr[p.addr_type],
120 self.tun_if.remote_addr[p.addr_type],
122 p.tun_sa_out.add_vpp_config()
124 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
125 p.auth_algo_vpp_id, p.auth_key,
126 p.crypt_algo_vpp_id, p.crypt_key,
127 self.vpp_esp_protocol,
128 self.tun_if.remote_addr[p.addr_type],
129 self.tun_if.local_addr[p.addr_type],
131 p.tun_sa_in.add_vpp_config()
133 def config_protect(self, p):
134 p.tun_protect = VppIpsecTunProtect(self,
138 p.tun_protect.add_vpp_config()
140 def config_network(self, p):
141 if hasattr(p, 'tun_dst'):
144 tun_dst = self.pg0.remote_ip4
145 p.tun_if = VppIpIpTunInterface(self, self.pg0,
148 p.tun_if.add_vpp_config()
150 p.tun_if.config_ip4()
151 p.tun_if.config_ip6()
153 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
154 [VppRoutePath(p.tun_if.remote_ip4,
156 p.route.add_vpp_config()
157 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
158 [VppRoutePath(p.tun_if.remote_ip6,
160 proto=DpoProto.DPO_PROTO_IP6)])
163 def unconfig_network(self, p):
164 p.route.remove_vpp_config()
165 p.tun_if.remove_vpp_config()
167 def unconfig_protect(self, p):
168 p.tun_protect.remove_vpp_config()
170 def unconfig_sa(self, p):
171 p.tun_sa_out.remove_vpp_config()
172 p.tun_sa_in.remove_vpp_config()
175 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
177 """ IPsec tunnel interface tests """
179 encryption_type = ESP
183 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
186 def tearDownClass(cls):
187 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
190 super(TemplateIpsec4TunIfEsp, self).setUp()
192 self.tun_if = self.pg0
196 self.config_network(p)
197 self.config_sa_tra(p)
198 self.config_protect(p)
201 super(TemplateIpsec4TunIfEsp, self).tearDown()
204 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
206 """ IPsec UDP tunnel interface tests """
208 tun4_encrypt_node_name = "esp4-encrypt-tun"
209 tun4_decrypt_node_name = "esp4-decrypt-tun"
210 encryption_type = ESP
214 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
217 def tearDownClass(cls):
218 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
220 def verify_encrypted(self, p, sa, rxs):
223 # ensure the UDP ports are correct before we decrypt
225 self.assertTrue(rx.haslayer(UDP))
226 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
227 self.assert_equal(rx[UDP].dport, 4500)
229 pkt = sa.decrypt(rx[IP])
230 if not pkt.haslayer(IP):
231 pkt = IP(pkt[Raw].load)
233 self.assert_packet_checksums_valid(pkt)
234 self.assert_equal(pkt[IP].dst, "1.1.1.1")
235 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
236 except (IndexError, AssertionError):
237 self.logger.debug(ppp("Unexpected packet:", rx))
239 self.logger.debug(ppp("Decrypted packet:", pkt))
244 def config_sa_tra(self, p):
245 config_tun_params(p, self.encryption_type, p.tun_if)
247 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
248 p.auth_algo_vpp_id, p.auth_key,
249 p.crypt_algo_vpp_id, p.crypt_key,
250 self.vpp_esp_protocol,
252 udp_src=p.nat_header.sport,
253 udp_dst=p.nat_header.dport)
254 p.tun_sa_out.add_vpp_config()
256 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
257 p.auth_algo_vpp_id, p.auth_key,
258 p.crypt_algo_vpp_id, p.crypt_key,
259 self.vpp_esp_protocol,
261 udp_src=p.nat_header.sport,
262 udp_dst=p.nat_header.dport)
263 p.tun_sa_in.add_vpp_config()
266 super(TemplateIpsec4TunIfEspUdp, self).setUp()
269 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
270 IPSEC_API_SAD_FLAG_UDP_ENCAP)
271 p.nat_header = UDP(sport=5454, dport=4500)
273 self.tun_if = self.pg0
275 self.config_network(p)
276 self.config_sa_tra(p)
277 self.config_protect(p)
280 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
283 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
284 """ Ipsec ESP - TUN tests """
285 tun4_encrypt_node_name = "esp4-encrypt-tun"
286 tun4_decrypt_node_name = "esp4-decrypt-tun"
288 def test_tun_basic64(self):
289 """ ipsec 6o4 tunnel basic test """
290 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
292 self.verify_tun_64(self.params[socket.AF_INET], count=1)
294 def test_tun_burst64(self):
295 """ ipsec 6o4 tunnel basic test """
296 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
298 self.verify_tun_64(self.params[socket.AF_INET], count=257)
300 def test_tun_basic_frag44(self):
301 """ ipsec 4o4 tunnel frag basic test """
302 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
306 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
308 self.verify_tun_44(self.params[socket.AF_INET],
309 count=1, payload_size=1800, n_rx=2)
310 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
314 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
315 """ Ipsec ESP UDP tests """
317 tun4_input_node = "ipsec4-tun-input"
320 super(TestIpsec4TunIfEspUdp, self).setUp()
322 def test_keepalive(self):
323 """ IPSEC NAT Keepalive """
324 self.verify_keepalive(self.ipv4_params)
327 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
328 """ Ipsec ESP UDP GCM tests """
330 tun4_input_node = "ipsec4-tun-input"
333 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
335 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
336 IPSEC_API_INTEG_ALG_NONE)
337 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
338 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
339 p.crypt_algo = "AES-GCM"
341 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
345 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
346 """ Ipsec ESP - TCP tests """
350 class TemplateIpsec6TunProtect(object):
351 """ IPsec IPv6 Tunnel protect """
353 def config_sa_tra(self, p):
354 config_tun_params(p, self.encryption_type, p.tun_if)
356 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
357 p.auth_algo_vpp_id, p.auth_key,
358 p.crypt_algo_vpp_id, p.crypt_key,
359 self.vpp_esp_protocol)
360 p.tun_sa_out.add_vpp_config()
362 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
363 p.auth_algo_vpp_id, p.auth_key,
364 p.crypt_algo_vpp_id, p.crypt_key,
365 self.vpp_esp_protocol)
366 p.tun_sa_in.add_vpp_config()
368 def config_sa_tun(self, p):
369 config_tun_params(p, self.encryption_type, p.tun_if)
371 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
372 p.auth_algo_vpp_id, p.auth_key,
373 p.crypt_algo_vpp_id, p.crypt_key,
374 self.vpp_esp_protocol,
375 self.tun_if.local_addr[p.addr_type],
376 self.tun_if.remote_addr[p.addr_type])
377 p.tun_sa_out.add_vpp_config()
379 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
380 p.auth_algo_vpp_id, p.auth_key,
381 p.crypt_algo_vpp_id, p.crypt_key,
382 self.vpp_esp_protocol,
383 self.tun_if.remote_addr[p.addr_type],
384 self.tun_if.local_addr[p.addr_type])
385 p.tun_sa_in.add_vpp_config()
387 def config_protect(self, p):
388 p.tun_protect = VppIpsecTunProtect(self,
392 p.tun_protect.add_vpp_config()
394 def config_network(self, p):
395 if hasattr(p, 'tun_dst'):
398 tun_dst = self.pg0.remote_ip6
399 p.tun_if = VppIpIpTunInterface(self, self.pg0,
402 p.tun_if.add_vpp_config()
404 p.tun_if.config_ip6()
405 p.tun_if.config_ip4()
407 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
408 [VppRoutePath(p.tun_if.remote_ip6,
410 proto=DpoProto.DPO_PROTO_IP6)])
411 p.route.add_vpp_config()
412 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
413 [VppRoutePath(p.tun_if.remote_ip4,
417 def unconfig_network(self, p):
418 p.route.remove_vpp_config()
419 p.tun_if.remove_vpp_config()
421 def unconfig_protect(self, p):
422 p.tun_protect.remove_vpp_config()
424 def unconfig_sa(self, p):
425 p.tun_sa_out.remove_vpp_config()
426 p.tun_sa_in.remove_vpp_config()
429 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
431 """ IPsec tunnel interface tests """
433 encryption_type = ESP
436 super(TemplateIpsec6TunIfEsp, self).setUp()
438 self.tun_if = self.pg0
441 self.config_network(p)
442 self.config_sa_tra(p)
443 self.config_protect(p)
446 super(TemplateIpsec6TunIfEsp, self).tearDown()
449 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
451 """ Ipsec ESP - TUN tests """
452 tun6_encrypt_node_name = "esp6-encrypt-tun"
453 tun6_decrypt_node_name = "esp6-decrypt-tun"
455 def test_tun_basic46(self):
456 """ ipsec 4o6 tunnel basic test """
457 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
458 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
460 def test_tun_burst46(self):
461 """ ipsec 4o6 tunnel burst test """
462 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
463 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
466 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
467 IpsecTun6HandoffTests):
468 """ Ipsec ESP 6 Handoff tests """
469 tun6_encrypt_node_name = "esp6-encrypt-tun"
470 tun6_decrypt_node_name = "esp6-decrypt-tun"
473 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
474 IpsecTun4HandoffTests):
475 """ Ipsec ESP 4 Handoff tests """
476 tun4_encrypt_node_name = "esp4-encrypt-tun"
477 tun4_decrypt_node_name = "esp4-decrypt-tun"
480 @tag_fixme_vpp_workers
481 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
484 """ IPsec IPv4 Multi Tunnel interface """
486 encryption_type = ESP
487 tun4_encrypt_node_name = "esp4-encrypt-tun"
488 tun4_decrypt_node_name = "esp4-decrypt-tun"
491 super(TestIpsec4MultiTunIfEsp, self).setUp()
493 self.tun_if = self.pg0
495 self.multi_params = []
496 self.pg0.generate_remote_hosts(10)
497 self.pg0.configure_ipv4_neighbors()
500 p = copy.copy(self.ipv4_params)
502 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
503 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
504 p.scapy_tun_spi = p.scapy_tun_spi + ii
505 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
506 p.vpp_tun_spi = p.vpp_tun_spi + ii
508 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
509 p.scapy_tra_spi = p.scapy_tra_spi + ii
510 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
511 p.vpp_tra_spi = p.vpp_tra_spi + ii
512 p.tun_dst = self.pg0.remote_hosts[ii].ip4
514 self.multi_params.append(p)
515 self.config_network(p)
516 self.config_sa_tra(p)
517 self.config_protect(p)
520 super(TestIpsec4MultiTunIfEsp, self).tearDown()
522 def test_tun_44(self):
523 """Multiple IPSEC tunnel interfaces """
524 for p in self.multi_params:
525 self.verify_tun_44(p, count=127)
526 c = p.tun_if.get_rx_stats()
527 self.assertEqual(c['packets'], 127)
528 c = p.tun_if.get_tx_stats()
529 self.assertEqual(c['packets'], 127)
531 def test_tun_rr_44(self):
532 """ Round-robin packets acrros multiple interface """
534 for p in self.multi_params:
535 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
536 src=p.remote_tun_if_host,
537 dst=self.pg1.remote_ip4)
538 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
540 for rx, p in zip(rxs, self.multi_params):
541 self.verify_decrypted(p, [rx])
544 for p in self.multi_params:
545 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
546 dst=p.remote_tun_if_host)
547 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
549 for rx, p in zip(rxs, self.multi_params):
550 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
553 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
556 """ IPsec IPv4 Tunnel interface all Algos """
558 encryption_type = ESP
559 tun4_encrypt_node_name = "esp4-encrypt-tun"
560 tun4_decrypt_node_name = "esp4-decrypt-tun"
563 super(TestIpsec4TunIfEspAll, self).setUp()
565 self.tun_if = self.pg0
568 self.config_network(p)
569 self.config_sa_tra(p)
570 self.config_protect(p)
574 self.unconfig_protect(p)
575 self.unconfig_network(p)
578 super(TestIpsec4TunIfEspAll, self).tearDown()
582 # change the key and the SPI
585 p.crypt_key = b'X' + p.crypt_key[1:]
587 p.scapy_tun_sa_id += 1
590 p.tun_if.local_spi = p.vpp_tun_spi
591 p.tun_if.remote_spi = p.scapy_tun_spi
593 config_tun_params(p, self.encryption_type, p.tun_if)
595 p.tun_sa_out = VppIpsecSA(self,
602 self.vpp_esp_protocol,
605 p.tun_sa_in = VppIpsecSA(self,
612 self.vpp_esp_protocol,
615 p.tun_sa_in.add_vpp_config()
616 p.tun_sa_out.add_vpp_config()
618 self.config_protect(p)
619 np.tun_sa_out.remove_vpp_config()
620 np.tun_sa_in.remove_vpp_config()
621 self.logger.info(self.vapi.cli("sh ipsec sa"))
623 def test_tun_44(self):
624 """IPSEC tunnel all algos """
626 # foreach VPP crypto engine
627 engines = ["ia32", "ipsecmb", "openssl"]
629 # foreach crypto algorithm
630 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
631 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
632 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
633 IPSEC_API_INTEG_ALG_NONE),
634 'scapy-crypto': "AES-GCM",
635 'scapy-integ': "NULL",
636 'key': b"JPjyOWBeVEQiMe7h",
638 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
639 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
640 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
641 IPSEC_API_INTEG_ALG_NONE),
642 'scapy-crypto': "AES-GCM",
643 'scapy-integ': "NULL",
644 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
646 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
647 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
648 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
649 IPSEC_API_INTEG_ALG_NONE),
650 'scapy-crypto': "AES-GCM",
651 'scapy-integ': "NULL",
652 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
654 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
655 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
656 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
657 IPSEC_API_INTEG_ALG_SHA1_96),
658 'scapy-crypto': "AES-CBC",
659 'scapy-integ': "HMAC-SHA1-96",
661 'key': b"JPjyOWBeVEQiMe7h"},
662 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
663 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
664 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
665 IPSEC_API_INTEG_ALG_SHA_512_256),
666 'scapy-crypto': "AES-CBC",
667 'scapy-integ': "SHA2-512-256",
669 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
670 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
671 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
672 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
673 IPSEC_API_INTEG_ALG_SHA_256_128),
674 'scapy-crypto': "AES-CBC",
675 'scapy-integ': "SHA2-256-128",
677 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
678 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
679 IPSEC_API_CRYPTO_ALG_NONE),
680 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
681 IPSEC_API_INTEG_ALG_SHA1_96),
682 'scapy-crypto': "NULL",
683 'scapy-integ': "HMAC-SHA1-96",
685 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
687 for engine in engines:
688 self.vapi.cli("set crypto handler all %s" % engine)
691 # loop through each of the algorithms
694 # with self.subTest(algo=algo['scapy']):
697 p.auth_algo_vpp_id = algo['vpp-integ']
698 p.crypt_algo_vpp_id = algo['vpp-crypto']
699 p.crypt_algo = algo['scapy-crypto']
700 p.auth_algo = algo['scapy-integ']
701 p.crypt_key = algo['key']
702 p.salt = algo['salt']
708 self.verify_tun_44(p, count=127)
711 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
714 """ IPsec IPv4 Tunnel interface no Algos """
716 encryption_type = ESP
717 tun4_encrypt_node_name = "esp4-encrypt-tun"
718 tun4_decrypt_node_name = "esp4-decrypt-tun"
721 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
723 self.tun_if = self.pg0
725 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
726 IPSEC_API_INTEG_ALG_NONE)
730 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
731 IPSEC_API_CRYPTO_ALG_NONE)
732 p.crypt_algo = 'NULL'
736 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
738 def test_tun_44(self):
739 """ IPSec SA with NULL algos """
742 self.config_network(p)
743 self.config_sa_tra(p)
744 self.config_protect(p)
746 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
747 dst=p.remote_tun_if_host)
748 self.send_and_assert_no_replies(self.pg1, tx)
750 self.unconfig_protect(p)
752 self.unconfig_network(p)
755 @tag_fixme_vpp_workers
756 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
759 """ IPsec IPv6 Multi Tunnel interface """
761 encryption_type = ESP
762 tun6_encrypt_node_name = "esp6-encrypt-tun"
763 tun6_decrypt_node_name = "esp6-decrypt-tun"
766 super(TestIpsec6MultiTunIfEsp, self).setUp()
768 self.tun_if = self.pg0
770 self.multi_params = []
771 self.pg0.generate_remote_hosts(10)
772 self.pg0.configure_ipv6_neighbors()
775 p = copy.copy(self.ipv6_params)
777 p.remote_tun_if_host = "1111::%d" % (ii + 1)
778 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
779 p.scapy_tun_spi = p.scapy_tun_spi + ii
780 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
781 p.vpp_tun_spi = p.vpp_tun_spi + ii
783 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
784 p.scapy_tra_spi = p.scapy_tra_spi + ii
785 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
786 p.vpp_tra_spi = p.vpp_tra_spi + ii
787 p.tun_dst = self.pg0.remote_hosts[ii].ip6
789 self.multi_params.append(p)
790 self.config_network(p)
791 self.config_sa_tra(p)
792 self.config_protect(p)
795 super(TestIpsec6MultiTunIfEsp, self).tearDown()
797 def test_tun_66(self):
798 """Multiple IPSEC tunnel interfaces """
799 for p in self.multi_params:
800 self.verify_tun_66(p, count=127)
801 c = p.tun_if.get_rx_stats()
802 self.assertEqual(c['packets'], 127)
803 c = p.tun_if.get_tx_stats()
804 self.assertEqual(c['packets'], 127)
807 class TestIpsecGreTebIfEsp(TemplateIpsec,
809 """ Ipsec GRE TEB ESP - TUN tests """
810 tun4_encrypt_node_name = "esp4-encrypt-tun"
811 tun4_decrypt_node_name = "esp4-decrypt-tun"
812 encryption_type = ESP
813 omac = "00:11:22:33:44:55"
815 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
817 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
818 sa.encrypt(IP(src=self.pg0.remote_ip4,
819 dst=self.pg0.local_ip4) /
821 Ether(dst=self.omac) /
822 IP(src="1.1.1.1", dst="1.1.1.2") /
823 UDP(sport=1144, dport=2233) /
824 Raw(b'X' * payload_size))
825 for i in range(count)]
827 def gen_pkts(self, sw_intf, src, dst, count=1,
829 return [Ether(dst=self.omac) /
830 IP(src="1.1.1.1", dst="1.1.1.2") /
831 UDP(sport=1144, dport=2233) /
832 Raw(b'X' * payload_size)
833 for i in range(count)]
835 def verify_decrypted(self, p, rxs):
837 self.assert_equal(rx[Ether].dst, self.omac)
838 self.assert_equal(rx[IP].dst, "1.1.1.2")
840 def verify_encrypted(self, p, sa, rxs):
843 pkt = sa.decrypt(rx[IP])
844 if not pkt.haslayer(IP):
845 pkt = IP(pkt[Raw].load)
846 self.assert_packet_checksums_valid(pkt)
847 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
848 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
849 self.assertTrue(pkt.haslayer(GRE))
851 self.assertEqual(e[Ether].dst, self.omac)
852 self.assertEqual(e[IP].dst, "1.1.1.2")
853 except (IndexError, AssertionError):
854 self.logger.debug(ppp("Unexpected packet:", rx))
856 self.logger.debug(ppp("Decrypted packet:", pkt))
862 super(TestIpsecGreTebIfEsp, self).setUp()
864 self.tun_if = self.pg0
868 bd1 = VppBridgeDomain(self, 1)
871 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
872 p.auth_algo_vpp_id, p.auth_key,
873 p.crypt_algo_vpp_id, p.crypt_key,
874 self.vpp_esp_protocol,
877 p.tun_sa_out.add_vpp_config()
879 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
880 p.auth_algo_vpp_id, p.auth_key,
881 p.crypt_algo_vpp_id, p.crypt_key,
882 self.vpp_esp_protocol,
885 p.tun_sa_in.add_vpp_config()
887 p.tun_if = VppGreInterface(self,
890 type=(VppEnum.vl_api_gre_tunnel_type_t.
891 GRE_API_TUNNEL_TYPE_TEB))
892 p.tun_if.add_vpp_config()
894 p.tun_protect = VppIpsecTunProtect(self,
899 p.tun_protect.add_vpp_config()
902 p.tun_if.config_ip4()
903 config_tun_params(p, self.encryption_type, p.tun_if)
905 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
906 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
908 self.vapi.cli("clear ipsec sa")
909 self.vapi.cli("sh adj")
910 self.vapi.cli("sh ipsec tun")
914 p.tun_if.unconfig_ip4()
915 super(TestIpsecGreTebIfEsp, self).tearDown()
918 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
920 """ Ipsec GRE TEB ESP - TUN tests """
921 tun4_encrypt_node_name = "esp4-encrypt-tun"
922 tun4_decrypt_node_name = "esp4-decrypt-tun"
923 encryption_type = ESP
924 omac = "00:11:22:33:44:55"
926 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
928 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
929 sa.encrypt(IP(src=self.pg0.remote_ip4,
930 dst=self.pg0.local_ip4) /
932 Ether(dst=self.omac) /
933 IP(src="1.1.1.1", dst="1.1.1.2") /
934 UDP(sport=1144, dport=2233) /
935 Raw(b'X' * payload_size))
936 for i in range(count)]
938 def gen_pkts(self, sw_intf, src, dst, count=1,
940 return [Ether(dst=self.omac) /
942 IP(src="1.1.1.1", dst="1.1.1.2") /
943 UDP(sport=1144, dport=2233) /
944 Raw(b'X' * payload_size)
945 for i in range(count)]
947 def verify_decrypted(self, p, rxs):
949 self.assert_equal(rx[Ether].dst, self.omac)
950 self.assert_equal(rx[Dot1Q].vlan, 11)
951 self.assert_equal(rx[IP].dst, "1.1.1.2")
953 def verify_encrypted(self, p, sa, rxs):
956 pkt = sa.decrypt(rx[IP])
957 if not pkt.haslayer(IP):
958 pkt = IP(pkt[Raw].load)
959 self.assert_packet_checksums_valid(pkt)
960 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
961 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
962 self.assertTrue(pkt.haslayer(GRE))
964 self.assertEqual(e[Ether].dst, self.omac)
965 self.assertFalse(e.haslayer(Dot1Q))
966 self.assertEqual(e[IP].dst, "1.1.1.2")
967 except (IndexError, AssertionError):
968 self.logger.debug(ppp("Unexpected packet:", rx))
970 self.logger.debug(ppp("Decrypted packet:", pkt))
976 super(TestIpsecGreTebVlanIfEsp, self).setUp()
978 self.tun_if = self.pg0
982 bd1 = VppBridgeDomain(self, 1)
985 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
986 self.vapi.l2_interface_vlan_tag_rewrite(
987 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
989 self.pg1_11.admin_up()
991 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
992 p.auth_algo_vpp_id, p.auth_key,
993 p.crypt_algo_vpp_id, p.crypt_key,
994 self.vpp_esp_protocol,
997 p.tun_sa_out.add_vpp_config()
999 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1000 p.auth_algo_vpp_id, p.auth_key,
1001 p.crypt_algo_vpp_id, p.crypt_key,
1002 self.vpp_esp_protocol,
1003 self.pg0.remote_ip4,
1005 p.tun_sa_in.add_vpp_config()
1007 p.tun_if = VppGreInterface(self,
1009 self.pg0.remote_ip4,
1010 type=(VppEnum.vl_api_gre_tunnel_type_t.
1011 GRE_API_TUNNEL_TYPE_TEB))
1012 p.tun_if.add_vpp_config()
1014 p.tun_protect = VppIpsecTunProtect(self,
1019 p.tun_protect.add_vpp_config()
1022 p.tun_if.config_ip4()
1023 config_tun_params(p, self.encryption_type, p.tun_if)
1025 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1026 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1028 self.vapi.cli("clear ipsec sa")
1031 p = self.ipv4_params
1032 p.tun_if.unconfig_ip4()
1033 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1034 self.pg1_11.admin_down()
1035 self.pg1_11.remove_vpp_config()
1038 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1040 """ Ipsec GRE TEB ESP - Tra tests """
1041 tun4_encrypt_node_name = "esp4-encrypt-tun"
1042 tun4_decrypt_node_name = "esp4-decrypt-tun"
1043 encryption_type = ESP
1044 omac = "00:11:22:33:44:55"
1046 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1048 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1049 sa.encrypt(IP(src=self.pg0.remote_ip4,
1050 dst=self.pg0.local_ip4) /
1052 Ether(dst=self.omac) /
1053 IP(src="1.1.1.1", dst="1.1.1.2") /
1054 UDP(sport=1144, dport=2233) /
1055 Raw(b'X' * payload_size))
1056 for i in range(count)]
1058 def gen_pkts(self, sw_intf, src, dst, count=1,
1060 return [Ether(dst=self.omac) /
1061 IP(src="1.1.1.1", dst="1.1.1.2") /
1062 UDP(sport=1144, dport=2233) /
1063 Raw(b'X' * payload_size)
1064 for i in range(count)]
1066 def verify_decrypted(self, p, rxs):
1068 self.assert_equal(rx[Ether].dst, self.omac)
1069 self.assert_equal(rx[IP].dst, "1.1.1.2")
1071 def verify_encrypted(self, p, sa, rxs):
1074 pkt = sa.decrypt(rx[IP])
1075 if not pkt.haslayer(IP):
1076 pkt = IP(pkt[Raw].load)
1077 self.assert_packet_checksums_valid(pkt)
1078 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1079 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1080 self.assertTrue(pkt.haslayer(GRE))
1082 self.assertEqual(e[Ether].dst, self.omac)
1083 self.assertEqual(e[IP].dst, "1.1.1.2")
1084 except (IndexError, AssertionError):
1085 self.logger.debug(ppp("Unexpected packet:", rx))
1087 self.logger.debug(ppp("Decrypted packet:", pkt))
1093 super(TestIpsecGreTebIfEspTra, self).setUp()
1095 self.tun_if = self.pg0
1097 p = self.ipv4_params
1099 bd1 = VppBridgeDomain(self, 1)
1100 bd1.add_vpp_config()
1102 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1103 p.auth_algo_vpp_id, p.auth_key,
1104 p.crypt_algo_vpp_id, p.crypt_key,
1105 self.vpp_esp_protocol)
1106 p.tun_sa_out.add_vpp_config()
1108 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1109 p.auth_algo_vpp_id, p.auth_key,
1110 p.crypt_algo_vpp_id, p.crypt_key,
1111 self.vpp_esp_protocol)
1112 p.tun_sa_in.add_vpp_config()
1114 p.tun_if = VppGreInterface(self,
1116 self.pg0.remote_ip4,
1117 type=(VppEnum.vl_api_gre_tunnel_type_t.
1118 GRE_API_TUNNEL_TYPE_TEB))
1119 p.tun_if.add_vpp_config()
1121 p.tun_protect = VppIpsecTunProtect(self,
1126 p.tun_protect.add_vpp_config()
1129 p.tun_if.config_ip4()
1130 config_tra_params(p, self.encryption_type, p.tun_if)
1132 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1133 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1135 self.vapi.cli("clear ipsec sa")
1138 p = self.ipv4_params
1139 p.tun_if.unconfig_ip4()
1140 super(TestIpsecGreTebIfEspTra, self).tearDown()
1143 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1145 """ Ipsec GRE TEB UDP ESP - Tra tests """
1146 tun4_encrypt_node_name = "esp4-encrypt-tun"
1147 tun4_decrypt_node_name = "esp4-decrypt-tun"
1148 encryption_type = ESP
1149 omac = "00:11:22:33:44:55"
1151 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1153 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1154 sa.encrypt(IP(src=self.pg0.remote_ip4,
1155 dst=self.pg0.local_ip4) /
1157 Ether(dst=self.omac) /
1158 IP(src="1.1.1.1", dst="1.1.1.2") /
1159 UDP(sport=1144, dport=2233) /
1160 Raw(b'X' * payload_size))
1161 for i in range(count)]
1163 def gen_pkts(self, sw_intf, src, dst, count=1,
1165 return [Ether(dst=self.omac) /
1166 IP(src="1.1.1.1", dst="1.1.1.2") /
1167 UDP(sport=1144, dport=2233) /
1168 Raw(b'X' * payload_size)
1169 for i in range(count)]
1171 def verify_decrypted(self, p, rxs):
1173 self.assert_equal(rx[Ether].dst, self.omac)
1174 self.assert_equal(rx[IP].dst, "1.1.1.2")
1176 def verify_encrypted(self, p, sa, rxs):
1178 self.assertTrue(rx.haslayer(UDP))
1179 self.assertEqual(rx[UDP].dport, 4545)
1180 self.assertEqual(rx[UDP].sport, 5454)
1182 pkt = sa.decrypt(rx[IP])
1183 if not pkt.haslayer(IP):
1184 pkt = IP(pkt[Raw].load)
1185 self.assert_packet_checksums_valid(pkt)
1186 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1187 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1188 self.assertTrue(pkt.haslayer(GRE))
1190 self.assertEqual(e[Ether].dst, self.omac)
1191 self.assertEqual(e[IP].dst, "1.1.1.2")
1192 except (IndexError, AssertionError):
1193 self.logger.debug(ppp("Unexpected packet:", rx))
1195 self.logger.debug(ppp("Decrypted packet:", pkt))
1201 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1203 self.tun_if = self.pg0
1205 p = self.ipv4_params
1206 p = self.ipv4_params
1207 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1208 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1209 p.nat_header = UDP(sport=5454, dport=4545)
1211 bd1 = VppBridgeDomain(self, 1)
1212 bd1.add_vpp_config()
1214 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1215 p.auth_algo_vpp_id, p.auth_key,
1216 p.crypt_algo_vpp_id, p.crypt_key,
1217 self.vpp_esp_protocol,
1221 p.tun_sa_out.add_vpp_config()
1223 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1224 p.auth_algo_vpp_id, p.auth_key,
1225 p.crypt_algo_vpp_id, p.crypt_key,
1226 self.vpp_esp_protocol,
1228 VppEnum.vl_api_ipsec_sad_flags_t.
1229 IPSEC_API_SAD_FLAG_IS_INBOUND),
1232 p.tun_sa_in.add_vpp_config()
1234 p.tun_if = VppGreInterface(self,
1236 self.pg0.remote_ip4,
1237 type=(VppEnum.vl_api_gre_tunnel_type_t.
1238 GRE_API_TUNNEL_TYPE_TEB))
1239 p.tun_if.add_vpp_config()
1241 p.tun_protect = VppIpsecTunProtect(self,
1246 p.tun_protect.add_vpp_config()
1249 p.tun_if.config_ip4()
1250 config_tra_params(p, self.encryption_type, p.tun_if)
1252 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1253 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1255 self.vapi.cli("clear ipsec sa")
1256 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1259 p = self.ipv4_params
1260 p.tun_if.unconfig_ip4()
1261 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1264 class TestIpsecGreIfEsp(TemplateIpsec,
1266 """ Ipsec GRE ESP - TUN tests """
1267 tun4_encrypt_node_name = "esp4-encrypt-tun"
1268 tun4_decrypt_node_name = "esp4-decrypt-tun"
1269 encryption_type = ESP
1271 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1273 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1274 sa.encrypt(IP(src=self.pg0.remote_ip4,
1275 dst=self.pg0.local_ip4) /
1277 IP(src=self.pg1.local_ip4,
1278 dst=self.pg1.remote_ip4) /
1279 UDP(sport=1144, dport=2233) /
1280 Raw(b'X' * payload_size))
1281 for i in range(count)]
1283 def gen_pkts(self, sw_intf, src, dst, count=1,
1285 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1286 IP(src="1.1.1.1", dst="1.1.1.2") /
1287 UDP(sport=1144, dport=2233) /
1288 Raw(b'X' * payload_size)
1289 for i in range(count)]
1291 def verify_decrypted(self, p, rxs):
1293 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1294 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1296 def verify_encrypted(self, p, sa, rxs):
1299 pkt = sa.decrypt(rx[IP])
1300 if not pkt.haslayer(IP):
1301 pkt = IP(pkt[Raw].load)
1302 self.assert_packet_checksums_valid(pkt)
1303 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1304 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1305 self.assertTrue(pkt.haslayer(GRE))
1307 self.assertEqual(e[IP].dst, "1.1.1.2")
1308 except (IndexError, AssertionError):
1309 self.logger.debug(ppp("Unexpected packet:", rx))
1311 self.logger.debug(ppp("Decrypted packet:", pkt))
1317 super(TestIpsecGreIfEsp, self).setUp()
1319 self.tun_if = self.pg0
1321 p = self.ipv4_params
1323 bd1 = VppBridgeDomain(self, 1)
1324 bd1.add_vpp_config()
1326 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1327 p.auth_algo_vpp_id, p.auth_key,
1328 p.crypt_algo_vpp_id, p.crypt_key,
1329 self.vpp_esp_protocol,
1331 self.pg0.remote_ip4)
1332 p.tun_sa_out.add_vpp_config()
1334 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1335 p.auth_algo_vpp_id, p.auth_key,
1336 p.crypt_algo_vpp_id, p.crypt_key,
1337 self.vpp_esp_protocol,
1338 self.pg0.remote_ip4,
1340 p.tun_sa_in.add_vpp_config()
1342 p.tun_if = VppGreInterface(self,
1344 self.pg0.remote_ip4)
1345 p.tun_if.add_vpp_config()
1347 p.tun_protect = VppIpsecTunProtect(self,
1351 p.tun_protect.add_vpp_config()
1354 p.tun_if.config_ip4()
1355 config_tun_params(p, self.encryption_type, p.tun_if)
1357 VppIpRoute(self, "1.1.1.2", 32,
1358 [VppRoutePath(p.tun_if.remote_ip4,
1359 0xffffffff)]).add_vpp_config()
1362 p = self.ipv4_params
1363 p.tun_if.unconfig_ip4()
1364 super(TestIpsecGreIfEsp, self).tearDown()
1367 class TestIpsecGreIfEspTra(TemplateIpsec,
1369 """ Ipsec GRE ESP - TRA tests """
1370 tun4_encrypt_node_name = "esp4-encrypt-tun"
1371 tun4_decrypt_node_name = "esp4-decrypt-tun"
1372 encryption_type = ESP
1374 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1376 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1377 sa.encrypt(IP(src=self.pg0.remote_ip4,
1378 dst=self.pg0.local_ip4) /
1380 IP(src=self.pg1.local_ip4,
1381 dst=self.pg1.remote_ip4) /
1382 UDP(sport=1144, dport=2233) /
1383 Raw(b'X' * payload_size))
1384 for i in range(count)]
1386 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1388 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1389 sa.encrypt(IP(src=self.pg0.remote_ip4,
1390 dst=self.pg0.local_ip4) /
1392 UDP(sport=1144, dport=2233) /
1393 Raw(b'X' * payload_size))
1394 for i in range(count)]
1396 def gen_pkts(self, sw_intf, src, dst, count=1,
1398 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1399 IP(src="1.1.1.1", dst="1.1.1.2") /
1400 UDP(sport=1144, dport=2233) /
1401 Raw(b'X' * payload_size)
1402 for i in range(count)]
1404 def verify_decrypted(self, p, rxs):
1406 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1407 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1409 def verify_encrypted(self, p, sa, rxs):
1412 pkt = sa.decrypt(rx[IP])
1413 if not pkt.haslayer(IP):
1414 pkt = IP(pkt[Raw].load)
1415 self.assert_packet_checksums_valid(pkt)
1416 self.assertTrue(pkt.haslayer(GRE))
1418 self.assertEqual(e[IP].dst, "1.1.1.2")
1419 except (IndexError, AssertionError):
1420 self.logger.debug(ppp("Unexpected packet:", rx))
1422 self.logger.debug(ppp("Decrypted packet:", pkt))
1428 super(TestIpsecGreIfEspTra, self).setUp()
1430 self.tun_if = self.pg0
1432 p = self.ipv4_params
1434 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1435 p.auth_algo_vpp_id, p.auth_key,
1436 p.crypt_algo_vpp_id, p.crypt_key,
1437 self.vpp_esp_protocol)
1438 p.tun_sa_out.add_vpp_config()
1440 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1441 p.auth_algo_vpp_id, p.auth_key,
1442 p.crypt_algo_vpp_id, p.crypt_key,
1443 self.vpp_esp_protocol)
1444 p.tun_sa_in.add_vpp_config()
1446 p.tun_if = VppGreInterface(self,
1448 self.pg0.remote_ip4)
1449 p.tun_if.add_vpp_config()
1451 p.tun_protect = VppIpsecTunProtect(self,
1455 p.tun_protect.add_vpp_config()
1458 p.tun_if.config_ip4()
1459 config_tra_params(p, self.encryption_type, p.tun_if)
1461 VppIpRoute(self, "1.1.1.2", 32,
1462 [VppRoutePath(p.tun_if.remote_ip4,
1463 0xffffffff)]).add_vpp_config()
1466 p = self.ipv4_params
1467 p.tun_if.unconfig_ip4()
1468 super(TestIpsecGreIfEspTra, self).tearDown()
1470 def test_gre_non_ip(self):
1471 p = self.ipv4_params
1472 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1473 src=p.remote_tun_if_host,
1474 dst=self.pg1.remote_ip6)
1475 self.send_and_assert_no_replies(self.tun_if, tx)
1476 node_name = ('/err/%s/unsupported payload' %
1477 self.tun4_decrypt_node_name)
1478 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1481 class TestIpsecGre6IfEspTra(TemplateIpsec,
1483 """ Ipsec GRE ESP - TRA tests """
1484 tun6_encrypt_node_name = "esp6-encrypt-tun"
1485 tun6_decrypt_node_name = "esp6-decrypt-tun"
1486 encryption_type = ESP
1488 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1490 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1491 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1492 dst=self.pg0.local_ip6) /
1494 IPv6(src=self.pg1.local_ip6,
1495 dst=self.pg1.remote_ip6) /
1496 UDP(sport=1144, dport=2233) /
1497 Raw(b'X' * payload_size))
1498 for i in range(count)]
1500 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1502 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1503 IPv6(src="1::1", dst="1::2") /
1504 UDP(sport=1144, dport=2233) /
1505 Raw(b'X' * payload_size)
1506 for i in range(count)]
1508 def verify_decrypted6(self, p, rxs):
1510 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1511 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1513 def verify_encrypted6(self, p, sa, rxs):
1516 pkt = sa.decrypt(rx[IPv6])
1517 if not pkt.haslayer(IPv6):
1518 pkt = IPv6(pkt[Raw].load)
1519 self.assert_packet_checksums_valid(pkt)
1520 self.assertTrue(pkt.haslayer(GRE))
1522 self.assertEqual(e[IPv6].dst, "1::2")
1523 except (IndexError, AssertionError):
1524 self.logger.debug(ppp("Unexpected packet:", rx))
1526 self.logger.debug(ppp("Decrypted packet:", pkt))
1532 super(TestIpsecGre6IfEspTra, self).setUp()
1534 self.tun_if = self.pg0
1536 p = self.ipv6_params
1538 bd1 = VppBridgeDomain(self, 1)
1539 bd1.add_vpp_config()
1541 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1542 p.auth_algo_vpp_id, p.auth_key,
1543 p.crypt_algo_vpp_id, p.crypt_key,
1544 self.vpp_esp_protocol)
1545 p.tun_sa_out.add_vpp_config()
1547 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1548 p.auth_algo_vpp_id, p.auth_key,
1549 p.crypt_algo_vpp_id, p.crypt_key,
1550 self.vpp_esp_protocol)
1551 p.tun_sa_in.add_vpp_config()
1553 p.tun_if = VppGreInterface(self,
1555 self.pg0.remote_ip6)
1556 p.tun_if.add_vpp_config()
1558 p.tun_protect = VppIpsecTunProtect(self,
1562 p.tun_protect.add_vpp_config()
1565 p.tun_if.config_ip6()
1566 config_tra_params(p, self.encryption_type, p.tun_if)
1568 r = VppIpRoute(self, "1::2", 128,
1569 [VppRoutePath(p.tun_if.remote_ip6,
1571 proto=DpoProto.DPO_PROTO_IP6)])
1575 p = self.ipv6_params
1576 p.tun_if.unconfig_ip6()
1577 super(TestIpsecGre6IfEspTra, self).tearDown()
1580 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1581 """ Ipsec mGRE ESP v4 TRA tests """
1582 tun4_encrypt_node_name = "esp4-encrypt-tun"
1583 tun4_decrypt_node_name = "esp4-decrypt-tun"
1584 encryption_type = ESP
1586 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1588 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1589 sa.encrypt(IP(src=p.tun_dst,
1590 dst=self.pg0.local_ip4) /
1592 IP(src=self.pg1.local_ip4,
1593 dst=self.pg1.remote_ip4) /
1594 UDP(sport=1144, dport=2233) /
1595 Raw(b'X' * payload_size))
1596 for i in range(count)]
1598 def gen_pkts(self, sw_intf, src, dst, count=1,
1600 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1601 IP(src="1.1.1.1", dst=dst) /
1602 UDP(sport=1144, dport=2233) /
1603 Raw(b'X' * payload_size)
1604 for i in range(count)]
1606 def verify_decrypted(self, p, rxs):
1608 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1609 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1611 def verify_encrypted(self, p, sa, rxs):
1614 pkt = sa.decrypt(rx[IP])
1615 if not pkt.haslayer(IP):
1616 pkt = IP(pkt[Raw].load)
1617 self.assert_packet_checksums_valid(pkt)
1618 self.assertTrue(pkt.haslayer(GRE))
1620 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1621 except (IndexError, AssertionError):
1622 self.logger.debug(ppp("Unexpected packet:", rx))
1624 self.logger.debug(ppp("Decrypted packet:", pkt))
1630 super(TestIpsecMGreIfEspTra4, self).setUp()
1633 self.tun_if = self.pg0
1634 p = self.ipv4_params
1635 p.tun_if = VppGreInterface(self,
1638 mode=(VppEnum.vl_api_tunnel_mode_t.
1639 TUNNEL_API_MODE_MP))
1640 p.tun_if.add_vpp_config()
1642 p.tun_if.config_ip4()
1643 p.tun_if.generate_remote_hosts(N_NHS)
1644 self.pg0.generate_remote_hosts(N_NHS)
1645 self.pg0.configure_ipv4_neighbors()
1647 # setup some SAs for several next-hops on the interface
1648 self.multi_params = []
1650 for ii in range(N_NHS):
1651 p = copy.copy(self.ipv4_params)
1653 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1654 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1655 p.scapy_tun_spi = p.scapy_tun_spi + ii
1656 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1657 p.vpp_tun_spi = p.vpp_tun_spi + ii
1659 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1660 p.scapy_tra_spi = p.scapy_tra_spi + ii
1661 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1662 p.vpp_tra_spi = p.vpp_tra_spi + ii
1663 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1664 p.auth_algo_vpp_id, p.auth_key,
1665 p.crypt_algo_vpp_id, p.crypt_key,
1666 self.vpp_esp_protocol)
1667 p.tun_sa_out.add_vpp_config()
1669 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1670 p.auth_algo_vpp_id, p.auth_key,
1671 p.crypt_algo_vpp_id, p.crypt_key,
1672 self.vpp_esp_protocol)
1673 p.tun_sa_in.add_vpp_config()
1675 p.tun_protect = VppIpsecTunProtect(
1680 nh=p.tun_if.remote_hosts[ii].ip4)
1681 p.tun_protect.add_vpp_config()
1682 config_tra_params(p, self.encryption_type, p.tun_if)
1683 self.multi_params.append(p)
1685 VppIpRoute(self, p.remote_tun_if_host, 32,
1686 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1687 p.tun_if.sw_if_index)]).add_vpp_config()
1689 # in this v4 variant add the teibs after the protect
1690 p.teib = VppTeib(self, p.tun_if,
1691 p.tun_if.remote_hosts[ii].ip4,
1692 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1693 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1694 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1697 p = self.ipv4_params
1698 p.tun_if.unconfig_ip4()
1699 super(TestIpsecMGreIfEspTra4, self).tearDown()
1701 def test_tun_44(self):
1704 for p in self.multi_params:
1705 self.verify_tun_44(p, count=N_PKTS)
1706 p.teib.remove_vpp_config()
1707 self.verify_tun_dropped_44(p, count=N_PKTS)
1708 p.teib.add_vpp_config()
1709 self.verify_tun_44(p, count=N_PKTS)
1712 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1713 """ Ipsec mGRE ESP v6 TRA tests """
1714 tun6_encrypt_node_name = "esp6-encrypt-tun"
1715 tun6_decrypt_node_name = "esp6-decrypt-tun"
1716 encryption_type = ESP
1718 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1720 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1721 sa.encrypt(IPv6(src=p.tun_dst,
1722 dst=self.pg0.local_ip6) /
1724 IPv6(src=self.pg1.local_ip6,
1725 dst=self.pg1.remote_ip6) /
1726 UDP(sport=1144, dport=2233) /
1727 Raw(b'X' * payload_size))
1728 for i in range(count)]
1730 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1732 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1733 IPv6(src="1::1", dst=dst) /
1734 UDP(sport=1144, dport=2233) /
1735 Raw(b'X' * payload_size)
1736 for i in range(count)]
1738 def verify_decrypted6(self, p, rxs):
1740 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1741 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1743 def verify_encrypted6(self, p, sa, rxs):
1746 pkt = sa.decrypt(rx[IPv6])
1747 if not pkt.haslayer(IPv6):
1748 pkt = IPv6(pkt[Raw].load)
1749 self.assert_packet_checksums_valid(pkt)
1750 self.assertTrue(pkt.haslayer(GRE))
1752 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1753 except (IndexError, AssertionError):
1754 self.logger.debug(ppp("Unexpected packet:", rx))
1756 self.logger.debug(ppp("Decrypted packet:", pkt))
1762 super(TestIpsecMGreIfEspTra6, self).setUp()
1764 self.vapi.cli("set logging class ipsec level debug")
1767 self.tun_if = self.pg0
1768 p = self.ipv6_params
1769 p.tun_if = VppGreInterface(self,
1772 mode=(VppEnum.vl_api_tunnel_mode_t.
1773 TUNNEL_API_MODE_MP))
1774 p.tun_if.add_vpp_config()
1776 p.tun_if.config_ip6()
1777 p.tun_if.generate_remote_hosts(N_NHS)
1778 self.pg0.generate_remote_hosts(N_NHS)
1779 self.pg0.configure_ipv6_neighbors()
1781 # setup some SAs for several next-hops on the interface
1782 self.multi_params = []
1784 for ii in range(N_NHS):
1785 p = copy.copy(self.ipv6_params)
1787 p.remote_tun_if_host = "1::%d" % (ii + 1)
1788 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1789 p.scapy_tun_spi = p.scapy_tun_spi + ii
1790 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1791 p.vpp_tun_spi = p.vpp_tun_spi + ii
1793 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1794 p.scapy_tra_spi = p.scapy_tra_spi + ii
1795 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1796 p.vpp_tra_spi = p.vpp_tra_spi + ii
1797 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1798 p.auth_algo_vpp_id, p.auth_key,
1799 p.crypt_algo_vpp_id, p.crypt_key,
1800 self.vpp_esp_protocol)
1801 p.tun_sa_out.add_vpp_config()
1803 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1804 p.auth_algo_vpp_id, p.auth_key,
1805 p.crypt_algo_vpp_id, p.crypt_key,
1806 self.vpp_esp_protocol)
1807 p.tun_sa_in.add_vpp_config()
1809 # in this v6 variant add the teibs first then the protection
1810 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1811 VppTeib(self, p.tun_if,
1812 p.tun_if.remote_hosts[ii].ip6,
1813 p.tun_dst).add_vpp_config()
1815 p.tun_protect = VppIpsecTunProtect(
1820 nh=p.tun_if.remote_hosts[ii].ip6)
1821 p.tun_protect.add_vpp_config()
1822 config_tra_params(p, self.encryption_type, p.tun_if)
1823 self.multi_params.append(p)
1825 VppIpRoute(self, p.remote_tun_if_host, 128,
1826 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1827 p.tun_if.sw_if_index)]).add_vpp_config()
1828 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1830 self.logger.info(self.vapi.cli("sh log"))
1831 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1832 self.logger.info(self.vapi.cli("sh adj 41"))
1835 p = self.ipv6_params
1836 p.tun_if.unconfig_ip6()
1837 super(TestIpsecMGreIfEspTra6, self).tearDown()
1839 def test_tun_66(self):
1841 for p in self.multi_params:
1842 self.verify_tun_66(p, count=63)
1845 @tag_fixme_vpp_workers
1846 class TestIpsec4TunProtect(TemplateIpsec,
1847 TemplateIpsec4TunProtect,
1849 """ IPsec IPv4 Tunnel protect - transport mode"""
1852 super(TestIpsec4TunProtect, self).setUp()
1854 self.tun_if = self.pg0
1857 super(TestIpsec4TunProtect, self).tearDown()
1859 def test_tun_44(self):
1860 """IPSEC tunnel protect"""
1862 p = self.ipv4_params
1864 self.config_network(p)
1865 self.config_sa_tra(p)
1866 self.config_protect(p)
1868 self.verify_tun_44(p, count=127)
1869 c = p.tun_if.get_rx_stats()
1870 self.assertEqual(c['packets'], 127)
1871 c = p.tun_if.get_tx_stats()
1872 self.assertEqual(c['packets'], 127)
1874 self.vapi.cli("clear ipsec sa")
1875 self.verify_tun_64(p, count=127)
1876 c = p.tun_if.get_rx_stats()
1877 self.assertEqual(c['packets'], 254)
1878 c = p.tun_if.get_tx_stats()
1879 self.assertEqual(c['packets'], 254)
1881 # rekey - create new SAs and update the tunnel protection
1883 np.crypt_key = b'X' + p.crypt_key[1:]
1884 np.scapy_tun_spi += 100
1885 np.scapy_tun_sa_id += 1
1886 np.vpp_tun_spi += 100
1887 np.vpp_tun_sa_id += 1
1888 np.tun_if.local_spi = p.vpp_tun_spi
1889 np.tun_if.remote_spi = p.scapy_tun_spi
1891 self.config_sa_tra(np)
1892 self.config_protect(np)
1895 self.verify_tun_44(np, count=127)
1896 c = p.tun_if.get_rx_stats()
1897 self.assertEqual(c['packets'], 381)
1898 c = p.tun_if.get_tx_stats()
1899 self.assertEqual(c['packets'], 381)
1902 self.unconfig_protect(np)
1903 self.unconfig_sa(np)
1904 self.unconfig_network(p)
1907 @tag_fixme_vpp_workers
1908 class TestIpsec4TunProtectUdp(TemplateIpsec,
1909 TemplateIpsec4TunProtect,
1911 """ IPsec IPv4 Tunnel protect - transport mode"""
1914 super(TestIpsec4TunProtectUdp, self).setUp()
1916 self.tun_if = self.pg0
1918 p = self.ipv4_params
1919 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1920 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1921 p.nat_header = UDP(sport=4500, dport=4500)
1922 self.config_network(p)
1923 self.config_sa_tra(p)
1924 self.config_protect(p)
1927 p = self.ipv4_params
1928 self.unconfig_protect(p)
1930 self.unconfig_network(p)
1931 super(TestIpsec4TunProtectUdp, self).tearDown()
1933 def verify_encrypted(self, p, sa, rxs):
1934 # ensure encrypted packets are recieved with the default UDP ports
1936 self.assertEqual(rx[UDP].sport, 4500)
1937 self.assertEqual(rx[UDP].dport, 4500)
1938 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1940 def test_tun_44(self):
1941 """IPSEC UDP tunnel protect"""
1943 p = self.ipv4_params
1945 self.verify_tun_44(p, count=127)
1946 c = p.tun_if.get_rx_stats()
1947 self.assertEqual(c['packets'], 127)
1948 c = p.tun_if.get_tx_stats()
1949 self.assertEqual(c['packets'], 127)
1951 def test_keepalive(self):
1952 """ IPSEC NAT Keepalive """
1953 self.verify_keepalive(self.ipv4_params)
1956 @tag_fixme_vpp_workers
1957 class TestIpsec4TunProtectTun(TemplateIpsec,
1958 TemplateIpsec4TunProtect,
1960 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1962 encryption_type = ESP
1963 tun4_encrypt_node_name = "esp4-encrypt-tun"
1964 tun4_decrypt_node_name = "esp4-decrypt-tun"
1967 super(TestIpsec4TunProtectTun, self).setUp()
1969 self.tun_if = self.pg0
1972 super(TestIpsec4TunProtectTun, self).tearDown()
1974 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1976 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1977 sa.encrypt(IP(src=sw_intf.remote_ip4,
1978 dst=sw_intf.local_ip4) /
1979 IP(src=src, dst=dst) /
1980 UDP(sport=1144, dport=2233) /
1981 Raw(b'X' * payload_size))
1982 for i in range(count)]
1984 def gen_pkts(self, sw_intf, src, dst, count=1,
1986 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1987 IP(src=src, dst=dst) /
1988 UDP(sport=1144, dport=2233) /
1989 Raw(b'X' * payload_size)
1990 for i in range(count)]
1992 def verify_decrypted(self, p, rxs):
1994 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1995 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1996 self.assert_packet_checksums_valid(rx)
1998 def verify_encrypted(self, p, sa, rxs):
2001 pkt = sa.decrypt(rx[IP])
2002 if not pkt.haslayer(IP):
2003 pkt = IP(pkt[Raw].load)
2004 self.assert_packet_checksums_valid(pkt)
2005 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2006 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2007 inner = pkt[IP].payload
2008 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2010 except (IndexError, AssertionError):
2011 self.logger.debug(ppp("Unexpected packet:", rx))
2013 self.logger.debug(ppp("Decrypted packet:", pkt))
2018 def test_tun_44(self):
2019 """IPSEC tunnel protect """
2021 p = self.ipv4_params
2023 self.config_network(p)
2024 self.config_sa_tun(p)
2025 self.config_protect(p)
2027 # also add an output features on the tunnel and physical interface
2028 # so we test they still work
2029 r_all = AclRule(True,
2030 src_prefix="0.0.0.0/0",
2031 dst_prefix="0.0.0.0/0",
2033 a = VppAcl(self, [r_all]).add_vpp_config()
2035 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2036 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2038 self.verify_tun_44(p, count=127)
2040 c = p.tun_if.get_rx_stats()
2041 self.assertEqual(c['packets'], 127)
2042 c = p.tun_if.get_tx_stats()
2043 self.assertEqual(c['packets'], 127)
2045 # rekey - create new SAs and update the tunnel protection
2047 np.crypt_key = b'X' + p.crypt_key[1:]
2048 np.scapy_tun_spi += 100
2049 np.scapy_tun_sa_id += 1
2050 np.vpp_tun_spi += 100
2051 np.vpp_tun_sa_id += 1
2052 np.tun_if.local_spi = p.vpp_tun_spi
2053 np.tun_if.remote_spi = p.scapy_tun_spi
2055 self.config_sa_tun(np)
2056 self.config_protect(np)
2059 self.verify_tun_44(np, count=127)
2060 c = p.tun_if.get_rx_stats()
2061 self.assertEqual(c['packets'], 254)
2062 c = p.tun_if.get_tx_stats()
2063 self.assertEqual(c['packets'], 254)
2066 self.unconfig_protect(np)
2067 self.unconfig_sa(np)
2068 self.unconfig_network(p)
2071 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2072 TemplateIpsec4TunProtect,
2074 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2076 encryption_type = ESP
2077 tun4_encrypt_node_name = "esp4-encrypt-tun"
2078 tun4_decrypt_node_name = "esp4-decrypt-tun"
2081 super(TestIpsec4TunProtectTunDrop, self).setUp()
2083 self.tun_if = self.pg0
2086 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2088 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2090 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2091 sa.encrypt(IP(src=sw_intf.remote_ip4,
2093 IP(src=src, dst=dst) /
2094 UDP(sport=1144, dport=2233) /
2095 Raw(b'X' * payload_size))
2096 for i in range(count)]
2098 def test_tun_drop_44(self):
2099 """IPSEC tunnel protect bogus tunnel header """
2101 p = self.ipv4_params
2103 self.config_network(p)
2104 self.config_sa_tun(p)
2105 self.config_protect(p)
2107 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2108 src=p.remote_tun_if_host,
2109 dst=self.pg1.remote_ip4,
2111 self.send_and_assert_no_replies(self.tun_if, tx)
2114 self.unconfig_protect(p)
2116 self.unconfig_network(p)
2119 @tag_fixme_vpp_workers
2120 class TestIpsec6TunProtect(TemplateIpsec,
2121 TemplateIpsec6TunProtect,
2123 """ IPsec IPv6 Tunnel protect - transport mode"""
2125 encryption_type = ESP
2126 tun6_encrypt_node_name = "esp6-encrypt-tun"
2127 tun6_decrypt_node_name = "esp6-decrypt-tun"
2130 super(TestIpsec6TunProtect, self).setUp()
2132 self.tun_if = self.pg0
2135 super(TestIpsec6TunProtect, self).tearDown()
2137 def test_tun_66(self):
2138 """IPSEC tunnel protect 6o6"""
2140 p = self.ipv6_params
2142 self.config_network(p)
2143 self.config_sa_tra(p)
2144 self.config_protect(p)
2146 self.verify_tun_66(p, count=127)
2147 c = p.tun_if.get_rx_stats()
2148 self.assertEqual(c['packets'], 127)
2149 c = p.tun_if.get_tx_stats()
2150 self.assertEqual(c['packets'], 127)
2152 # rekey - create new SAs and update the tunnel protection
2154 np.crypt_key = b'X' + p.crypt_key[1:]
2155 np.scapy_tun_spi += 100
2156 np.scapy_tun_sa_id += 1
2157 np.vpp_tun_spi += 100
2158 np.vpp_tun_sa_id += 1
2159 np.tun_if.local_spi = p.vpp_tun_spi
2160 np.tun_if.remote_spi = p.scapy_tun_spi
2162 self.config_sa_tra(np)
2163 self.config_protect(np)
2166 self.verify_tun_66(np, count=127)
2167 c = p.tun_if.get_rx_stats()
2168 self.assertEqual(c['packets'], 254)
2169 c = p.tun_if.get_tx_stats()
2170 self.assertEqual(c['packets'], 254)
2172 # bounce the interface state
2173 p.tun_if.admin_down()
2174 self.verify_drop_tun_66(np, count=127)
2175 node = ('/err/ipsec6-tun-input/%s' %
2176 'ipsec packets received on disabled interface')
2177 self.assertEqual(127, self.statistics.get_err_counter(node))
2179 self.verify_tun_66(np, count=127)
2182 # 1) add two input SAs [old, new]
2183 # 2) swap output SA to [new]
2184 # 3) use only [new] input SA
2186 np3.crypt_key = b'Z' + p.crypt_key[1:]
2187 np3.scapy_tun_spi += 100
2188 np3.scapy_tun_sa_id += 1
2189 np3.vpp_tun_spi += 100
2190 np3.vpp_tun_sa_id += 1
2191 np3.tun_if.local_spi = p.vpp_tun_spi
2192 np3.tun_if.remote_spi = p.scapy_tun_spi
2194 self.config_sa_tra(np3)
2197 p.tun_protect.update_vpp_config(np.tun_sa_out,
2198 [np.tun_sa_in, np3.tun_sa_in])
2199 self.verify_tun_66(np, np, count=127)
2200 self.verify_tun_66(np3, np, count=127)
2203 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2204 [np.tun_sa_in, np3.tun_sa_in])
2205 self.verify_tun_66(np, np3, count=127)
2206 self.verify_tun_66(np3, np3, count=127)
2209 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2211 self.verify_tun_66(np3, np3, count=127)
2212 self.verify_drop_tun_66(np, count=127)
2214 c = p.tun_if.get_rx_stats()
2215 self.assertEqual(c['packets'], 127*9)
2216 c = p.tun_if.get_tx_stats()
2217 self.assertEqual(c['packets'], 127*8)
2218 self.unconfig_sa(np)
2221 self.unconfig_protect(np3)
2222 self.unconfig_sa(np3)
2223 self.unconfig_network(p)
2225 def test_tun_46(self):
2226 """IPSEC tunnel protect 4o6"""
2228 p = self.ipv6_params
2230 self.config_network(p)
2231 self.config_sa_tra(p)
2232 self.config_protect(p)
2234 self.verify_tun_46(p, count=127)
2235 c = p.tun_if.get_rx_stats()
2236 self.assertEqual(c['packets'], 127)
2237 c = p.tun_if.get_tx_stats()
2238 self.assertEqual(c['packets'], 127)
2241 self.unconfig_protect(p)
2243 self.unconfig_network(p)
2246 @tag_fixme_vpp_workers
2247 class TestIpsec6TunProtectTun(TemplateIpsec,
2248 TemplateIpsec6TunProtect,
2250 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2252 encryption_type = ESP
2253 tun6_encrypt_node_name = "esp6-encrypt-tun"
2254 tun6_decrypt_node_name = "esp6-decrypt-tun"
2257 super(TestIpsec6TunProtectTun, self).setUp()
2259 self.tun_if = self.pg0
2262 super(TestIpsec6TunProtectTun, self).tearDown()
2264 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2266 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2267 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2268 dst=sw_intf.local_ip6) /
2269 IPv6(src=src, dst=dst) /
2270 UDP(sport=1166, dport=2233) /
2271 Raw(b'X' * payload_size))
2272 for i in range(count)]
2274 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2276 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2277 IPv6(src=src, dst=dst) /
2278 UDP(sport=1166, dport=2233) /
2279 Raw(b'X' * payload_size)
2280 for i in range(count)]
2282 def verify_decrypted6(self, p, rxs):
2284 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2285 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2286 self.assert_packet_checksums_valid(rx)
2288 def verify_encrypted6(self, p, sa, rxs):
2291 pkt = sa.decrypt(rx[IPv6])
2292 if not pkt.haslayer(IPv6):
2293 pkt = IPv6(pkt[Raw].load)
2294 self.assert_packet_checksums_valid(pkt)
2295 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2296 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2297 inner = pkt[IPv6].payload
2298 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2300 except (IndexError, AssertionError):
2301 self.logger.debug(ppp("Unexpected packet:", rx))
2303 self.logger.debug(ppp("Decrypted packet:", pkt))
2308 def test_tun_66(self):
2309 """IPSEC tunnel protect """
2311 p = self.ipv6_params
2313 self.config_network(p)
2314 self.config_sa_tun(p)
2315 self.config_protect(p)
2317 self.verify_tun_66(p, count=127)
2319 c = p.tun_if.get_rx_stats()
2320 self.assertEqual(c['packets'], 127)
2321 c = p.tun_if.get_tx_stats()
2322 self.assertEqual(c['packets'], 127)
2324 # rekey - create new SAs and update the tunnel protection
2326 np.crypt_key = b'X' + p.crypt_key[1:]
2327 np.scapy_tun_spi += 100
2328 np.scapy_tun_sa_id += 1
2329 np.vpp_tun_spi += 100
2330 np.vpp_tun_sa_id += 1
2331 np.tun_if.local_spi = p.vpp_tun_spi
2332 np.tun_if.remote_spi = p.scapy_tun_spi
2334 self.config_sa_tun(np)
2335 self.config_protect(np)
2338 self.verify_tun_66(np, count=127)
2339 c = p.tun_if.get_rx_stats()
2340 self.assertEqual(c['packets'], 254)
2341 c = p.tun_if.get_tx_stats()
2342 self.assertEqual(c['packets'], 254)
2345 self.unconfig_protect(np)
2346 self.unconfig_sa(np)
2347 self.unconfig_network(p)
2350 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2351 TemplateIpsec6TunProtect,
2353 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2355 encryption_type = ESP
2356 tun6_encrypt_node_name = "esp6-encrypt-tun"
2357 tun6_decrypt_node_name = "esp6-decrypt-tun"
2360 super(TestIpsec6TunProtectTunDrop, self).setUp()
2362 self.tun_if = self.pg0
2365 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2367 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2369 # the IP destination of the revelaed packet does not match
2370 # that assigned to the tunnel
2371 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2372 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2374 IPv6(src=src, dst=dst) /
2375 UDP(sport=1144, dport=2233) /
2376 Raw(b'X' * payload_size))
2377 for i in range(count)]
2379 def test_tun_drop_66(self):
2380 """IPSEC 6 tunnel protect bogus tunnel header """
2382 p = self.ipv6_params
2384 self.config_network(p)
2385 self.config_sa_tun(p)
2386 self.config_protect(p)
2388 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2389 src=p.remote_tun_if_host,
2390 dst=self.pg1.remote_ip6,
2392 self.send_and_assert_no_replies(self.tun_if, tx)
2394 self.unconfig_protect(p)
2396 self.unconfig_network(p)
2399 class TemplateIpsecItf4(object):
2400 """ IPsec Interface IPv4 """
2402 encryption_type = ESP
2403 tun4_encrypt_node_name = "esp4-encrypt-tun"
2404 tun4_decrypt_node_name = "esp4-decrypt-tun"
2405 tun4_input_node = "ipsec4-tun-input"
2407 def config_sa_tun(self, p, src, dst):
2408 config_tun_params(p, self.encryption_type, None, src, dst)
2410 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2411 p.auth_algo_vpp_id, p.auth_key,
2412 p.crypt_algo_vpp_id, p.crypt_key,
2413 self.vpp_esp_protocol,
2416 p.tun_sa_out.add_vpp_config()
2418 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2419 p.auth_algo_vpp_id, p.auth_key,
2420 p.crypt_algo_vpp_id, p.crypt_key,
2421 self.vpp_esp_protocol,
2424 p.tun_sa_in.add_vpp_config()
2426 def config_protect(self, p):
2427 p.tun_protect = VppIpsecTunProtect(self,
2431 p.tun_protect.add_vpp_config()
2433 def config_network(self, p, instance=0xffffffff):
2434 p.tun_if = VppIpsecInterface(self, instance=instance)
2436 p.tun_if.add_vpp_config()
2438 p.tun_if.config_ip4()
2439 p.tun_if.config_ip6()
2441 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2442 [VppRoutePath(p.tun_if.remote_ip4,
2444 p.route.add_vpp_config()
2445 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2446 [VppRoutePath(p.tun_if.remote_ip6,
2448 proto=DpoProto.DPO_PROTO_IP6)])
2451 def unconfig_network(self, p):
2452 p.route.remove_vpp_config()
2453 p.tun_if.remove_vpp_config()
2455 def unconfig_protect(self, p):
2456 p.tun_protect.remove_vpp_config()
2458 def unconfig_sa(self, p):
2459 p.tun_sa_out.remove_vpp_config()
2460 p.tun_sa_in.remove_vpp_config()
2463 @tag_fixme_vpp_workers
2464 class TestIpsecItf4(TemplateIpsec,
2467 """ IPsec Interface IPv4 """
2470 super(TestIpsecItf4, self).setUp()
2472 self.tun_if = self.pg0
2475 super(TestIpsecItf4, self).tearDown()
2477 def test_tun_instance_44(self):
2478 p = self.ipv4_params
2479 self.config_network(p, instance=3)
2481 with self.assertRaises(CliFailedCommandError):
2482 self.vapi.cli("show interface ipsec0")
2484 output = self.vapi.cli("show interface ipsec3")
2485 self.assertTrue("unknown" not in output)
2487 self.unconfig_network(p)
2489 def test_tun_44(self):
2490 """IPSEC interface IPv4"""
2493 p = self.ipv4_params
2495 self.config_network(p)
2496 self.config_sa_tun(p,
2498 self.pg0.remote_ip4)
2499 self.config_protect(p)
2501 self.verify_tun_44(p, count=n_pkts)
2502 c = p.tun_if.get_rx_stats()
2503 self.assertEqual(c['packets'], n_pkts)
2504 c = p.tun_if.get_tx_stats()
2505 self.assertEqual(c['packets'], n_pkts)
2507 p.tun_if.admin_down()
2508 self.verify_tun_dropped_44(p, count=n_pkts)
2510 self.verify_tun_44(p, count=n_pkts)
2512 c = p.tun_if.get_rx_stats()
2513 self.assertEqual(c['packets'], 3*n_pkts)
2514 c = p.tun_if.get_tx_stats()
2515 self.assertEqual(c['packets'], 2*n_pkts)
2517 # it's a v6 packet when its encrypted
2518 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2520 self.verify_tun_64(p, count=n_pkts)
2521 c = p.tun_if.get_rx_stats()
2522 self.assertEqual(c['packets'], 4*n_pkts)
2523 c = p.tun_if.get_tx_stats()
2524 self.assertEqual(c['packets'], 3*n_pkts)
2526 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2528 self.vapi.cli("clear interfaces")
2530 # rekey - create new SAs and update the tunnel protection
2532 np.crypt_key = b'X' + p.crypt_key[1:]
2533 np.scapy_tun_spi += 100
2534 np.scapy_tun_sa_id += 1
2535 np.vpp_tun_spi += 100
2536 np.vpp_tun_sa_id += 1
2537 np.tun_if.local_spi = p.vpp_tun_spi
2538 np.tun_if.remote_spi = p.scapy_tun_spi
2540 self.config_sa_tun(np,
2542 self.pg0.remote_ip4)
2543 self.config_protect(np)
2546 self.verify_tun_44(np, count=n_pkts)
2547 c = p.tun_if.get_rx_stats()
2548 self.assertEqual(c['packets'], n_pkts)
2549 c = p.tun_if.get_tx_stats()
2550 self.assertEqual(c['packets'], n_pkts)
2553 self.unconfig_protect(np)
2554 self.unconfig_sa(np)
2555 self.unconfig_network(p)
2557 def test_tun_44_null(self):
2558 """IPSEC interface IPv4 NULL auth/crypto"""
2561 p = copy.copy(self.ipv4_params)
2563 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2564 IPSEC_API_INTEG_ALG_NONE)
2565 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2566 IPSEC_API_CRYPTO_ALG_NONE)
2567 p.crypt_algo = "NULL"
2568 p.auth_algo = "NULL"
2570 self.config_network(p)
2571 self.config_sa_tun(p,
2573 self.pg0.remote_ip4)
2574 self.config_protect(p)
2576 self.verify_tun_44(p, count=n_pkts)
2579 self.unconfig_protect(p)
2581 self.unconfig_network(p)
2584 class TestIpsecItf4MPLS(TemplateIpsec,
2587 """ IPsec Interface MPLSoIPv4 """
2589 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2592 super(TestIpsecItf4MPLS, self).setUp()
2594 self.tun_if = self.pg0
2597 super(TestIpsecItf4MPLS, self).tearDown()
2599 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2601 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2602 sa.encrypt(MPLS(label=44, ttl=3) /
2603 IP(src=src, dst=dst) /
2604 UDP(sport=1166, dport=2233) /
2605 Raw(b'X' * payload_size))
2606 for i in range(count)]
2608 def verify_encrypted(self, p, sa, rxs):
2611 pkt = sa.decrypt(rx[IP])
2612 if not pkt.haslayer(IP):
2613 pkt = IP(pkt[Raw].load)
2614 self.assert_packet_checksums_valid(pkt)
2615 self.assert_equal(pkt[MPLS].label, 44)
2616 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2617 except (IndexError, AssertionError):
2618 self.logger.debug(ppp("Unexpected packet:", rx))
2620 self.logger.debug(ppp("Decrypted packet:", pkt))
2625 def test_tun_mpls_o_ip4(self):
2626 """IPSEC interface MPLS over IPv4"""
2629 p = self.ipv4_params
2632 tbl = VppMplsTable(self, 0)
2633 tbl.add_vpp_config()
2635 self.config_network(p)
2636 # deag MPLS routes from the tunnel
2637 r4 = VppMplsRoute(self, 44, 1,
2639 self.pg1.remote_ip4,
2640 self.pg1.sw_if_index)]).add_vpp_config()
2641 p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2642 p.tun_if.sw_if_index,
2643 labels=[VppMplsLabel(44)])])
2644 p.tun_if.enable_mpls()
2646 self.config_sa_tun(p,
2648 self.pg0.remote_ip4)
2649 self.config_protect(p)
2651 self.verify_tun_44(p, count=n_pkts)
2654 p.tun_if.disable_mpls()
2655 self.unconfig_protect(p)
2657 self.unconfig_network(p)
2660 class TemplateIpsecItf6(object):
2661 """ IPsec Interface IPv6 """
2663 encryption_type = ESP
2664 tun6_encrypt_node_name = "esp6-encrypt-tun"
2665 tun6_decrypt_node_name = "esp6-decrypt-tun"
2666 tun6_input_node = "ipsec6-tun-input"
2668 def config_sa_tun(self, p, src, dst):
2669 config_tun_params(p, self.encryption_type, None, src, dst)
2671 if not hasattr(p, 'tun_flags'):
2673 if not hasattr(p, 'hop_limit'):
2676 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2677 p.auth_algo_vpp_id, p.auth_key,
2678 p.crypt_algo_vpp_id, p.crypt_key,
2679 self.vpp_esp_protocol,
2682 tun_flags=p.tun_flags,
2683 hop_limit=p.hop_limit)
2684 p.tun_sa_out.add_vpp_config()
2686 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2687 p.auth_algo_vpp_id, p.auth_key,
2688 p.crypt_algo_vpp_id, p.crypt_key,
2689 self.vpp_esp_protocol,
2692 p.tun_sa_in.add_vpp_config()
2694 def config_protect(self, p):
2695 p.tun_protect = VppIpsecTunProtect(self,
2699 p.tun_protect.add_vpp_config()
2701 def config_network(self, p):
2702 p.tun_if = VppIpsecInterface(self)
2704 p.tun_if.add_vpp_config()
2706 p.tun_if.config_ip4()
2707 p.tun_if.config_ip6()
2709 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2710 [VppRoutePath(p.tun_if.remote_ip4,
2714 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2715 [VppRoutePath(p.tun_if.remote_ip6,
2717 proto=DpoProto.DPO_PROTO_IP6)])
2718 p.route.add_vpp_config()
2720 def unconfig_network(self, p):
2721 p.route.remove_vpp_config()
2722 p.tun_if.remove_vpp_config()
2724 def unconfig_protect(self, p):
2725 p.tun_protect.remove_vpp_config()
2727 def unconfig_sa(self, p):
2728 p.tun_sa_out.remove_vpp_config()
2729 p.tun_sa_in.remove_vpp_config()
2732 @tag_fixme_vpp_workers
2733 class TestIpsecItf6(TemplateIpsec,
2736 """ IPsec Interface IPv6 """
2739 super(TestIpsecItf6, self).setUp()
2741 self.tun_if = self.pg0
2744 super(TestIpsecItf6, self).tearDown()
2746 def test_tun_44(self):
2747 """IPSEC interface IPv6"""
2749 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2751 p = self.ipv6_params
2752 p.inner_hop_limit = 24
2753 p.outer_hop_limit = 23
2754 p.outer_flow_label = 243224
2755 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2757 self.config_network(p)
2758 self.config_sa_tun(p,
2760 self.pg0.remote_ip6)
2761 self.config_protect(p)
2763 self.verify_tun_66(p, count=n_pkts)
2764 c = p.tun_if.get_rx_stats()
2765 self.assertEqual(c['packets'], n_pkts)
2766 c = p.tun_if.get_tx_stats()
2767 self.assertEqual(c['packets'], n_pkts)
2769 p.tun_if.admin_down()
2770 self.verify_drop_tun_66(p, count=n_pkts)
2772 self.verify_tun_66(p, count=n_pkts)
2774 c = p.tun_if.get_rx_stats()
2775 self.assertEqual(c['packets'], 3*n_pkts)
2776 c = p.tun_if.get_tx_stats()
2777 self.assertEqual(c['packets'], 2*n_pkts)
2779 # it's a v4 packet when its encrypted
2780 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2782 self.verify_tun_46(p, count=n_pkts)
2783 c = p.tun_if.get_rx_stats()
2784 self.assertEqual(c['packets'], 4*n_pkts)
2785 c = p.tun_if.get_tx_stats()
2786 self.assertEqual(c['packets'], 3*n_pkts)
2788 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2790 self.vapi.cli("clear interfaces")
2792 # rekey - create new SAs and update the tunnel protection
2794 np.crypt_key = b'X' + p.crypt_key[1:]
2795 np.scapy_tun_spi += 100
2796 np.scapy_tun_sa_id += 1
2797 np.vpp_tun_spi += 100
2798 np.vpp_tun_sa_id += 1
2799 np.tun_if.local_spi = p.vpp_tun_spi
2800 np.tun_if.remote_spi = p.scapy_tun_spi
2801 np.inner_hop_limit = 24
2802 np.outer_hop_limit = 128
2803 np.inner_flow_label = 0xabcde
2804 np.outer_flow_label = 0xabcde
2806 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2808 self.config_sa_tun(np,
2810 self.pg0.remote_ip6)
2811 self.config_protect(np)
2814 self.verify_tun_66(np, count=n_pkts)
2815 c = p.tun_if.get_rx_stats()
2816 self.assertEqual(c['packets'], n_pkts)
2817 c = p.tun_if.get_tx_stats()
2818 self.assertEqual(c['packets'], n_pkts)
2821 self.unconfig_protect(np)
2822 self.unconfig_sa(np)
2823 self.unconfig_network(p)
2826 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2827 """ Ipsec P2MP ESP v4 tests """
2828 tun4_encrypt_node_name = "esp4-encrypt-tun"
2829 tun4_decrypt_node_name = "esp4-decrypt-tun"
2830 encryption_type = ESP
2832 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2834 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2835 sa.encrypt(IP(src=self.pg1.local_ip4,
2836 dst=self.pg1.remote_ip4) /
2837 UDP(sport=1144, dport=2233) /
2838 Raw(b'X' * payload_size))
2839 for i in range(count)]
2841 def gen_pkts(self, sw_intf, src, dst, count=1,
2843 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2844 IP(src="1.1.1.1", dst=dst) /
2845 UDP(sport=1144, dport=2233) /
2846 Raw(b'X' * payload_size)
2847 for i in range(count)]
2849 def verify_decrypted(self, p, rxs):
2851 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2852 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2854 def verify_encrypted(self, p, sa, rxs):
2857 self.assertEqual(rx[IP].tos,
2858 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
2859 self.assertEqual(rx[IP].ttl, p.hop_limit)
2860 pkt = sa.decrypt(rx[IP])
2861 if not pkt.haslayer(IP):
2862 pkt = IP(pkt[Raw].load)
2863 self.assert_packet_checksums_valid(pkt)
2865 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2866 except (IndexError, AssertionError):
2867 self.logger.debug(ppp("Unexpected packet:", rx))
2869 self.logger.debug(ppp("Decrypted packet:", pkt))
2875 super(TestIpsecMIfEsp4, self).setUp()
2878 self.tun_if = self.pg0
2879 p = self.ipv4_params
2880 p.tun_if = VppIpsecInterface(self,
2881 mode=(VppEnum.vl_api_tunnel_mode_t.
2882 TUNNEL_API_MODE_MP))
2883 p.tun_if.add_vpp_config()
2885 p.tun_if.config_ip4()
2886 p.tun_if.unconfig_ip4()
2887 p.tun_if.config_ip4()
2888 p.tun_if.generate_remote_hosts(N_NHS)
2889 self.pg0.generate_remote_hosts(N_NHS)
2890 self.pg0.configure_ipv4_neighbors()
2892 # setup some SAs for several next-hops on the interface
2893 self.multi_params = []
2895 for ii in range(N_NHS):
2896 p = copy.copy(self.ipv4_params)
2898 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2899 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2900 p.scapy_tun_spi = p.scapy_tun_spi + ii
2901 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2902 p.vpp_tun_spi = p.vpp_tun_spi + ii
2904 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2905 p.scapy_tra_spi = p.scapy_tra_spi + ii
2906 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2907 p.vpp_tra_spi = p.vpp_tra_spi + ii
2909 p.tun_sa_out = VppIpsecSA(
2910 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2911 p.auth_algo_vpp_id, p.auth_key,
2912 p.crypt_algo_vpp_id, p.crypt_key,
2913 self.vpp_esp_protocol,
2915 self.pg0.remote_hosts[ii].ip4,
2916 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
2917 hop_limit=p.hop_limit)
2918 p.tun_sa_out.add_vpp_config()
2920 p.tun_sa_in = VppIpsecSA(
2921 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2922 p.auth_algo_vpp_id, p.auth_key,
2923 p.crypt_algo_vpp_id, p.crypt_key,
2924 self.vpp_esp_protocol,
2925 self.pg0.remote_hosts[ii].ip4,
2927 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
2928 hop_limit=p.hop_limit)
2929 p.tun_sa_in.add_vpp_config()
2931 p.tun_protect = VppIpsecTunProtect(
2936 nh=p.tun_if.remote_hosts[ii].ip4)
2937 p.tun_protect.add_vpp_config()
2938 config_tun_params(p, self.encryption_type, None,
2940 self.pg0.remote_hosts[ii].ip4)
2941 self.multi_params.append(p)
2943 VppIpRoute(self, p.remote_tun_if_host, 32,
2944 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2945 p.tun_if.sw_if_index)]).add_vpp_config()
2947 p.tun_dst = self.pg0.remote_hosts[ii].ip4
2950 p = self.ipv4_params
2951 p.tun_if.unconfig_ip4()
2952 super(TestIpsecMIfEsp4, self).tearDown()
2954 def test_tun_44(self):
2957 for p in self.multi_params:
2958 self.verify_tun_44(p, count=N_PKTS)
2961 class TestIpsecItf6MPLS(TemplateIpsec,
2964 """ IPsec Interface MPLSoIPv6 """
2966 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
2969 super(TestIpsecItf6MPLS, self).setUp()
2971 self.tun_if = self.pg0
2974 super(TestIpsecItf6MPLS, self).tearDown()
2976 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2978 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2979 sa.encrypt(MPLS(label=66, ttl=3) /
2980 IPv6(src=src, dst=dst) /
2981 UDP(sport=1166, dport=2233) /
2982 Raw(b'X' * payload_size))
2983 for i in range(count)]
2985 def verify_encrypted6(self, p, sa, rxs):
2988 pkt = sa.decrypt(rx[IPv6])
2989 if not pkt.haslayer(IPv6):
2990 pkt = IP(pkt[Raw].load)
2991 self.assert_packet_checksums_valid(pkt)
2992 self.assert_equal(pkt[MPLS].label, 66)
2993 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
2994 except (IndexError, AssertionError):
2995 self.logger.debug(ppp("Unexpected packet:", rx))
2997 self.logger.debug(ppp("Decrypted packet:", pkt))
3002 def test_tun_mpls_o_ip6(self):
3003 """IPSEC interface MPLS over IPv6"""
3006 p = self.ipv6_params
3009 tbl = VppMplsTable(self, 0)
3010 tbl.add_vpp_config()
3012 self.config_network(p)
3013 # deag MPLS routes from the tunnel
3014 r6 = VppMplsRoute(self, 66, 1,
3016 self.pg1.remote_ip6,
3017 self.pg1.sw_if_index)],
3018 eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3019 p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3020 p.tun_if.sw_if_index,
3021 labels=[VppMplsLabel(66)])])
3022 p.tun_if.enable_mpls()
3024 self.config_sa_tun(p,
3026 self.pg0.remote_ip6)
3027 self.config_protect(p)
3029 self.verify_tun_66(p, count=n_pkts)
3032 p.tun_if.disable_mpls()
3033 self.unconfig_protect(p)
3035 self.unconfig_network(p)
3038 if __name__ == '__main__':
3039 unittest.main(testRunner=VppTestRunner)