5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
19 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
20 from vpp_teib import VppTeib
22 from vpp_papi import VppEnum
23 from vpp_papi_provider import CliFailedCommandError
24 from vpp_acl import AclRule, VppAcl, VppAclInterface
27 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
28 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
29 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
30 IPSEC_API_SAD_FLAG_USE_ESN))
31 crypt_key = mk_scapy_crypt_key(p)
33 p.tun_dst = tun_if.remote_ip
34 p.tun_src = tun_if.local_ip
39 p.scapy_tun_sa = SecurityAssociation(
40 encryption_type, spi=p.vpp_tun_spi,
41 crypt_algo=p.crypt_algo,
43 auth_algo=p.auth_algo, auth_key=p.auth_key,
44 tunnel_header=ip_class_by_addr_type[p.addr_type](
47 nat_t_header=p.nat_header,
49 p.vpp_tun_sa = SecurityAssociation(
50 encryption_type, spi=p.scapy_tun_spi,
51 crypt_algo=p.crypt_algo,
53 auth_algo=p.auth_algo, auth_key=p.auth_key,
54 tunnel_header=ip_class_by_addr_type[p.addr_type](
57 nat_t_header=p.nat_header,
61 def config_tra_params(p, encryption_type, tun_if):
62 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
63 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
64 IPSEC_API_SAD_FLAG_USE_ESN))
65 crypt_key = mk_scapy_crypt_key(p)
66 p.tun_dst = tun_if.remote_ip
67 p.tun_src = tun_if.local_ip
68 p.scapy_tun_sa = SecurityAssociation(
69 encryption_type, spi=p.vpp_tun_spi,
70 crypt_algo=p.crypt_algo,
72 auth_algo=p.auth_algo, auth_key=p.auth_key,
74 nat_t_header=p.nat_header)
75 p.vpp_tun_sa = SecurityAssociation(
76 encryption_type, spi=p.scapy_tun_spi,
77 crypt_algo=p.crypt_algo,
79 auth_algo=p.auth_algo, auth_key=p.auth_key,
81 nat_t_header=p.nat_header)
84 class TemplateIpsec4TunProtect(object):
85 """ IPsec IPv4 Tunnel protect """
88 tun4_encrypt_node_name = "esp4-encrypt-tun"
89 tun4_decrypt_node_name = "esp4-decrypt-tun"
90 tun4_input_node = "ipsec4-tun-input"
92 def config_sa_tra(self, p):
93 config_tun_params(p, self.encryption_type, p.tun_if)
95 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
96 p.auth_algo_vpp_id, p.auth_key,
97 p.crypt_algo_vpp_id, p.crypt_key,
98 self.vpp_esp_protocol,
100 p.tun_sa_out.add_vpp_config()
102 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
103 p.auth_algo_vpp_id, p.auth_key,
104 p.crypt_algo_vpp_id, p.crypt_key,
105 self.vpp_esp_protocol,
107 p.tun_sa_in.add_vpp_config()
109 def config_sa_tun(self, p):
110 config_tun_params(p, self.encryption_type, p.tun_if)
112 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
113 p.auth_algo_vpp_id, p.auth_key,
114 p.crypt_algo_vpp_id, p.crypt_key,
115 self.vpp_esp_protocol,
116 self.tun_if.local_addr[p.addr_type],
117 self.tun_if.remote_addr[p.addr_type],
119 p.tun_sa_out.add_vpp_config()
121 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
122 p.auth_algo_vpp_id, p.auth_key,
123 p.crypt_algo_vpp_id, p.crypt_key,
124 self.vpp_esp_protocol,
125 self.tun_if.remote_addr[p.addr_type],
126 self.tun_if.local_addr[p.addr_type],
128 p.tun_sa_in.add_vpp_config()
130 def config_protect(self, p):
131 p.tun_protect = VppIpsecTunProtect(self,
135 p.tun_protect.add_vpp_config()
137 def config_network(self, p):
138 if hasattr(p, 'tun_dst'):
141 tun_dst = self.pg0.remote_ip4
142 p.tun_if = VppIpIpTunInterface(self, self.pg0,
145 p.tun_if.add_vpp_config()
147 p.tun_if.config_ip4()
148 p.tun_if.config_ip6()
150 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
151 [VppRoutePath(p.tun_if.remote_ip4,
153 p.route.add_vpp_config()
154 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
155 [VppRoutePath(p.tun_if.remote_ip6,
157 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Undo the network setup recorded on *p*.

    Calls ``remove_vpp_config()`` on ``p.route`` and then on ``p.tun_if``,
    in that order.
    """
    # Attributes are looked up one at a time so the route is fully removed
    # before the tunnel interface attribute is even touched.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Call ``remove_vpp_config()`` on the object held in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on *p*: ``tun_sa_out`` first, then ``tun_sa_in``."""
    for sa_attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, sa_attr).remove_vpp_config()
172 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
174 """ IPsec tunnel interface tests """
176 encryption_type = ESP
180 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
183 def tearDownClass(cls):
184 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
187 super(TemplateIpsec4TunIfEsp, self).setUp()
189 self.tun_if = self.pg0
193 self.config_network(p)
194 self.config_sa_tra(p)
195 self.config_protect(p)
198 super(TemplateIpsec4TunIfEsp, self).tearDown()
201 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
203 """ IPsec UDP tunnel interface tests """
205 tun4_encrypt_node_name = "esp4-encrypt-tun"
206 tun4_decrypt_node_name = "esp4-decrypt-tun"
207 encryption_type = ESP
211 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
214 def tearDownClass(cls):
215 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
217 def verify_encrypted(self, p, sa, rxs):
220 # ensure the UDP ports are correct before we decrypt
222 self.assertTrue(rx.haslayer(UDP))
223 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
224 self.assert_equal(rx[UDP].dport, 4500)
226 pkt = sa.decrypt(rx[IP])
227 if not pkt.haslayer(IP):
228 pkt = IP(pkt[Raw].load)
230 self.assert_packet_checksums_valid(pkt)
231 self.assert_equal(pkt[IP].dst, "1.1.1.1")
232 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
233 except (IndexError, AssertionError):
234 self.logger.debug(ppp("Unexpected packet:", rx))
236 self.logger.debug(ppp("Decrypted packet:", pkt))
241 def config_sa_tra(self, p):
242 config_tun_params(p, self.encryption_type, p.tun_if)
244 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
245 p.auth_algo_vpp_id, p.auth_key,
246 p.crypt_algo_vpp_id, p.crypt_key,
247 self.vpp_esp_protocol,
249 udp_src=p.nat_header.sport,
250 udp_dst=p.nat_header.dport)
251 p.tun_sa_out.add_vpp_config()
253 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
254 p.auth_algo_vpp_id, p.auth_key,
255 p.crypt_algo_vpp_id, p.crypt_key,
256 self.vpp_esp_protocol,
258 udp_src=p.nat_header.sport,
259 udp_dst=p.nat_header.dport)
260 p.tun_sa_in.add_vpp_config()
263 super(TemplateIpsec4TunIfEspUdp, self).setUp()
266 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
267 IPSEC_API_SAD_FLAG_UDP_ENCAP)
268 p.nat_header = UDP(sport=5454, dport=4500)
270 self.tun_if = self.pg0
272 self.config_network(p)
273 self.config_sa_tra(p)
274 self.config_protect(p)
277 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
280 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
281 """ Ipsec ESP - TUN tests """
282 tun4_encrypt_node_name = "esp4-encrypt-tun"
283 tun4_decrypt_node_name = "esp4-decrypt-tun"
def test_tun_basic64(self):
    """ ipsec 6o4 tunnel basic test """
    # Set the encrypt node name on the instance before any traffic is sent.
    self.tun4_encrypt_node_name = "esp4-encrypt-tun"

    # Single packet, using the parameter set registered under AF_INET.
    v4_params = self.params[socket.AF_INET]
    self.verify_tun_64(v4_params, count=1)
def test_tun_burst64(self):
    """ ipsec 6o4 tunnel burst test """
    # The description previously read "basic test", a copy of
    # test_tun_basic64's docstring; this variant sends a 257-packet burst.
    self.tun4_encrypt_node_name = "esp4-encrypt-tun"

    # Same AF_INET parameter set as the basic test, but count=257
    # instead of a single packet.
    self.verify_tun_64(self.params[socket.AF_INET], count=257)
297 def test_tun_basic_frag44(self):
298 """ ipsec 4o4 tunnel frag basic test """
299 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
303 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
305 self.verify_tun_44(self.params[socket.AF_INET],
306 count=1, payload_size=1800, n_rx=2)
307 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
311 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
312 """ Ipsec ESP UDP tests """
314 tun4_input_node = "ipsec4-tun-input"
317 super(TestIpsec4TunIfEspUdp, self).setUp()
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Delegates entirely to verify_keepalive (not defined in this view),
    # handing it the IPv4 parameter set.
    v4_params = self.ipv4_params
    self.verify_keepalive(v4_params)
324 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
325 """ Ipsec ESP UDP GCM tests """
327 tun4_input_node = "ipsec4-tun-input"
330 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
332 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
333 IPSEC_API_INTEG_ALG_NONE)
334 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
335 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
336 p.crypt_algo = "AES-GCM"
338 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
342 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
343 """ Ipsec ESP - TCP tests """
347 class TemplateIpsec6TunProtect(object):
348 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Build and install the outbound/inbound SA pair for *p*.

    ``config_tun_params`` is run first.  ``p.tun_sa_out`` is created from
    the scapy-side SA id and SPI, ``p.tun_sa_in`` from the VPP-side SA id
    and SPI; both share the same algorithms and keys.  No tunnel endpoint
    addresses are passed to ``VppIpsecSA`` here (compare ``config_sa_tun``).
    Each SA is added with ``add_vpp_config()`` right after it is created.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    outbound = VppIpsecSA(
        self, p.scapy_tun_sa_id, p.scapy_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
    )
    p.tun_sa_out = outbound
    outbound.add_vpp_config()

    inbound = VppIpsecSA(
        self, p.vpp_tun_sa_id, p.vpp_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
    )
    p.tun_sa_in = inbound
    inbound.add_vpp_config()
def config_sa_tun(self, p):
    """Build and install the SA pair for *p* with explicit tunnel endpoints.

    ``config_tun_params`` is run first.  The outbound SA is given
    ``self.tun_if``'s local address followed by its remote address (both
    looked up by ``p.addr_type``); the inbound SA gets the same two
    addresses in the opposite order.  Each SA is added with
    ``add_vpp_config()`` right after it is created.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # Outbound: local -> remote.
    outbound = VppIpsecSA(
        self, p.scapy_tun_sa_id, p.scapy_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.local_addr[p.addr_type],
        self.tun_if.remote_addr[p.addr_type],
    )
    p.tun_sa_out = outbound
    outbound.add_vpp_config()

    # Inbound: remote -> local.
    inbound = VppIpsecSA(
        self, p.vpp_tun_sa_id, p.vpp_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.remote_addr[p.addr_type],
        self.tun_if.local_addr[p.addr_type],
    )
    p.tun_sa_in = inbound
    inbound.add_vpp_config()
384 def config_protect(self, p):
385 p.tun_protect = VppIpsecTunProtect(self,
389 p.tun_protect.add_vpp_config()
391 def config_network(self, p):
392 if hasattr(p, 'tun_dst'):
395 tun_dst = self.pg0.remote_ip6
396 p.tun_if = VppIpIpTunInterface(self, self.pg0,
399 p.tun_if.add_vpp_config()
401 p.tun_if.config_ip6()
402 p.tun_if.config_ip4()
404 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
405 [VppRoutePath(p.tun_if.remote_ip6,
407 proto=DpoProto.DPO_PROTO_IP6)])
408 p.route.add_vpp_config()
409 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
410 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Remove ``p.route`` and then ``p.tun_if`` via ``remove_vpp_config()``."""
    route = p.route
    route.remove_vpp_config()
    # The interface attribute is read only after the route removal returned.
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Call ``remove_vpp_config()`` on ``p.tun_protect``."""
    tun_protect = getattr(p, "tun_protect")
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the outbound SA of *p*, then the inbound one."""
    outbound = p.tun_sa_out
    outbound.remove_vpp_config()
    inbound = p.tun_sa_in
    inbound.remove_vpp_config()
426 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
428 """ IPsec tunnel interface tests """
430 encryption_type = ESP
433 super(TemplateIpsec6TunIfEsp, self).setUp()
435 self.tun_if = self.pg0
438 self.config_network(p)
439 self.config_sa_tra(p)
440 self.config_protect(p)
443 super(TemplateIpsec6TunIfEsp, self).tearDown()
446 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
448 """ Ipsec ESP - TUN tests """
449 tun6_encrypt_node_name = "esp6-encrypt-tun"
450 tun6_decrypt_node_name = "esp6-decrypt-tun"
def test_tun_basic46(self):
    """ ipsec 4o6 tunnel basic test """
    # Set the encrypt node name on the instance before any traffic is sent.
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    # Single packet, using the parameter set registered under AF_INET6.
    v6_params = self.params[socket.AF_INET6]
    self.verify_tun_46(v6_params, count=1)
def test_tun_burst46(self):
    """ ipsec 4o6 tunnel burst test """
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    # Same as the basic 4o6 test but with 257 packets instead of one.
    burst_size = 257
    v6_params = self.params[socket.AF_INET6]
    self.verify_tun_46(v6_params, count=burst_size)
class TestIpsec6TunIfEspHandoff(
        TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
    """ Ipsec ESP 6 Handoff tests """
    # Node names for the IPv6 tunnel encrypt/decrypt path; presumably read
    # by the inherited test helpers (not visible here -- confirm in
    # template_ipsec).
    tun6_decrypt_node_name = "esp6-decrypt-tun"
    tun6_encrypt_node_name = "esp6-encrypt-tun"
class TestIpsec4TunIfEspHandoff(
        TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
    """ Ipsec ESP 4 Handoff tests """
    # Node names for the IPv4 tunnel encrypt/decrypt path; presumably read
    # by the inherited test helpers (not visible here -- confirm in
    # template_ipsec).
    tun4_decrypt_node_name = "esp4-decrypt-tun"
    tun4_encrypt_node_name = "esp4-encrypt-tun"
477 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
480 """ IPsec IPv4 Multi Tunnel interface """
482 encryption_type = ESP
483 tun4_encrypt_node_name = "esp4-encrypt-tun"
484 tun4_decrypt_node_name = "esp4-decrypt-tun"
487 super(TestIpsec4MultiTunIfEsp, self).setUp()
489 self.tun_if = self.pg0
491 self.multi_params = []
492 self.pg0.generate_remote_hosts(10)
493 self.pg0.configure_ipv4_neighbors()
496 p = copy.copy(self.ipv4_params)
498 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
499 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
500 p.scapy_tun_spi = p.scapy_tun_spi + ii
501 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
502 p.vpp_tun_spi = p.vpp_tun_spi + ii
504 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
505 p.scapy_tra_spi = p.scapy_tra_spi + ii
506 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
507 p.vpp_tra_spi = p.vpp_tra_spi + ii
508 p.tun_dst = self.pg0.remote_hosts[ii].ip4
510 self.multi_params.append(p)
511 self.config_network(p)
512 self.config_sa_tra(p)
513 self.config_protect(p)
516 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    # Every entry in multi_params is exercised with the same packet count,
    # and its tunnel interface must then report exactly that many packets
    # in both the RX and the TX statistics.
    expected_packets = 127
    for params in self.multi_params:
        self.verify_tun_44(params, count=expected_packets)

        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], expected_packets)

        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], expected_packets)
527 def test_tun_rr_44(self):
528 """ Round-robin packets across multiple interfaces """
530 for p in self.multi_params:
531 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
532 src=p.remote_tun_if_host,
533 dst=self.pg1.remote_ip4)
534 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
536 for rx, p in zip(rxs, self.multi_params):
537 self.verify_decrypted(p, [rx])
540 for p in self.multi_params:
541 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
542 dst=p.remote_tun_if_host)
543 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
545 for rx, p in zip(rxs, self.multi_params):
546 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
549 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
552 """ IPsec IPv4 Tunnel interface all Algos """
554 encryption_type = ESP
555 tun4_encrypt_node_name = "esp4-encrypt-tun"
556 tun4_decrypt_node_name = "esp4-decrypt-tun"
559 super(TestIpsec4TunIfEspAll, self).setUp()
561 self.tun_if = self.pg0
564 self.config_network(p)
565 self.config_sa_tra(p)
566 self.config_protect(p)
570 self.unconfig_protect(p)
571 self.unconfig_network(p)
574 super(TestIpsec4TunIfEspAll, self).tearDown()
578 # change the key and the SPI
581 p.crypt_key = b'X' + p.crypt_key[1:]
583 p.scapy_tun_sa_id += 1
586 p.tun_if.local_spi = p.vpp_tun_spi
587 p.tun_if.remote_spi = p.scapy_tun_spi
589 config_tun_params(p, self.encryption_type, p.tun_if)
591 p.tun_sa_out = VppIpsecSA(self,
598 self.vpp_esp_protocol,
601 p.tun_sa_in = VppIpsecSA(self,
608 self.vpp_esp_protocol,
611 p.tun_sa_in.add_vpp_config()
612 p.tun_sa_out.add_vpp_config()
614 self.config_protect(p)
615 np.tun_sa_out.remove_vpp_config()
616 np.tun_sa_in.remove_vpp_config()
617 self.logger.info(self.vapi.cli("sh ipsec sa"))
619 def test_tun_44(self):
620 """IPSEC tunnel all algos """
622 # foreach VPP crypto engine
623 engines = ["ia32", "ipsecmb", "openssl"]
625 # foreach crypto algorithm
626 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
627 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
628 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
629 IPSEC_API_INTEG_ALG_NONE),
630 'scapy-crypto': "AES-GCM",
631 'scapy-integ': "NULL",
632 'key': b"JPjyOWBeVEQiMe7h",
634 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
635 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
636 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
637 IPSEC_API_INTEG_ALG_NONE),
638 'scapy-crypto': "AES-GCM",
639 'scapy-integ': "NULL",
640 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
642 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
643 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
644 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
645 IPSEC_API_INTEG_ALG_NONE),
646 'scapy-crypto': "AES-GCM",
647 'scapy-integ': "NULL",
648 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
650 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
651 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
652 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
653 IPSEC_API_INTEG_ALG_SHA1_96),
654 'scapy-crypto': "AES-CBC",
655 'scapy-integ': "HMAC-SHA1-96",
657 'key': b"JPjyOWBeVEQiMe7h"},
658 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
659 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
660 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
661 IPSEC_API_INTEG_ALG_SHA_512_256),
662 'scapy-crypto': "AES-CBC",
663 'scapy-integ': "SHA2-512-256",
665 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
666 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
667 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
668 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
669 IPSEC_API_INTEG_ALG_SHA_256_128),
670 'scapy-crypto': "AES-CBC",
671 'scapy-integ': "SHA2-256-128",
673 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
674 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
675 IPSEC_API_CRYPTO_ALG_NONE),
676 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
677 IPSEC_API_INTEG_ALG_SHA1_96),
678 'scapy-crypto': "NULL",
679 'scapy-integ': "HMAC-SHA1-96",
681 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
683 for engine in engines:
684 self.vapi.cli("set crypto handler all %s" % engine)
687 # loop through each of the algorithms
690 # with self.subTest(algo=algo['scapy']):
693 p.auth_algo_vpp_id = algo['vpp-integ']
694 p.crypt_algo_vpp_id = algo['vpp-crypto']
695 p.crypt_algo = algo['scapy-crypto']
696 p.auth_algo = algo['scapy-integ']
697 p.crypt_key = algo['key']
698 p.salt = algo['salt']
704 self.verify_tun_44(p, count=127)
707 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
710 """ IPsec IPv4 Tunnel interface no Algos """
712 encryption_type = ESP
713 tun4_encrypt_node_name = "esp4-encrypt-tun"
714 tun4_decrypt_node_name = "esp4-decrypt-tun"
717 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
719 self.tun_if = self.pg0
721 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
722 IPSEC_API_INTEG_ALG_NONE)
726 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
727 IPSEC_API_CRYPTO_ALG_NONE)
728 p.crypt_algo = 'NULL'
732 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
734 def test_tun_44(self):
735 """ IPSec SA with NULL algos """
738 self.config_network(p)
739 self.config_sa_tra(p)
740 self.config_protect(p)
742 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
743 dst=p.remote_tun_if_host)
744 self.send_and_assert_no_replies(self.pg1, tx)
746 self.unconfig_protect(p)
748 self.unconfig_network(p)
751 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
754 """ IPsec IPv6 Multi Tunnel interface """
756 encryption_type = ESP
757 tun6_encrypt_node_name = "esp6-encrypt-tun"
758 tun6_decrypt_node_name = "esp6-decrypt-tun"
761 super(TestIpsec6MultiTunIfEsp, self).setUp()
763 self.tun_if = self.pg0
765 self.multi_params = []
766 self.pg0.generate_remote_hosts(10)
767 self.pg0.configure_ipv6_neighbors()
770 p = copy.copy(self.ipv6_params)
772 p.remote_tun_if_host = "1111::%d" % (ii + 1)
773 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
774 p.scapy_tun_spi = p.scapy_tun_spi + ii
775 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
776 p.vpp_tun_spi = p.vpp_tun_spi + ii
778 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
779 p.scapy_tra_spi = p.scapy_tra_spi + ii
780 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
781 p.vpp_tra_spi = p.vpp_tra_spi + ii
782 p.tun_dst = self.pg0.remote_hosts[ii].ip6
784 self.multi_params.append(p)
785 self.config_network(p)
786 self.config_sa_tra(p)
787 self.config_protect(p)
790 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    # Run the 6o6 check on each configured tunnel, then confirm the
    # interface counters saw exactly the packets that were sent.
    n_pkts = 127
    for tun_params in self.multi_params:
        self.verify_tun_66(tun_params, count=n_pkts)

        rx_counters = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_counters['packets'], n_pkts)

        tx_counters = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_counters['packets'], n_pkts)
802 class TestIpsecGreTebIfEsp(TemplateIpsec,
804 """ Ipsec GRE TEB ESP - TUN tests """
805 tun4_encrypt_node_name = "esp4-encrypt-tun"
806 tun4_decrypt_node_name = "esp4-decrypt-tun"
807 encryption_type = ESP
808 omac = "00:11:22:33:44:55"
810 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
812 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
813 sa.encrypt(IP(src=self.pg0.remote_ip4,
814 dst=self.pg0.local_ip4) /
816 Ether(dst=self.omac) /
817 IP(src="1.1.1.1", dst="1.1.1.2") /
818 UDP(sport=1144, dport=2233) /
819 Raw(b'X' * payload_size))
820 for i in range(count)]
822 def gen_pkts(self, sw_intf, src, dst, count=1,
824 return [Ether(dst=self.omac) /
825 IP(src="1.1.1.1", dst="1.1.1.2") /
826 UDP(sport=1144, dport=2233) /
827 Raw(b'X' * payload_size)
828 for i in range(count)]
830 def verify_decrypted(self, p, rxs):
832 self.assert_equal(rx[Ether].dst, self.omac)
833 self.assert_equal(rx[IP].dst, "1.1.1.2")
835 def verify_encrypted(self, p, sa, rxs):
838 pkt = sa.decrypt(rx[IP])
839 if not pkt.haslayer(IP):
840 pkt = IP(pkt[Raw].load)
841 self.assert_packet_checksums_valid(pkt)
842 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
843 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
844 self.assertTrue(pkt.haslayer(GRE))
846 self.assertEqual(e[Ether].dst, self.omac)
847 self.assertEqual(e[IP].dst, "1.1.1.2")
848 except (IndexError, AssertionError):
849 self.logger.debug(ppp("Unexpected packet:", rx))
851 self.logger.debug(ppp("Decrypted packet:", pkt))
857 super(TestIpsecGreTebIfEsp, self).setUp()
859 self.tun_if = self.pg0
863 bd1 = VppBridgeDomain(self, 1)
866 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
867 p.auth_algo_vpp_id, p.auth_key,
868 p.crypt_algo_vpp_id, p.crypt_key,
869 self.vpp_esp_protocol,
872 p.tun_sa_out.add_vpp_config()
874 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
875 p.auth_algo_vpp_id, p.auth_key,
876 p.crypt_algo_vpp_id, p.crypt_key,
877 self.vpp_esp_protocol,
880 p.tun_sa_in.add_vpp_config()
882 p.tun_if = VppGreInterface(self,
885 type=(VppEnum.vl_api_gre_tunnel_type_t.
886 GRE_API_TUNNEL_TYPE_TEB))
887 p.tun_if.add_vpp_config()
889 p.tun_protect = VppIpsecTunProtect(self,
894 p.tun_protect.add_vpp_config()
897 p.tun_if.config_ip4()
898 config_tun_params(p, self.encryption_type, p.tun_if)
900 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
901 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
903 self.vapi.cli("clear ipsec sa")
904 self.vapi.cli("sh adj")
905 self.vapi.cli("sh ipsec tun")
909 p.tun_if.unconfig_ip4()
910 super(TestIpsecGreTebIfEsp, self).tearDown()
913 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
915 """ Ipsec GRE TEB ESP - TUN tests """
916 tun4_encrypt_node_name = "esp4-encrypt-tun"
917 tun4_decrypt_node_name = "esp4-decrypt-tun"
918 encryption_type = ESP
919 omac = "00:11:22:33:44:55"
921 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
923 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
924 sa.encrypt(IP(src=self.pg0.remote_ip4,
925 dst=self.pg0.local_ip4) /
927 Ether(dst=self.omac) /
928 IP(src="1.1.1.1", dst="1.1.1.2") /
929 UDP(sport=1144, dport=2233) /
930 Raw(b'X' * payload_size))
931 for i in range(count)]
933 def gen_pkts(self, sw_intf, src, dst, count=1,
935 return [Ether(dst=self.omac) /
937 IP(src="1.1.1.1", dst="1.1.1.2") /
938 UDP(sport=1144, dport=2233) /
939 Raw(b'X' * payload_size)
940 for i in range(count)]
942 def verify_decrypted(self, p, rxs):
944 self.assert_equal(rx[Ether].dst, self.omac)
945 self.assert_equal(rx[Dot1Q].vlan, 11)
946 self.assert_equal(rx[IP].dst, "1.1.1.2")
948 def verify_encrypted(self, p, sa, rxs):
951 pkt = sa.decrypt(rx[IP])
952 if not pkt.haslayer(IP):
953 pkt = IP(pkt[Raw].load)
954 self.assert_packet_checksums_valid(pkt)
955 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
956 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
957 self.assertTrue(pkt.haslayer(GRE))
959 self.assertEqual(e[Ether].dst, self.omac)
960 self.assertFalse(e.haslayer(Dot1Q))
961 self.assertEqual(e[IP].dst, "1.1.1.2")
962 except (IndexError, AssertionError):
963 self.logger.debug(ppp("Unexpected packet:", rx))
965 self.logger.debug(ppp("Decrypted packet:", pkt))
971 super(TestIpsecGreTebVlanIfEsp, self).setUp()
973 self.tun_if = self.pg0
977 bd1 = VppBridgeDomain(self, 1)
980 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
981 self.vapi.l2_interface_vlan_tag_rewrite(
982 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
984 self.pg1_11.admin_up()
986 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
987 p.auth_algo_vpp_id, p.auth_key,
988 p.crypt_algo_vpp_id, p.crypt_key,
989 self.vpp_esp_protocol,
992 p.tun_sa_out.add_vpp_config()
994 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
995 p.auth_algo_vpp_id, p.auth_key,
996 p.crypt_algo_vpp_id, p.crypt_key,
997 self.vpp_esp_protocol,
1000 p.tun_sa_in.add_vpp_config()
1002 p.tun_if = VppGreInterface(self,
1004 self.pg0.remote_ip4,
1005 type=(VppEnum.vl_api_gre_tunnel_type_t.
1006 GRE_API_TUNNEL_TYPE_TEB))
1007 p.tun_if.add_vpp_config()
1009 p.tun_protect = VppIpsecTunProtect(self,
1014 p.tun_protect.add_vpp_config()
1017 p.tun_if.config_ip4()
1018 config_tun_params(p, self.encryption_type, p.tun_if)
1020 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1021 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1023 self.vapi.cli("clear ipsec sa")
1026 p = self.ipv4_params
1027 p.tun_if.unconfig_ip4()
1028 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1029 self.pg1_11.admin_down()
1030 self.pg1_11.remove_vpp_config()
1033 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1035 """ Ipsec GRE TEB ESP - Tra tests """
1036 tun4_encrypt_node_name = "esp4-encrypt-tun"
1037 tun4_decrypt_node_name = "esp4-decrypt-tun"
1038 encryption_type = ESP
1039 omac = "00:11:22:33:44:55"
1041 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1043 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1044 sa.encrypt(IP(src=self.pg0.remote_ip4,
1045 dst=self.pg0.local_ip4) /
1047 Ether(dst=self.omac) /
1048 IP(src="1.1.1.1", dst="1.1.1.2") /
1049 UDP(sport=1144, dport=2233) /
1050 Raw(b'X' * payload_size))
1051 for i in range(count)]
1053 def gen_pkts(self, sw_intf, src, dst, count=1,
1055 return [Ether(dst=self.omac) /
1056 IP(src="1.1.1.1", dst="1.1.1.2") /
1057 UDP(sport=1144, dport=2233) /
1058 Raw(b'X' * payload_size)
1059 for i in range(count)]
1061 def verify_decrypted(self, p, rxs):
1063 self.assert_equal(rx[Ether].dst, self.omac)
1064 self.assert_equal(rx[IP].dst, "1.1.1.2")
1066 def verify_encrypted(self, p, sa, rxs):
1069 pkt = sa.decrypt(rx[IP])
1070 if not pkt.haslayer(IP):
1071 pkt = IP(pkt[Raw].load)
1072 self.assert_packet_checksums_valid(pkt)
1073 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1074 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1075 self.assertTrue(pkt.haslayer(GRE))
1077 self.assertEqual(e[Ether].dst, self.omac)
1078 self.assertEqual(e[IP].dst, "1.1.1.2")
1079 except (IndexError, AssertionError):
1080 self.logger.debug(ppp("Unexpected packet:", rx))
1082 self.logger.debug(ppp("Decrypted packet:", pkt))
1088 super(TestIpsecGreTebIfEspTra, self).setUp()
1090 self.tun_if = self.pg0
1092 p = self.ipv4_params
1094 bd1 = VppBridgeDomain(self, 1)
1095 bd1.add_vpp_config()
1097 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1098 p.auth_algo_vpp_id, p.auth_key,
1099 p.crypt_algo_vpp_id, p.crypt_key,
1100 self.vpp_esp_protocol)
1101 p.tun_sa_out.add_vpp_config()
1103 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1104 p.auth_algo_vpp_id, p.auth_key,
1105 p.crypt_algo_vpp_id, p.crypt_key,
1106 self.vpp_esp_protocol)
1107 p.tun_sa_in.add_vpp_config()
1109 p.tun_if = VppGreInterface(self,
1111 self.pg0.remote_ip4,
1112 type=(VppEnum.vl_api_gre_tunnel_type_t.
1113 GRE_API_TUNNEL_TYPE_TEB))
1114 p.tun_if.add_vpp_config()
1116 p.tun_protect = VppIpsecTunProtect(self,
1121 p.tun_protect.add_vpp_config()
1124 p.tun_if.config_ip4()
1125 config_tra_params(p, self.encryption_type, p.tun_if)
1127 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1128 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1130 self.vapi.cli("clear ipsec sa")
1133 p = self.ipv4_params
1134 p.tun_if.unconfig_ip4()
1135 super(TestIpsecGreTebIfEspTra, self).tearDown()
1138 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1140 """ Ipsec GRE TEB UDP ESP - Tra tests """
1141 tun4_encrypt_node_name = "esp4-encrypt-tun"
1142 tun4_decrypt_node_name = "esp4-decrypt-tun"
1143 encryption_type = ESP
1144 omac = "00:11:22:33:44:55"
1146 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1148 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1149 sa.encrypt(IP(src=self.pg0.remote_ip4,
1150 dst=self.pg0.local_ip4) /
1152 Ether(dst=self.omac) /
1153 IP(src="1.1.1.1", dst="1.1.1.2") /
1154 UDP(sport=1144, dport=2233) /
1155 Raw(b'X' * payload_size))
1156 for i in range(count)]
1158 def gen_pkts(self, sw_intf, src, dst, count=1,
1160 return [Ether(dst=self.omac) /
1161 IP(src="1.1.1.1", dst="1.1.1.2") /
1162 UDP(sport=1144, dport=2233) /
1163 Raw(b'X' * payload_size)
1164 for i in range(count)]
1166 def verify_decrypted(self, p, rxs):
1168 self.assert_equal(rx[Ether].dst, self.omac)
1169 self.assert_equal(rx[IP].dst, "1.1.1.2")
1171 def verify_encrypted(self, p, sa, rxs):
1173 self.assertTrue(rx.haslayer(UDP))
1174 self.assertEqual(rx[UDP].dport, 4545)
1175 self.assertEqual(rx[UDP].sport, 5454)
1177 pkt = sa.decrypt(rx[IP])
1178 if not pkt.haslayer(IP):
1179 pkt = IP(pkt[Raw].load)
1180 self.assert_packet_checksums_valid(pkt)
1181 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1182 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1183 self.assertTrue(pkt.haslayer(GRE))
1185 self.assertEqual(e[Ether].dst, self.omac)
1186 self.assertEqual(e[IP].dst, "1.1.1.2")
1187 except (IndexError, AssertionError):
1188 self.logger.debug(ppp("Unexpected packet:", rx))
1190 self.logger.debug(ppp("Decrypted packet:", pkt))
1196 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1198 self.tun_if = self.pg0
1200 p = self.ipv4_params
1201 p = self.ipv4_params
1202 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1203 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1204 p.nat_header = UDP(sport=5454, dport=4545)
1206 bd1 = VppBridgeDomain(self, 1)
1207 bd1.add_vpp_config()
1209 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1210 p.auth_algo_vpp_id, p.auth_key,
1211 p.crypt_algo_vpp_id, p.crypt_key,
1212 self.vpp_esp_protocol,
1216 p.tun_sa_out.add_vpp_config()
1218 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1219 p.auth_algo_vpp_id, p.auth_key,
1220 p.crypt_algo_vpp_id, p.crypt_key,
1221 self.vpp_esp_protocol,
1223 VppEnum.vl_api_ipsec_sad_flags_t.
1224 IPSEC_API_SAD_FLAG_IS_INBOUND),
1227 p.tun_sa_in.add_vpp_config()
1229 p.tun_if = VppGreInterface(self,
1231 self.pg0.remote_ip4,
1232 type=(VppEnum.vl_api_gre_tunnel_type_t.
1233 GRE_API_TUNNEL_TYPE_TEB))
1234 p.tun_if.add_vpp_config()
1236 p.tun_protect = VppIpsecTunProtect(self,
1241 p.tun_protect.add_vpp_config()
1244 p.tun_if.config_ip4()
1245 config_tra_params(p, self.encryption_type, p.tun_if)
1247 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1248 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1250 self.vapi.cli("clear ipsec sa")
1251 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1254 p = self.ipv4_params
1255 p.tun_if.unconfig_ip4()
1256 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1259 class TestIpsecGreIfEsp(TemplateIpsec,
1261 """ Ipsec GRE ESP - TUN tests """
1262 tun4_encrypt_node_name = "esp4-encrypt-tun"
1263 tun4_decrypt_node_name = "esp4-decrypt-tun"
1264 encryption_type = ESP
1266 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1268 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1269 sa.encrypt(IP(src=self.pg0.remote_ip4,
1270 dst=self.pg0.local_ip4) /
1272 IP(src=self.pg1.local_ip4,
1273 dst=self.pg1.remote_ip4) /
1274 UDP(sport=1144, dport=2233) /
1275 Raw(b'X' * payload_size))
1276 for i in range(count)]
1278 def gen_pkts(self, sw_intf, src, dst, count=1,
1280 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1281 IP(src="1.1.1.1", dst="1.1.1.2") /
1282 UDP(sport=1144, dport=2233) /
1283 Raw(b'X' * payload_size)
1284 for i in range(count)]
1286 def verify_decrypted(self, p, rxs):
1288 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1289 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1291 def verify_encrypted(self, p, sa, rxs):
1294 pkt = sa.decrypt(rx[IP])
1295 if not pkt.haslayer(IP):
1296 pkt = IP(pkt[Raw].load)
1297 self.assert_packet_checksums_valid(pkt)
1298 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1299 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1300 self.assertTrue(pkt.haslayer(GRE))
1302 self.assertEqual(e[IP].dst, "1.1.1.2")
1303 except (IndexError, AssertionError):
1304 self.logger.debug(ppp("Unexpected packet:", rx))
1306 self.logger.debug(ppp("Decrypted packet:", pkt))
1312 super(TestIpsecGreIfEsp, self).setUp()
1314 self.tun_if = self.pg0
1316 p = self.ipv4_params
1318 bd1 = VppBridgeDomain(self, 1)
1319 bd1.add_vpp_config()
1321 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1322 p.auth_algo_vpp_id, p.auth_key,
1323 p.crypt_algo_vpp_id, p.crypt_key,
1324 self.vpp_esp_protocol,
1326 self.pg0.remote_ip4)
1327 p.tun_sa_out.add_vpp_config()
1329 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1330 p.auth_algo_vpp_id, p.auth_key,
1331 p.crypt_algo_vpp_id, p.crypt_key,
1332 self.vpp_esp_protocol,
1333 self.pg0.remote_ip4,
1335 p.tun_sa_in.add_vpp_config()
1337 p.tun_if = VppGreInterface(self,
1339 self.pg0.remote_ip4)
1340 p.tun_if.add_vpp_config()
1342 p.tun_protect = VppIpsecTunProtect(self,
1346 p.tun_protect.add_vpp_config()
1349 p.tun_if.config_ip4()
1350 config_tun_params(p, self.encryption_type, p.tun_if)
1352 VppIpRoute(self, "1.1.1.2", 32,
1353 [VppRoutePath(p.tun_if.remote_ip4,
1354 0xffffffff)]).add_vpp_config()
1357 p = self.ipv4_params
1358 p.tun_if.unconfig_ip4()
1359 super(TestIpsecGreIfEsp, self).tearDown()
1362 class TestIpsecGreIfEspTra(TemplateIpsec,
1364 """ Ipsec GRE ESP - TRA tests """
1365 tun4_encrypt_node_name = "esp4-encrypt-tun"
1366 tun4_decrypt_node_name = "esp4-decrypt-tun"
1367 encryption_type = ESP
1369 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1371 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1372 sa.encrypt(IP(src=self.pg0.remote_ip4,
1373 dst=self.pg0.local_ip4) /
1375 IP(src=self.pg1.local_ip4,
1376 dst=self.pg1.remote_ip4) /
1377 UDP(sport=1144, dport=2233) /
1378 Raw(b'X' * payload_size))
1379 for i in range(count)]
1381 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1383 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1384 sa.encrypt(IP(src=self.pg0.remote_ip4,
1385 dst=self.pg0.local_ip4) /
1387 UDP(sport=1144, dport=2233) /
1388 Raw(b'X' * payload_size))
1389 for i in range(count)]
1391 def gen_pkts(self, sw_intf, src, dst, count=1,
1393 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1394 IP(src="1.1.1.1", dst="1.1.1.2") /
1395 UDP(sport=1144, dport=2233) /
1396 Raw(b'X' * payload_size)
1397 for i in range(count)]
1399 def verify_decrypted(self, p, rxs):
1401 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1402 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1404 def verify_encrypted(self, p, sa, rxs):
1407 pkt = sa.decrypt(rx[IP])
1408 if not pkt.haslayer(IP):
1409 pkt = IP(pkt[Raw].load)
1410 self.assert_packet_checksums_valid(pkt)
1411 self.assertTrue(pkt.haslayer(GRE))
1413 self.assertEqual(e[IP].dst, "1.1.1.2")
1414 except (IndexError, AssertionError):
1415 self.logger.debug(ppp("Unexpected packet:", rx))
1417 self.logger.debug(ppp("Decrypted packet:", pkt))
1423 super(TestIpsecGreIfEspTra, self).setUp()
1425 self.tun_if = self.pg0
1427 p = self.ipv4_params
1429 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1430 p.auth_algo_vpp_id, p.auth_key,
1431 p.crypt_algo_vpp_id, p.crypt_key,
1432 self.vpp_esp_protocol)
1433 p.tun_sa_out.add_vpp_config()
1435 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1436 p.auth_algo_vpp_id, p.auth_key,
1437 p.crypt_algo_vpp_id, p.crypt_key,
1438 self.vpp_esp_protocol)
1439 p.tun_sa_in.add_vpp_config()
1441 p.tun_if = VppGreInterface(self,
1443 self.pg0.remote_ip4)
1444 p.tun_if.add_vpp_config()
1446 p.tun_protect = VppIpsecTunProtect(self,
1450 p.tun_protect.add_vpp_config()
1453 p.tun_if.config_ip4()
1454 config_tra_params(p, self.encryption_type, p.tun_if)
1456 VppIpRoute(self, "1.1.1.2", 32,
1457 [VppRoutePath(p.tun_if.remote_ip4,
1458 0xffffffff)]).add_vpp_config()
1461 p = self.ipv4_params
1462 p.tun_if.unconfig_ip4()
1463 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    # Feed VPP the ESP packets built by gen_encrypt_non_ip_pkts() and
    # expect nothing to be forwarded; the drop must be accounted against
    # the decrypt node's 'unsupported payload' error counter.
    params = self.ipv4_params
    pkts = self.gen_encrypt_non_ip_pkts(
        params.scapy_tun_sa,
        self.tun_if,
        src=params.remote_tun_if_host,
        dst=self.pg1.remote_ip6)
    self.send_and_assert_no_replies(self.tun_if, pkts)

    err_path = '/err/%s/unsupported payload' % self.tun4_decrypt_node_name
    drops = self.statistics.get_err_counter(err_path)
    self.assertEqual(1, drops)
1476 class TestIpsecGre6IfEspTra(TemplateIpsec,
1478 """ Ipsec GRE ESP - TRA tests """
1479 tun6_encrypt_node_name = "esp6-encrypt-tun"
1480 tun6_decrypt_node_name = "esp6-decrypt-tun"
1481 encryption_type = ESP
1483 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1485 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1486 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1487 dst=self.pg0.local_ip6) /
1489 IPv6(src=self.pg1.local_ip6,
1490 dst=self.pg1.remote_ip6) /
1491 UDP(sport=1144, dport=2233) /
1492 Raw(b'X' * payload_size))
1493 for i in range(count)]
1495 def gen_pkts6(self, sw_intf, src, dst, count=1,
1497 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1498 IPv6(src="1::1", dst="1::2") /
1499 UDP(sport=1144, dport=2233) /
1500 Raw(b'X' * payload_size)
1501 for i in range(count)]
1503 def verify_decrypted6(self, p, rxs):
1505 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1506 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1508 def verify_encrypted6(self, p, sa, rxs):
1511 pkt = sa.decrypt(rx[IPv6])
1512 if not pkt.haslayer(IPv6):
1513 pkt = IPv6(pkt[Raw].load)
1514 self.assert_packet_checksums_valid(pkt)
1515 self.assertTrue(pkt.haslayer(GRE))
1517 self.assertEqual(e[IPv6].dst, "1::2")
1518 except (IndexError, AssertionError):
1519 self.logger.debug(ppp("Unexpected packet:", rx))
1521 self.logger.debug(ppp("Decrypted packet:", pkt))
1527 super(TestIpsecGre6IfEspTra, self).setUp()
1529 self.tun_if = self.pg0
1531 p = self.ipv6_params
1533 bd1 = VppBridgeDomain(self, 1)
1534 bd1.add_vpp_config()
1536 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1537 p.auth_algo_vpp_id, p.auth_key,
1538 p.crypt_algo_vpp_id, p.crypt_key,
1539 self.vpp_esp_protocol)
1540 p.tun_sa_out.add_vpp_config()
1542 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1543 p.auth_algo_vpp_id, p.auth_key,
1544 p.crypt_algo_vpp_id, p.crypt_key,
1545 self.vpp_esp_protocol)
1546 p.tun_sa_in.add_vpp_config()
1548 p.tun_if = VppGreInterface(self,
1550 self.pg0.remote_ip6)
1551 p.tun_if.add_vpp_config()
1553 p.tun_protect = VppIpsecTunProtect(self,
1557 p.tun_protect.add_vpp_config()
1560 p.tun_if.config_ip6()
1561 config_tra_params(p, self.encryption_type, p.tun_if)
1563 r = VppIpRoute(self, "1::2", 128,
1564 [VppRoutePath(p.tun_if.remote_ip6,
1566 proto=DpoProto.DPO_PROTO_IP6)])
1570 p = self.ipv6_params
1571 p.tun_if.unconfig_ip6()
1572 super(TestIpsecGre6IfEspTra, self).tearDown()
1575 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1576 """ Ipsec mGRE ESP v4 TRA tests """
1577 tun4_encrypt_node_name = "esp4-encrypt-tun"
1578 tun4_decrypt_node_name = "esp4-decrypt-tun"
1579 encryption_type = ESP
1581 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1583 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1584 sa.encrypt(IP(src=p.tun_dst,
1585 dst=self.pg0.local_ip4) /
1587 IP(src=self.pg1.local_ip4,
1588 dst=self.pg1.remote_ip4) /
1589 UDP(sport=1144, dport=2233) /
1590 Raw(b'X' * payload_size))
1591 for i in range(count)]
1593 def gen_pkts(self, sw_intf, src, dst, count=1,
1595 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1596 IP(src="1.1.1.1", dst=dst) /
1597 UDP(sport=1144, dport=2233) /
1598 Raw(b'X' * payload_size)
1599 for i in range(count)]
1601 def verify_decrypted(self, p, rxs):
1603 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1604 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1606 def verify_encrypted(self, p, sa, rxs):
1609 pkt = sa.decrypt(rx[IP])
1610 if not pkt.haslayer(IP):
1611 pkt = IP(pkt[Raw].load)
1612 self.assert_packet_checksums_valid(pkt)
1613 self.assertTrue(pkt.haslayer(GRE))
1615 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1616 except (IndexError, AssertionError):
1617 self.logger.debug(ppp("Unexpected packet:", rx))
1619 self.logger.debug(ppp("Decrypted packet:", pkt))
1625 super(TestIpsecMGreIfEspTra4, self).setUp()
1628 self.tun_if = self.pg0
1629 p = self.ipv4_params
1630 p.tun_if = VppGreInterface(self,
1633 mode=(VppEnum.vl_api_tunnel_mode_t.
1634 TUNNEL_API_MODE_MP))
1635 p.tun_if.add_vpp_config()
1637 p.tun_if.config_ip4()
1638 p.tun_if.generate_remote_hosts(N_NHS)
1639 self.pg0.generate_remote_hosts(N_NHS)
1640 self.pg0.configure_ipv4_neighbors()
1642 # setup some SAs for several next-hops on the interface
1643 self.multi_params = []
1645 for ii in range(N_NHS):
1646 p = copy.copy(self.ipv4_params)
1648 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1649 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1650 p.scapy_tun_spi = p.scapy_tun_spi + ii
1651 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1652 p.vpp_tun_spi = p.vpp_tun_spi + ii
1654 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1655 p.scapy_tra_spi = p.scapy_tra_spi + ii
1656 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1657 p.vpp_tra_spi = p.vpp_tra_spi + ii
1658 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1659 p.auth_algo_vpp_id, p.auth_key,
1660 p.crypt_algo_vpp_id, p.crypt_key,
1661 self.vpp_esp_protocol)
1662 p.tun_sa_out.add_vpp_config()
1664 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1665 p.auth_algo_vpp_id, p.auth_key,
1666 p.crypt_algo_vpp_id, p.crypt_key,
1667 self.vpp_esp_protocol)
1668 p.tun_sa_in.add_vpp_config()
1670 p.tun_protect = VppIpsecTunProtect(
1675 nh=p.tun_if.remote_hosts[ii].ip4)
1676 p.tun_protect.add_vpp_config()
1677 config_tra_params(p, self.encryption_type, p.tun_if)
1678 self.multi_params.append(p)
1680 VppIpRoute(self, p.remote_tun_if_host, 32,
1681 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1682 p.tun_if.sw_if_index)]).add_vpp_config()
1684 # in this v4 variant add the teibs after the protect
1685 p.teib = VppTeib(self, p.tun_if,
1686 p.tun_if.remote_hosts[ii].ip4,
1687 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1688 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1689 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1692 p = self.ipv4_params
1693 p.tun_if.unconfig_ip4()
1694 super(TestIpsecMGreIfEspTra4, self).tearDown()
1696 def test_tun_44(self):
1699 for p in self.multi_params:
1700 self.verify_tun_44(p, count=N_PKTS)
1701 p.teib.remove_vpp_config()
1702 self.verify_tun_dropped_44(p, count=N_PKTS)
1703 p.teib.add_vpp_config()
1704 self.verify_tun_44(p, count=N_PKTS)
1707 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1708 """ Ipsec mGRE ESP v6 TRA tests """
1709 tun6_encrypt_node_name = "esp6-encrypt-tun"
1710 tun6_decrypt_node_name = "esp6-decrypt-tun"
1711 encryption_type = ESP
1713 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1715 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1716 sa.encrypt(IPv6(src=p.tun_dst,
1717 dst=self.pg0.local_ip6) /
1719 IPv6(src=self.pg1.local_ip6,
1720 dst=self.pg1.remote_ip6) /
1721 UDP(sport=1144, dport=2233) /
1722 Raw(b'X' * payload_size))
1723 for i in range(count)]
1725 def gen_pkts6(self, sw_intf, src, dst, count=1,
1727 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1728 IPv6(src="1::1", dst=dst) /
1729 UDP(sport=1144, dport=2233) /
1730 Raw(b'X' * payload_size)
1731 for i in range(count)]
1733 def verify_decrypted6(self, p, rxs):
1735 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1736 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1738 def verify_encrypted6(self, p, sa, rxs):
1741 pkt = sa.decrypt(rx[IPv6])
1742 if not pkt.haslayer(IPv6):
1743 pkt = IPv6(pkt[Raw].load)
1744 self.assert_packet_checksums_valid(pkt)
1745 self.assertTrue(pkt.haslayer(GRE))
1747 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1748 except (IndexError, AssertionError):
1749 self.logger.debug(ppp("Unexpected packet:", rx))
1751 self.logger.debug(ppp("Decrypted packet:", pkt))
1757 super(TestIpsecMGreIfEspTra6, self).setUp()
1759 self.vapi.cli("set logging class ipsec level debug")
1762 self.tun_if = self.pg0
1763 p = self.ipv6_params
1764 p.tun_if = VppGreInterface(self,
1767 mode=(VppEnum.vl_api_tunnel_mode_t.
1768 TUNNEL_API_MODE_MP))
1769 p.tun_if.add_vpp_config()
1771 p.tun_if.config_ip6()
1772 p.tun_if.generate_remote_hosts(N_NHS)
1773 self.pg0.generate_remote_hosts(N_NHS)
1774 self.pg0.configure_ipv6_neighbors()
1776 # setup some SAs for several next-hops on the interface
1777 self.multi_params = []
1779 for ii in range(N_NHS):
1780 p = copy.copy(self.ipv6_params)
1782 p.remote_tun_if_host = "1::%d" % (ii + 1)
1783 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1784 p.scapy_tun_spi = p.scapy_tun_spi + ii
1785 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1786 p.vpp_tun_spi = p.vpp_tun_spi + ii
1788 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1789 p.scapy_tra_spi = p.scapy_tra_spi + ii
1790 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1791 p.vpp_tra_spi = p.vpp_tra_spi + ii
1792 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1793 p.auth_algo_vpp_id, p.auth_key,
1794 p.crypt_algo_vpp_id, p.crypt_key,
1795 self.vpp_esp_protocol)
1796 p.tun_sa_out.add_vpp_config()
1798 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1799 p.auth_algo_vpp_id, p.auth_key,
1800 p.crypt_algo_vpp_id, p.crypt_key,
1801 self.vpp_esp_protocol)
1802 p.tun_sa_in.add_vpp_config()
1804 # in this v6 variant add the teibs first then the protection
1805 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1806 VppTeib(self, p.tun_if,
1807 p.tun_if.remote_hosts[ii].ip6,
1808 p.tun_dst).add_vpp_config()
1810 p.tun_protect = VppIpsecTunProtect(
1815 nh=p.tun_if.remote_hosts[ii].ip6)
1816 p.tun_protect.add_vpp_config()
1817 config_tra_params(p, self.encryption_type, p.tun_if)
1818 self.multi_params.append(p)
1820 VppIpRoute(self, p.remote_tun_if_host, 128,
1821 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1822 p.tun_if.sw_if_index)]).add_vpp_config()
1823 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1825 self.logger.info(self.vapi.cli("sh log"))
1826 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1827 self.logger.info(self.vapi.cli("sh adj 41"))
1830 p = self.ipv6_params
1831 p.tun_if.unconfig_ip6()
1832 super(TestIpsecMGreIfEspTra6, self).tearDown()
1834 def test_tun_66(self):
1836 for p in self.multi_params:
1837 self.verify_tun_66(p, count=63)
1840 class TestIpsec4TunProtect(TemplateIpsec,
1841 TemplateIpsec4TunProtect,
1843 """ IPsec IPv4 Tunnel protect - transport mode"""
1846 super(TestIpsec4TunProtect, self).setUp()
1848 self.tun_if = self.pg0
1851 super(TestIpsec4TunProtect, self).tearDown()
1853 def test_tun_44(self):
1854 """IPSEC tunnel protect"""
1856 p = self.ipv4_params
1858 self.config_network(p)
1859 self.config_sa_tra(p)
1860 self.config_protect(p)
1862 self.verify_tun_44(p, count=127)
1863 c = p.tun_if.get_rx_stats()
1864 self.assertEqual(c['packets'], 127)
1865 c = p.tun_if.get_tx_stats()
1866 self.assertEqual(c['packets'], 127)
1868 self.vapi.cli("clear ipsec sa")
1869 self.verify_tun_64(p, count=127)
1870 c = p.tun_if.get_rx_stats()
1871 self.assertEqual(c['packets'], 254)
1872 c = p.tun_if.get_tx_stats()
1873 self.assertEqual(c['packets'], 254)
1875 # rekey - create new SAs and update the tunnel protection
1877 np.crypt_key = b'X' + p.crypt_key[1:]
1878 np.scapy_tun_spi += 100
1879 np.scapy_tun_sa_id += 1
1880 np.vpp_tun_spi += 100
1881 np.vpp_tun_sa_id += 1
1882 np.tun_if.local_spi = p.vpp_tun_spi
1883 np.tun_if.remote_spi = p.scapy_tun_spi
1885 self.config_sa_tra(np)
1886 self.config_protect(np)
1889 self.verify_tun_44(np, count=127)
1890 c = p.tun_if.get_rx_stats()
1891 self.assertEqual(c['packets'], 381)
1892 c = p.tun_if.get_tx_stats()
1893 self.assertEqual(c['packets'], 381)
1896 self.unconfig_protect(np)
1897 self.unconfig_sa(np)
1898 self.unconfig_network(p)
1901 class TestIpsec4TunProtectUdp(TemplateIpsec,
1902 TemplateIpsec4TunProtect,
1904 """ IPsec IPv4 Tunnel protect - transport mode"""
1907 super(TestIpsec4TunProtectUdp, self).setUp()
1909 self.tun_if = self.pg0
1911 p = self.ipv4_params
1912 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1913 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1914 p.nat_header = UDP(sport=4500, dport=4500)
1915 self.config_network(p)
1916 self.config_sa_tra(p)
1917 self.config_protect(p)
1920 p = self.ipv4_params
1921 self.unconfig_protect(p)
1923 self.unconfig_network(p)
1924 super(TestIpsec4TunProtectUdp, self).tearDown()
1926 def verify_encrypted(self, p, sa, rxs):
1927 # ensure encrypted packets are received with the default UDP ports
1929 self.assertEqual(rx[UDP].sport, 4500)
1930 self.assertEqual(rx[UDP].dport, 4500)
1931 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
def test_tun_44(self):
    """IPSEC UDP tunnel protect"""

    params = self.ipv4_params
    n_pkts = 127

    # run the shared verify_tun_44 check with n_pkts packets
    self.verify_tun_44(params, count=n_pkts)

    # the tunnel interface must have counted every packet, first in the
    # rx statistics and then in the tx statistics
    for read_stats in ("get_rx_stats", "get_tx_stats"):
        stats = getattr(params.tun_if, read_stats)()
        self.assertEqual(stats['packets'], n_pkts)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # delegate to the shared keepalive check, using this class's IPv4
    # parameter set
    check = self.verify_keepalive
    check(self.ipv4_params)
1949 class TestIpsec4TunProtectTun(TemplateIpsec,
1950 TemplateIpsec4TunProtect,
1952 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1954 encryption_type = ESP
1955 tun4_encrypt_node_name = "esp4-encrypt-tun"
1956 tun4_decrypt_node_name = "esp4-decrypt-tun"
1959 super(TestIpsec4TunProtectTun, self).setUp()
1961 self.tun_if = self.pg0
1964 super(TestIpsec4TunProtectTun, self).tearDown()
1966 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1968 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1969 sa.encrypt(IP(src=sw_intf.remote_ip4,
1970 dst=sw_intf.local_ip4) /
1971 IP(src=src, dst=dst) /
1972 UDP(sport=1144, dport=2233) /
1973 Raw(b'X' * payload_size))
1974 for i in range(count)]
1976 def gen_pkts(self, sw_intf, src, dst, count=1,
1978 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1979 IP(src=src, dst=dst) /
1980 UDP(sport=1144, dport=2233) /
1981 Raw(b'X' * payload_size)
1982 for i in range(count)]
1984 def verify_decrypted(self, p, rxs):
1986 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1987 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1988 self.assert_packet_checksums_valid(rx)
1990 def verify_encrypted(self, p, sa, rxs):
1993 pkt = sa.decrypt(rx[IP])
1994 if not pkt.haslayer(IP):
1995 pkt = IP(pkt[Raw].load)
1996 self.assert_packet_checksums_valid(pkt)
1997 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1998 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1999 inner = pkt[IP].payload
2000 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2002 except (IndexError, AssertionError):
2003 self.logger.debug(ppp("Unexpected packet:", rx))
2005 self.logger.debug(ppp("Decrypted packet:", pkt))
2010 def test_tun_44(self):
2011 """IPSEC tunnel protect """
2013 p = self.ipv4_params
2015 self.config_network(p)
2016 self.config_sa_tun(p)
2017 self.config_protect(p)
2019 # also add output features on the tunnel and physical interface
2020 # so we test they still work
2021 r_all = AclRule(True,
2022 src_prefix="0.0.0.0/0",
2023 dst_prefix="0.0.0.0/0",
2025 a = VppAcl(self, [r_all]).add_vpp_config()
2027 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2028 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2030 self.verify_tun_44(p, count=127)
2032 c = p.tun_if.get_rx_stats()
2033 self.assertEqual(c['packets'], 127)
2034 c = p.tun_if.get_tx_stats()
2035 self.assertEqual(c['packets'], 127)
2037 # rekey - create new SAs and update the tunnel protection
2039 np.crypt_key = b'X' + p.crypt_key[1:]
2040 np.scapy_tun_spi += 100
2041 np.scapy_tun_sa_id += 1
2042 np.vpp_tun_spi += 100
2043 np.vpp_tun_sa_id += 1
2044 np.tun_if.local_spi = p.vpp_tun_spi
2045 np.tun_if.remote_spi = p.scapy_tun_spi
2047 self.config_sa_tun(np)
2048 self.config_protect(np)
2051 self.verify_tun_44(np, count=127)
2052 c = p.tun_if.get_rx_stats()
2053 self.assertEqual(c['packets'], 254)
2054 c = p.tun_if.get_tx_stats()
2055 self.assertEqual(c['packets'], 254)
2058 self.unconfig_protect(np)
2059 self.unconfig_sa(np)
2060 self.unconfig_network(p)
2063 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2064 TemplateIpsec4TunProtect,
2066 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2068 encryption_type = ESP
2069 tun4_encrypt_node_name = "esp4-encrypt-tun"
2070 tun4_decrypt_node_name = "esp4-decrypt-tun"
2073 super(TestIpsec4TunProtectTunDrop, self).setUp()
2075 self.tun_if = self.pg0
2078 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2080 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2082 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2083 sa.encrypt(IP(src=sw_intf.remote_ip4,
2085 IP(src=src, dst=dst) /
2086 UDP(sport=1144, dport=2233) /
2087 Raw(b'X' * payload_size))
2088 for i in range(count)]
2090 def test_tun_drop_44(self):
2091 """IPSEC tunnel protect bogus tunnel header """
2093 p = self.ipv4_params
2095 self.config_network(p)
2096 self.config_sa_tun(p)
2097 self.config_protect(p)
2099 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2100 src=p.remote_tun_if_host,
2101 dst=self.pg1.remote_ip4,
2103 self.send_and_assert_no_replies(self.tun_if, tx)
2106 self.unconfig_protect(p)
2108 self.unconfig_network(p)
2111 class TestIpsec6TunProtect(TemplateIpsec,
2112 TemplateIpsec6TunProtect,
2114 """ IPsec IPv6 Tunnel protect - transport mode"""
2116 encryption_type = ESP
2117 tun6_encrypt_node_name = "esp6-encrypt-tun"
2118 tun6_decrypt_node_name = "esp6-decrypt-tun"
2121 super(TestIpsec6TunProtect, self).setUp()
2123 self.tun_if = self.pg0
2126 super(TestIpsec6TunProtect, self).tearDown()
2128 def test_tun_66(self):
2129 """IPSEC tunnel protect 6o6"""
2131 p = self.ipv6_params
2133 self.config_network(p)
2134 self.config_sa_tra(p)
2135 self.config_protect(p)
2137 self.verify_tun_66(p, count=127)
2138 c = p.tun_if.get_rx_stats()
2139 self.assertEqual(c['packets'], 127)
2140 c = p.tun_if.get_tx_stats()
2141 self.assertEqual(c['packets'], 127)
2143 # rekey - create new SAs and update the tunnel protection
2145 np.crypt_key = b'X' + p.crypt_key[1:]
2146 np.scapy_tun_spi += 100
2147 np.scapy_tun_sa_id += 1
2148 np.vpp_tun_spi += 100
2149 np.vpp_tun_sa_id += 1
2150 np.tun_if.local_spi = p.vpp_tun_spi
2151 np.tun_if.remote_spi = p.scapy_tun_spi
2153 self.config_sa_tra(np)
2154 self.config_protect(np)
2157 self.verify_tun_66(np, count=127)
2158 c = p.tun_if.get_rx_stats()
2159 self.assertEqual(c['packets'], 254)
2160 c = p.tun_if.get_tx_stats()
2161 self.assertEqual(c['packets'], 254)
2163 # bounce the interface state
2164 p.tun_if.admin_down()
2165 self.verify_drop_tun_66(np, count=127)
2166 node = ('/err/ipsec6-tun-input/%s' %
2167 'ipsec packets received on disabled interface')
2168 self.assertEqual(127, self.statistics.get_err_counter(node))
2170 self.verify_tun_66(np, count=127)
2173 # 1) add two input SAs [old, new]
2174 # 2) swap output SA to [new]
2175 # 3) use only [new] input SA
2177 np3.crypt_key = b'Z' + p.crypt_key[1:]
2178 np3.scapy_tun_spi += 100
2179 np3.scapy_tun_sa_id += 1
2180 np3.vpp_tun_spi += 100
2181 np3.vpp_tun_sa_id += 1
2182 np3.tun_if.local_spi = p.vpp_tun_spi
2183 np3.tun_if.remote_spi = p.scapy_tun_spi
2185 self.config_sa_tra(np3)
2188 p.tun_protect.update_vpp_config(np.tun_sa_out,
2189 [np.tun_sa_in, np3.tun_sa_in])
2190 self.verify_tun_66(np, np, count=127)
2191 self.verify_tun_66(np3, np, count=127)
2194 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2195 [np.tun_sa_in, np3.tun_sa_in])
2196 self.verify_tun_66(np, np3, count=127)
2197 self.verify_tun_66(np3, np3, count=127)
2200 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2202 self.verify_tun_66(np3, np3, count=127)
2203 self.verify_drop_tun_66(np, count=127)
2205 c = p.tun_if.get_rx_stats()
2206 self.assertEqual(c['packets'], 127*9)
2207 c = p.tun_if.get_tx_stats()
2208 self.assertEqual(c['packets'], 127*8)
2209 self.unconfig_sa(np)
2212 self.unconfig_protect(np3)
2213 self.unconfig_sa(np3)
2214 self.unconfig_network(p)
2216 def test_tun_46(self):
2217 """IPSEC tunnel protect 4o6"""
2219 p = self.ipv6_params
2221 self.config_network(p)
2222 self.config_sa_tra(p)
2223 self.config_protect(p)
2225 self.verify_tun_46(p, count=127)
2226 c = p.tun_if.get_rx_stats()
2227 self.assertEqual(c['packets'], 127)
2228 c = p.tun_if.get_tx_stats()
2229 self.assertEqual(c['packets'], 127)
2232 self.unconfig_protect(p)
2234 self.unconfig_network(p)
2237 class TestIpsec6TunProtectTun(TemplateIpsec,
2238 TemplateIpsec6TunProtect,
2240 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2242 encryption_type = ESP
2243 tun6_encrypt_node_name = "esp6-encrypt-tun"
2244 tun6_decrypt_node_name = "esp6-decrypt-tun"
2247 super(TestIpsec6TunProtectTun, self).setUp()
2249 self.tun_if = self.pg0
2252 super(TestIpsec6TunProtectTun, self).tearDown()
2254 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2256 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2257 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2258 dst=sw_intf.local_ip6) /
2259 IPv6(src=src, dst=dst) /
2260 UDP(sport=1166, dport=2233) /
2261 Raw(b'X' * payload_size))
2262 for i in range(count)]
2264 def gen_pkts6(self, sw_intf, src, dst, count=1,
2266 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2267 IPv6(src=src, dst=dst) /
2268 UDP(sport=1166, dport=2233) /
2269 Raw(b'X' * payload_size)
2270 for i in range(count)]
2272 def verify_decrypted6(self, p, rxs):
2274 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2275 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2276 self.assert_packet_checksums_valid(rx)
2278 def verify_encrypted6(self, p, sa, rxs):
2281 pkt = sa.decrypt(rx[IPv6])
2282 if not pkt.haslayer(IPv6):
2283 pkt = IPv6(pkt[Raw].load)
2284 self.assert_packet_checksums_valid(pkt)
2285 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2286 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2287 inner = pkt[IPv6].payload
2288 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2290 except (IndexError, AssertionError):
2291 self.logger.debug(ppp("Unexpected packet:", rx))
2293 self.logger.debug(ppp("Decrypted packet:", pkt))
2298 def test_tun_66(self):
2299 """IPSEC tunnel protect """
2301 p = self.ipv6_params
2303 self.config_network(p)
2304 self.config_sa_tun(p)
2305 self.config_protect(p)
2307 self.verify_tun_66(p, count=127)
2309 c = p.tun_if.get_rx_stats()
2310 self.assertEqual(c['packets'], 127)
2311 c = p.tun_if.get_tx_stats()
2312 self.assertEqual(c['packets'], 127)
2314 # rekey - create new SAs and update the tunnel protection
2316 np.crypt_key = b'X' + p.crypt_key[1:]
2317 np.scapy_tun_spi += 100
2318 np.scapy_tun_sa_id += 1
2319 np.vpp_tun_spi += 100
2320 np.vpp_tun_sa_id += 1
2321 np.tun_if.local_spi = p.vpp_tun_spi
2322 np.tun_if.remote_spi = p.scapy_tun_spi
2324 self.config_sa_tun(np)
2325 self.config_protect(np)
2328 self.verify_tun_66(np, count=127)
2329 c = p.tun_if.get_rx_stats()
2330 self.assertEqual(c['packets'], 254)
2331 c = p.tun_if.get_tx_stats()
2332 self.assertEqual(c['packets'], 254)
2335 self.unconfig_protect(np)
2336 self.unconfig_sa(np)
2337 self.unconfig_network(p)
2340 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2341 TemplateIpsec6TunProtect,
2343 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2345 encryption_type = ESP
2346 tun6_encrypt_node_name = "esp6-encrypt-tun"
2347 tun6_decrypt_node_name = "esp6-decrypt-tun"
2350 super(TestIpsec6TunProtectTunDrop, self).setUp()
2352 self.tun_if = self.pg0
2355 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2357 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2359 # the IP destination of the revealed packet does not match
2360 # that assigned to the tunnel
2361 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2362 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2364 IPv6(src=src, dst=dst) /
2365 UDP(sport=1144, dport=2233) /
2366 Raw(b'X' * payload_size))
2367 for i in range(count)]
2369 def test_tun_drop_66(self):
2370 """IPSEC 6 tunnel protect bogus tunnel header """
2372 p = self.ipv6_params
2374 self.config_network(p)
2375 self.config_sa_tun(p)
2376 self.config_protect(p)
2378 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2379 src=p.remote_tun_if_host,
2380 dst=self.pg1.remote_ip6,
2382 self.send_and_assert_no_replies(self.tun_if, tx)
2384 self.unconfig_protect(p)
2386 self.unconfig_network(p)
2389 class TemplateIpsecItf4(object):
2390 """ IPsec Interface IPv4 """
2392 encryption_type = ESP
2393 tun4_encrypt_node_name = "esp4-encrypt-tun"
2394 tun4_decrypt_node_name = "esp4-decrypt-tun"
2395 tun4_input_node = "ipsec4-tun-input"
2397 def config_sa_tun(self, p, src, dst):
2398 config_tun_params(p, self.encryption_type, None, src, dst)
2400 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2401 p.auth_algo_vpp_id, p.auth_key,
2402 p.crypt_algo_vpp_id, p.crypt_key,
2403 self.vpp_esp_protocol,
2406 p.tun_sa_out.add_vpp_config()
2408 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2409 p.auth_algo_vpp_id, p.auth_key,
2410 p.crypt_algo_vpp_id, p.crypt_key,
2411 self.vpp_esp_protocol,
2414 p.tun_sa_in.add_vpp_config()
2416 def config_protect(self, p):
2417 p.tun_protect = VppIpsecTunProtect(self,
2421 p.tun_protect.add_vpp_config()
2423 def config_network(self, p, instance=0xffffffff):
2424 p.tun_if = VppIpsecInterface(self, instance=instance)
2426 p.tun_if.add_vpp_config()
2428 p.tun_if.config_ip4()
2429 p.tun_if.config_ip6()
2431 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2432 [VppRoutePath(p.tun_if.remote_ip4,
2434 p.route.add_vpp_config()
2435 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2436 [VppRoutePath(p.tun_if.remote_ip6,
2438 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the route stored on *p*, then the tunnel interface.

    Only ``p.route`` and ``p.tun_if`` are touched, in that order.
    """
    for attr in ("route", "tun_if"):
        getattr(p, attr).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel protection object stored in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the SAs stored on *p*: ``tun_sa_out`` first, then ``tun_sa_in``."""
    for attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, attr).remove_vpp_config()
2453 class TestIpsecItf4(TemplateIpsec,
2456 """ IPsec Interface IPv4 """
2459 super(TestIpsecItf4, self).setUp()
2461 self.tun_if = self.pg0
2464 super(TestIpsecItf4, self).tearDown()
def test_tun_instance_44(self):
    # Create the IPsec interface with an explicit instance number and
    # check, via the CLI, that "ipsec3" exists while "ipsec0" does not.
    p = self.ipv4_params
    self.config_network(p, instance=3)

    # looking up ipsec0 is expected to make the CLI command fail
    with self.assertRaises(CliFailedCommandError):
        self.vapi.cli("show interface ipsec0")

    # assertNotIn (rather than assertTrue on a boolean) prints the
    # offending CLI output when the check fails
    output = self.vapi.cli("show interface ipsec3")
    self.assertNotIn("unknown", output)

    self.unconfig_network(p)
2478 def test_tun_44(self):
2479 """IPSEC interface IPv4"""
2482 p = self.ipv4_params
2484 self.config_network(p)
2485 self.config_sa_tun(p,
2487 self.pg0.remote_ip4)
2488 self.config_protect(p)
2490 self.verify_tun_44(p, count=n_pkts)
2491 c = p.tun_if.get_rx_stats()
2492 self.assertEqual(c['packets'], n_pkts)
2493 c = p.tun_if.get_tx_stats()
2494 self.assertEqual(c['packets'], n_pkts)
2496 p.tun_if.admin_down()
2497 self.verify_tun_dropped_44(p, count=n_pkts)
2499 self.verify_tun_44(p, count=n_pkts)
2501 c = p.tun_if.get_rx_stats()
2502 self.assertEqual(c['packets'], 3*n_pkts)
2503 c = p.tun_if.get_tx_stats()
2504 self.assertEqual(c['packets'], 2*n_pkts)
2506 # it's a v6 packet when its encrypted
2507 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2509 self.verify_tun_64(p, count=n_pkts)
2510 c = p.tun_if.get_rx_stats()
2511 self.assertEqual(c['packets'], 4*n_pkts)
2512 c = p.tun_if.get_tx_stats()
2513 self.assertEqual(c['packets'], 3*n_pkts)
2515 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2517 self.vapi.cli("clear interfaces")
2519 # rekey - create new SAs and update the tunnel protection
2521 np.crypt_key = b'X' + p.crypt_key[1:]
2522 np.scapy_tun_spi += 100
2523 np.scapy_tun_sa_id += 1
2524 np.vpp_tun_spi += 100
2525 np.vpp_tun_sa_id += 1
2526 np.tun_if.local_spi = p.vpp_tun_spi
2527 np.tun_if.remote_spi = p.scapy_tun_spi
2529 self.config_sa_tun(np,
2531 self.pg0.remote_ip4)
2532 self.config_protect(np)
2535 self.verify_tun_44(np, count=n_pkts)
2536 c = p.tun_if.get_rx_stats()
2537 self.assertEqual(c['packets'], n_pkts)
2538 c = p.tun_if.get_tx_stats()
2539 self.assertEqual(c['packets'], n_pkts)
2542 self.unconfig_protect(np)
2543 self.unconfig_sa(np)
2544 self.unconfig_network(p)
2546 def test_tun_44_null(self):
2547 """IPSEC interface IPv4 NULL auth/crypto"""
2550 p = copy.copy(self.ipv4_params)
2552 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2553 IPSEC_API_INTEG_ALG_NONE)
2554 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2555 IPSEC_API_CRYPTO_ALG_NONE)
2556 p.crypt_algo = "NULL"
2557 p.auth_algo = "NULL"
2559 self.config_network(p)
2560 self.config_sa_tun(p,
2562 self.pg0.remote_ip4)
2563 self.config_protect(p)
2565 self.verify_tun_44(p, count=n_pkts)
2568 self.unconfig_protect(p)
2570 self.unconfig_network(p)
# Mixin providing the configure / unconfigure helpers used by the IPv6
# IPsec-interface tests. It relies on attributes supplied by the class it is
# mixed into (e.g. self.vpp_esp_protocol) and stores everything it creates on
# the parameter object `p`.
2573 class TemplateIpsecItf6(object):
2574 """ IPsec Interface IPv6 """
# ESP is the encapsulation; the three names below are graph-node names that are
# presumably read by the shared tunnel test templates -- confirm.
2576 encryption_type = ESP
2577 tun6_encrypt_node_name = "esp6-encrypt-tun"
2578 tun6_decrypt_node_name = "esp6-decrypt-tun"
2579 tun6_input_node = "ipsec6-tun-input"
# Prepare the scapy-side state through config_tun_params() (called with
# tun_if=None and explicit src/dst), then create and add two VPP SAs:
#   p.tun_sa_out built from the scapy_tun_* id/SPI,
#   p.tun_sa_in  built from the vpp_tun_* id/SPI.
# NOTE(review): the trailing VppIpsecSA arguments of both calls are missing
# from this listing.
2581 def config_sa_tun(self, p, src, dst):
2582 config_tun_params(p, self.encryption_type, None, src, dst)
2584 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2585 p.auth_algo_vpp_id, p.auth_key,
2586 p.crypt_algo_vpp_id, p.crypt_key,
2587 self.vpp_esp_protocol,
2590 p.tun_sa_out.add_vpp_config()
2592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2593 p.auth_algo_vpp_id, p.auth_key,
2594 p.crypt_algo_vpp_id, p.crypt_key,
2595 self.vpp_esp_protocol,
2598 p.tun_sa_in.add_vpp_config()
# Create the tunnel-protection object, keep it on p.tun_protect and add it to
# VPP. NOTE(review): the constructor arguments after `self` are missing from
# this listing.
2600 def config_protect(self, p):
2601 p.tun_protect = VppIpsecTunProtect(self,
2605 p.tun_protect.add_vpp_config()
# Create the IPsec interface (p.tun_if), add it to VPP, call config_ip4() and
# config_ip6() on it, and build two routes through it: a /32 IPv4 route to
# p.remote_tun_if_host4 and a /128 IPv6 route to p.remote_tun_if_host.
# NOTE(review): no admin_up() call is visible in this listing.
2607 def config_network(self, p):
2608 p.tun_if = VppIpsecInterface(self)
2610 p.tun_if.add_vpp_config()
2612 p.tun_if.config_ip4()
2613 p.tun_if.config_ip6()
# NOTE(review): the IPv4 route is only bound to the local `r`; the rest of the
# call and any add_vpp_config() on it are not visible here, and
# unconfig_network() has no handle on it.
2615 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2616 [VppRoutePath(p.tun_if.remote_ip4,
# The IPv6 route is the one kept on p.route and removed by unconfig_network().
2620 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2621 [VppRoutePath(p.tun_if.remote_ip6,
2623 proto=DpoProto.DPO_PROTO_IP6)])
2624 p.route.add_vpp_config()
# Remove p.route first, then the tunnel interface p.tun_if.
2626 def unconfig_network(self, p):
2627 p.route.remove_vpp_config()
2628 p.tun_if.remove_vpp_config()
# Remove the tunnel-protection object stored on p.tun_protect.
2630 def unconfig_protect(self, p):
2631 p.tun_protect.remove_vpp_config()
# Remove both SAs, outbound (p.tun_sa_out) first and then inbound (p.tun_sa_in).
2633 def unconfig_sa(self, p):
2634 p.tun_sa_out.remove_vpp_config()
2635 p.tun_sa_in.remove_vpp_config()
# Test case exercising the IPsec interface with IPv6 parameters
# (self.ipv6_params).
# NOTE(review): the remaining base classes after TemplateIpsec and the
# `def setUp` / `def tearDown` header lines are missing from this listing; only
# their bodies are visible below.
2638 class TestIpsecItf6(TemplateIpsec,
2641 """ IPsec Interface IPv6 """
# setUp body: run the parent setUp, then use pg0 as self.tun_if.
2644 super(TestIpsecItf6, self).setUp()
2646 self.tun_if = self.pg0
# tearDown body: only delegates to the parent tearDown.
2649 super(TestIpsecItf6, self).tearDown()
# End-to-end test: configure interface, SAs and protection, verify traffic and
# the interface packet counters, then rekey and verify again.
# NOTE(review): the method is named test_tun_44 although its docstring and
# every verification below are IPv6 (verify_tun_66 / verify_tun_46); a name
# such as test_tun_66 would match -- renaming changes the test id, so confirm
# before changing.
2651 def test_tun_44(self):
2652 """IPSEC interface IPv6"""
# NOTE(review): `n_pkts` is used throughout this method but its assignment is
# not visible in this listing.
2655 p = self.ipv6_params
2657 self.config_network(p)
# NOTE(review): the `src` argument of this call is missing from the listing;
# only the `dst` argument (pg0.remote_ip6) is visible.
2658 self.config_sa_tun(p,
2660 self.pg0.remote_ip6)
2661 self.config_protect(p)
# First batch: rx and tx counters are both expected to equal n_pkts.
2663 self.verify_tun_66(p, count=n_pkts)
2664 c = p.tun_if.get_rx_stats()
2665 self.assertEqual(c['packets'], n_pkts)
2666 c = p.tun_if.get_tx_stats()
2667 self.assertEqual(c['packets'], n_pkts)
# With the interface admin-down the traffic is expected to be dropped.
# NOTE(review): no admin_up() call is visible before the next verify_tun_66().
2669 p.tun_if.admin_down()
2670 self.verify_drop_tun_66(p, count=n_pkts)
2672 self.verify_tun_66(p, count=n_pkts)
# Since the previous check rx is expected to have advanced by two batches
# (3*n_pkts) but tx by only one (2*n_pkts).
2674 c = p.tun_if.get_rx_stats()
2675 self.assertEqual(c['packets'], 3*n_pkts)
2676 c = p.tun_if.get_tx_stats()
2677 self.assertEqual(c['packets'], 2*n_pkts)
# it's a v4 packet when it's encrypted
2680 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2682 self.verify_tun_46(p, count=n_pkts)
2683 c = p.tun_if.get_rx_stats()
2684 self.assertEqual(c['packets'], 4*n_pkts)
2685 c = p.tun_if.get_tx_stats()
2686 self.assertEqual(c['packets'], 3*n_pkts)
# Restore the IPv6 encrypt node name changed above.
2688 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
# Reset the interface counters so the post-rekey checks start from zero.
2690 self.vapi.cli("clear interfaces")
2692 # rekey - create new SAs and update the tunnel protection
# NOTE(review): the assignment of `np` is not visible in this listing;
# presumably it is a copy of `p` -- confirm. The new key differs from the old
# one only in its first byte; SPIs are shifted by 100 and SA ids by 1.
2694 np.crypt_key = b'X' + p.crypt_key[1:]
2695 np.scapy_tun_spi += 100
2696 np.scapy_tun_sa_id += 1
2697 np.vpp_tun_spi += 100
2698 np.vpp_tun_sa_id += 1
# NOTE(review): these two assignments read the SPIs from `p` (the pre-rekey
# parameters), not from `np` -- verify that this is intended.
2699 np.tun_if.local_spi = p.vpp_tun_spi
2700 np.tun_if.remote_spi = p.scapy_tun_spi
2702 self.config_sa_tun(np,
2704 self.pg0.remote_ip6)
2705 self.config_protect(np)
# Traffic is verified with the new parameters while the counters are read
# through p.tun_if; after the "clear interfaces" above they are expected to
# equal one batch each.
2708 self.verify_tun_66(np, count=n_pkts)
2709 c = p.tun_if.get_rx_stats()
2710 self.assertEqual(c['packets'], n_pkts)
2711 c = p.tun_if.get_tx_stats()
2712 self.assertEqual(c['packets'], n_pkts)
# Teardown of the rekeyed objects, then the network.
# NOTE(review): no unconfig_protect(p) / unconfig_sa(p) for the pre-rekey
# objects is visible in this listing.
2715 self.unconfig_protect(np)
2716 self.unconfig_sa(np)
2717 self.unconfig_network(p)
# Point-to-multipoint ESP/IPv4 test: one IPsec interface created in MP tunnel
# mode, with a separate SA pair, tunnel protection and host route per next-hop.
# NOTE(review): several short lines (the `def setUp` / `def tearDown` headers,
# loop and `try` headers, the N_NHS / N_PKTS definitions, default values) are
# missing from this listing; comments below describe only what is visible.
2720 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
2721 """ Ipsec P2MP ESP v4 tests """
2722 tun4_encrypt_node_name = "esp4-encrypt-tun"
2723 tun4_decrypt_node_name = "esp4-decrypt-tun"
2724 encryption_type = ESP
# Build `count` frames: Ether (sw_intf.remote_mac -> sw_intf.local_mac) carrying
# sa.encrypt() of an IP/UDP/Raw packet whose payload is payload_size b'X' bytes.
# NOTE(review): the inner IP addresses come from self.pg1; the `src` and `dst`
# parameters are not used in the visible lines. The end of the signature
# (payload_size and its default) is missing from this listing.
2726 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2728 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2729 sa.encrypt(IP(src=self.pg1.local_ip4,
2730 dst=self.pg1.remote_ip4) /
2731 UDP(sport=1144, dport=2233) /
2732 Raw(b'X' * payload_size))
2733 for i in range(count)]
# Build `count` clear-text frames: Ether / IP / UDP / Raw towards `dst`.
# NOTE(review): the IP source is the literal "1.1.1.1"; the `src` parameter is
# not used in the visible lines.
2735 def gen_pkts(self, sw_intf, src, dst, count=1,
2737 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2738 IP(src="1.1.1.1", dst=dst) /
2739 UDP(sport=1144, dport=2233) /
2740 Raw(b'X' * payload_size)
2741 for i in range(count)]
# Check decrypted packets: Ethernet destination must be pg1.remote_mac and IP
# destination pg1.remote_ip4. The loop header binding `rx` is not visible here.
2743 def verify_decrypted(self, p, rxs):
2745 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
2746 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
# Check encrypted packets: the outer IP TOS must equal the EF DSCP value
# shifted left by two bits (the SAs in setUp are created with dscp=EF), then
# decrypt with `sa`, re-parse the payload as IP if scapy did not, validate the
# checksums and compare the inner destination with p.remote_tun_if_host.
# NOTE(review): the loop/try headers, the binding of `e`, and whatever follows
# the first debug log in the except branch are not visible in this listing.
2748 def verify_encrypted(self, p, sa, rxs):
2751 self.assertEqual(rx[IP].tos,
2752 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
2753 pkt = sa.decrypt(rx[IP])
2754 if not pkt.haslayer(IP):
2755 pkt = IP(pkt[Raw].load)
2756 self.assert_packet_checksums_valid(pkt)
2758 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
2759 except (IndexError, AssertionError):
2760 self.logger.debug(ppp("Unexpected packet:", rx))
2762 self.logger.debug(ppp("Decrypted packet:", pkt))
# setUp body: run the parent setUp, then build the multipoint interface.
2768 super(TestIpsecMIfEsp4, self).setUp()
2771 self.tun_if = self.pg0
# The interface is stored on self.ipv4_params itself, so the shallow copies
# made in the loop below all refer to this same p.tun_if object.
2772 p = self.ipv4_params
2773 p.tun_if = VppIpsecInterface(self,
2774 mode=(VppEnum.vl_api_tunnel_mode_t.
2775 TUNNEL_API_MODE_MP))
2776 p.tun_if.add_vpp_config()
# IPv4 is configured, unconfigured and configured again; presumably this
# deliberately exercises re-configuration on the interface -- confirm.
2778 p.tun_if.config_ip4()
2779 p.tun_if.unconfig_ip4()
2780 p.tun_if.config_ip4()
# Generate N_NHS remote hosts on both the tunnel interface and pg0, and
# configure the IPv4 neighbours on pg0.
2781 p.tun_if.generate_remote_hosts(N_NHS)
2782 self.pg0.generate_remote_hosts(N_NHS)
2783 self.pg0.configure_ipv4_neighbors()
2785 # setup some SAs for several next-hops on the interface
2786 self.multi_params = []
# One parameter set per next-hop: a shallow copy of ipv4_params with its own
# remote host address (1.1.1.<ii+1>) and SA ids / SPIs offset by ii.
2788 for ii in range(N_NHS):
2789 p = copy.copy(self.ipv4_params)
2791 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
2792 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
2793 p.scapy_tun_spi = p.scapy_tun_spi + ii
2794 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
2795 p.vpp_tun_spi = p.vpp_tun_spi + ii
2797 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
2798 p.scapy_tra_spi = p.scapy_tra_spi + ii
2799 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
2800 p.vpp_tra_spi = p.vpp_tra_spi + ii
# Outbound SA for this next-hop, created with DSCP EF.
# NOTE(review): one positional argument before the remote host address is
# missing from this listing.
2801 p.tun_sa_out = VppIpsecSA(
2802 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2803 p.auth_algo_vpp_id, p.auth_key,
2804 p.crypt_algo_vpp_id, p.crypt_key,
2805 self.vpp_esp_protocol,
2807 self.pg0.remote_hosts[ii].ip4,
2808 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2809 p.tun_sa_out.add_vpp_config()
# Inbound SA for this next-hop; here the remote host address is the first of
# the two address arguments (the second is missing from this listing).
2811 p.tun_sa_in = VppIpsecSA(
2812 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2813 p.auth_algo_vpp_id, p.auth_key,
2814 p.crypt_algo_vpp_id, p.crypt_key,
2815 self.vpp_esp_protocol,
2816 self.pg0.remote_hosts[ii].ip4,
2818 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF)
2819 p.tun_sa_in.add_vpp_config()
# Tunnel protection keyed by the ii-th remote host of the tunnel interface
# (nh=...); the other constructor arguments are missing from this listing.
2821 p.tun_protect = VppIpsecTunProtect(
2826 nh=p.tun_if.remote_hosts[ii].ip4)
2827 p.tun_protect.add_vpp_config()
# Prepare the scapy-side state for this next-hop (tun_if=None, explicit
# addresses; the src argument is missing from this listing) and remember the
# parameter set for the test.
2828 config_tun_params(p, self.encryption_type, None,
2830 self.pg0.remote_hosts[ii].ip4)
2831 self.multi_params.append(p)
# Host route to this parameter set's remote host via the matching remote host
# on the tunnel interface.
2833 VppIpRoute(self, p.remote_tun_if_host, 32,
2834 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
2835 p.tun_if.sw_if_index)]).add_vpp_config()
# Set p.tun_dst to this next-hop's address on pg0 (assigned after the
# config_tun_params() call above).
2837 p.tun_dst = self.pg0.remote_hosts[ii].ip4
# tearDown body: unconfigure IPv4 on the tunnel interface, then run the parent
# tearDown.
2840 p = self.ipv4_params
2841 p.tun_if.unconfig_ip4()
2842 super(TestIpsecMIfEsp4, self).tearDown()
# Run the standard 4-in-4 tunnel verification once per next-hop parameter set.
# NOTE(review): N_PKTS is not defined in the visible lines.
2844 def test_tun_44(self):
2847 for p in self.multi_params:
2848 self.verify_tun_44(p, count=N_PKTS)
# Script entry point: when the module is executed directly, run its tests with
# unittest using VppTestRunner as the test runner.
2851 if __name__ == '__main__':
2852 unittest.main(testRunner=VppTestRunner)