5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key
13 from vpp_ipsec_tun_interface import VppIpsecTunInterface
14 from vpp_gre_interface import VppGreInterface
15 from vpp_ipip_tun_interface import VppIpIpTunInterface
16 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
17 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
18 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_papi import VppEnum
23 def config_tun_params(p, encryption_type, tun_if):
24 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
25 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
26 IPSEC_API_SAD_FLAG_USE_ESN))
27 crypt_key = mk_scapy_crypt_key(p)
28 p.scapy_tun_sa = SecurityAssociation(
29 encryption_type, spi=p.vpp_tun_spi,
30 crypt_algo=p.crypt_algo,
32 auth_algo=p.auth_algo, auth_key=p.auth_key,
33 tunnel_header=ip_class_by_addr_type[p.addr_type](
36 nat_t_header=p.nat_header,
38 p.vpp_tun_sa = SecurityAssociation(
39 encryption_type, spi=p.scapy_tun_spi,
40 crypt_algo=p.crypt_algo,
42 auth_algo=p.auth_algo, auth_key=p.auth_key,
43 tunnel_header=ip_class_by_addr_type[p.addr_type](
46 nat_t_header=p.nat_header,
50 class TemplateIpsec4TunIfEsp(TemplateIpsec):
51 """ IPsec tunnel interface tests """
57 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
60 def tearDownClass(cls):
61 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
64 super(TemplateIpsec4TunIfEsp, self).setUp()
66 self.tun_if = self.pg0
70 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
71 p.scapy_tun_spi, p.crypt_algo_vpp_id,
72 p.crypt_key, p.crypt_key,
73 p.auth_algo_vpp_id, p.auth_key,
75 p.tun_if.add_vpp_config()
79 config_tun_params(p, self.encryption_type, p.tun_if)
81 r = VppIpRoute(self, p.remote_tun_if_host, 32,
82 [VppRoutePath(p.tun_if.remote_ip4,
85 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
86 [VppRoutePath(p.tun_if.remote_ip6,
88 proto=DpoProto.DPO_PROTO_IP6)])
92 super(TemplateIpsec4TunIfEsp, self).tearDown()
95 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
96 """ IPsec UDP tunnel interface tests """
98 tun4_encrypt_node_name = "esp4-encrypt-tun"
99 tun4_decrypt_node_name = "esp4-decrypt-tun"
100 encryption_type = ESP
104 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
107 def tearDownClass(cls):
108 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
111 super(TemplateIpsec4TunIfEspUdp, self).setUp()
113 self.tun_if = self.pg0
116 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
117 IPSEC_API_SAD_FLAG_UDP_ENCAP)
118 p.nat_header = UDP(sport=5454, dport=4500)
120 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
121 p.scapy_tun_spi, p.crypt_algo_vpp_id,
122 p.crypt_key, p.crypt_key,
123 p.auth_algo_vpp_id, p.auth_key,
124 p.auth_key, udp_encap=True)
125 p.tun_if.add_vpp_config()
127 p.tun_if.config_ip4()
128 p.tun_if.config_ip6()
129 config_tun_params(p, self.encryption_type, p.tun_if)
131 r = VppIpRoute(self, p.remote_tun_if_host, 32,
132 [VppRoutePath(p.tun_if.remote_ip4,
135 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
136 [VppRoutePath(p.tun_if.remote_ip6,
138 proto=DpoProto.DPO_PROTO_IP6)])
142 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
145 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
146 """ Ipsec ESP - TUN tests """
147 tun4_encrypt_node_name = "esp4-encrypt-tun"
148 tun4_decrypt_node_name = "esp4-decrypt-tun"
150 def test_tun_basic64(self):
151 """ ipsec 6o4 tunnel basic test """
152 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
154 self.verify_tun_64(self.params[socket.AF_INET], count=1)
156 def test_tun_burst64(self):
157 """ ipsec 6o4 tunnel basic test """
158 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
160 self.verify_tun_64(self.params[socket.AF_INET], count=257)
162 def test_tun_basic_frag44(self):
163 """ ipsec 4o4 tunnel frag basic test """
164 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
168 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
170 self.verify_tun_44(self.params[socket.AF_INET],
171 count=1, payload_size=1800, n_rx=2)
172 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
    """ Ipsec ESP UDP tests """

    # Presumably read by the inherited test helpers -- confirm in
    # template_ipsec.
    tun4_input_node = "ipsec4-tun-input"

    def test_keepalive(self):
        """ IPSEC NAT Keepalive """
        # The keepalive check runs against the IPv4 parameter set.
        keepalive_params = self.ipv4_params
        self.verify_keepalive(keepalive_params)
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
    """ Ipsec ESP - TCP tests """
    # No members are defined here: the class only combines the tunnel
    # interface template with the IpsecTcpTests base.
191 class TemplateIpsec6TunIfEsp(TemplateIpsec):
192 """ IPsec tunnel interface tests """
194 encryption_type = ESP
197 super(TemplateIpsec6TunIfEsp, self).setUp()
199 self.tun_if = self.pg0
202 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
203 p.scapy_tun_spi, p.crypt_algo_vpp_id,
204 p.crypt_key, p.crypt_key,
205 p.auth_algo_vpp_id, p.auth_key,
206 p.auth_key, is_ip6=True)
207 p.tun_if.add_vpp_config()
209 p.tun_if.config_ip6()
210 p.tun_if.config_ip4()
211 config_tun_params(p, self.encryption_type, p.tun_if)
213 r = VppIpRoute(self, p.remote_tun_if_host, 128,
214 [VppRoutePath(p.tun_if.remote_ip6,
216 proto=DpoProto.DPO_PROTO_IP6)])
218 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
219 [VppRoutePath(p.tun_if.remote_ip4,
224 super(TemplateIpsec6TunIfEsp, self).tearDown()
class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
    """ Ipsec ESP - TUN tests """
    # Presumably read by the inherited test helpers -- confirm in
    # template_ipsec.
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = "esp6-decrypt-tun"

    def _check_4o6_traffic(self, n_packets):
        """Run the 4o6 tunnel verification with ``n_packets`` packets."""
        # Set the encrypt node name on this instance (same value as the
        # class attribute) before any traffic is verified.
        self.tun6_encrypt_node_name = "esp6-encrypt-tun"
        ipv6_tunnel_params = self.params[socket.AF_INET6]
        self.verify_tun_46(ipv6_tunnel_params, count=n_packets)

    def test_tun_basic46(self):
        """ ipsec 4o6 tunnel basic test """
        self._check_4o6_traffic(1)

    def test_tun_burst46(self):
        """ ipsec 4o6 tunnel burst test """
        self._check_4o6_traffic(257)
243 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
244 """ IPsec IPv4 Multi Tunnel interface """
246 encryption_type = ESP
247 tun4_encrypt_node_name = "esp4-encrypt-tun"
248 tun4_decrypt_node_name = "esp4-decrypt-tun"
251 super(TestIpsec4MultiTunIfEsp, self).setUp()
253 self.tun_if = self.pg0
255 self.multi_params = []
256 self.pg0.generate_remote_hosts(10)
257 self.pg0.configure_ipv4_neighbors()
260 p = copy.copy(self.ipv4_params)
262 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
263 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
264 p.scapy_tun_spi = p.scapy_tun_spi + ii
265 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
266 p.vpp_tun_spi = p.vpp_tun_spi + ii
268 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
269 p.scapy_tra_spi = p.scapy_tra_spi + ii
270 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
271 p.vpp_tra_spi = p.vpp_tra_spi + ii
273 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
276 p.crypt_key, p.crypt_key,
277 p.auth_algo_vpp_id, p.auth_key,
279 dst=self.pg0.remote_hosts[ii].ip4)
280 p.tun_if.add_vpp_config()
282 p.tun_if.config_ip4()
283 config_tun_params(p, self.encryption_type, p.tun_if)
284 self.multi_params.append(p)
286 VppIpRoute(self, p.remote_tun_if_host, 32,
287 [VppRoutePath(p.tun_if.remote_ip4,
288 0xffffffff)]).add_vpp_config()
291 super(TestIpsec4MultiTunIfEsp, self).tearDown()
293 def test_tun_44(self):
294 """Multiple IPSEC tunnel interfaces """
295 for p in self.multi_params:
296 self.verify_tun_44(p, count=127)
297 c = p.tun_if.get_rx_stats()
298 self.assertEqual(c['packets'], 127)
299 c = p.tun_if.get_tx_stats()
300 self.assertEqual(c['packets'], 127)
303 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
304 """ IPsec IPv4 Tunnel interface all Algos """
306 encryption_type = ESP
307 tun4_encrypt_node_name = "esp4-encrypt-tun"
308 tun4_decrypt_node_name = "esp4-decrypt-tun"
310 def config_network(self, p):
312 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
315 p.crypt_key, p.crypt_key,
316 p.auth_algo_vpp_id, p.auth_key,
319 p.tun_if.add_vpp_config()
321 p.tun_if.config_ip4()
322 config_tun_params(p, self.encryption_type, p.tun_if)
323 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
324 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
326 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
327 [VppRoutePath(p.tun_if.remote_ip4,
329 p.route.add_vpp_config()
    def unconfig_network(self, p):
        """Tear down the tunnel interface and route that config_network
        stored on ``p``.

        :param p: parameter object carrying ``tun_if`` and ``route``.
        """
        # Remove the IPv4 configuration from the tunnel interface, then the
        # interface itself.
        p.tun_if.unconfig_ip4()
        p.tun_if.remove_vpp_config()
        # NOTE(review): here the route is removed after the interface, while
        # TemplateIpsec4TunProtect.unconfig_network removes the route first
        # -- confirm the ordering difference is harmless.
        p.route.remove_vpp_config()
337 super(TestIpsec4TunIfEspAll, self).setUp()
339 self.tun_if = self.pg0
342 super(TestIpsec4TunIfEspAll, self).tearDown()
346 # change the key and the SPI
348 p.crypt_key = b'X' + p.crypt_key[1:]
350 p.scapy_tun_sa_id += 1
353 p.tun_if.local_spi = p.vpp_tun_spi
354 p.tun_if.remote_spi = p.scapy_tun_spi
356 config_tun_params(p, self.encryption_type, p.tun_if)
358 p.tun_sa_in = VppIpsecSA(self,
365 self.vpp_esp_protocol,
368 p.tun_sa_out = VppIpsecSA(self,
375 self.vpp_esp_protocol,
378 p.tun_sa_in.add_vpp_config()
379 p.tun_sa_out.add_vpp_config()
381 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
382 sa_id=p.tun_sa_in.id,
384 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
385 sa_id=p.tun_sa_out.id,
387 self.logger.info(self.vapi.cli("sh ipsec sa"))
389 def test_tun_44(self):
390 """IPSEC tunnel all algos """
392 # foreach VPP crypto engine
393 engines = ["ia32", "ipsecmb", "openssl"]
395 # foreach crypto algorithm
396 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
397 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
398 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
399 IPSEC_API_INTEG_ALG_NONE),
400 'scapy-crypto': "AES-GCM",
401 'scapy-integ': "NULL",
402 'key': b"JPjyOWBeVEQiMe7h",
404 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
405 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
406 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
407 IPSEC_API_INTEG_ALG_NONE),
408 'scapy-crypto': "AES-GCM",
409 'scapy-integ': "NULL",
410 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
412 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
413 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
414 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
415 IPSEC_API_INTEG_ALG_NONE),
416 'scapy-crypto': "AES-GCM",
417 'scapy-integ': "NULL",
418 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
420 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
421 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
422 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
423 IPSEC_API_INTEG_ALG_SHA1_96),
424 'scapy-crypto': "AES-CBC",
425 'scapy-integ': "HMAC-SHA1-96",
427 'key': b"JPjyOWBeVEQiMe7h"},
428 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
429 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
430 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
431 IPSEC_API_INTEG_ALG_SHA1_96),
432 'scapy-crypto': "AES-CBC",
433 'scapy-integ': "HMAC-SHA1-96",
435 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
436 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
437 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
438 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
439 IPSEC_API_INTEG_ALG_SHA1_96),
440 'scapy-crypto': "AES-CBC",
441 'scapy-integ': "HMAC-SHA1-96",
443 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
444 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
445 IPSEC_API_CRYPTO_ALG_NONE),
446 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
447 IPSEC_API_INTEG_ALG_SHA1_96),
448 'scapy-crypto': "NULL",
449 'scapy-integ': "HMAC-SHA1-96",
451 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
453 for engine in engines:
454 self.vapi.cli("set crypto handler all %s" % engine)
457 # loop through each of the algorithms
460 # with self.subTest(algo=algo['scapy']):
462 p = copy.copy(self.ipv4_params)
463 p.auth_algo_vpp_id = algo['vpp-integ']
464 p.crypt_algo_vpp_id = algo['vpp-crypto']
465 p.crypt_algo = algo['scapy-crypto']
466 p.auth_algo = algo['scapy-integ']
467 p.crypt_key = algo['key']
468 p.salt = algo['salt']
470 self.config_network(p)
472 self.verify_tun_44(p, count=127)
473 c = p.tun_if.get_rx_stats()
474 self.assertEqual(c['packets'], 127)
475 c = p.tun_if.get_tx_stats()
476 self.assertEqual(c['packets'], 127)
482 self.verify_tun_44(p, count=127)
484 self.unconfig_network(p)
485 p.tun_sa_out.remove_vpp_config()
486 p.tun_sa_in.remove_vpp_config()
489 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
490 """ IPsec IPv6 Multi Tunnel interface """
492 encryption_type = ESP
493 tun6_encrypt_node_name = "esp6-encrypt-tun"
494 tun6_decrypt_node_name = "esp6-decrypt-tun"
497 super(TestIpsec6MultiTunIfEsp, self).setUp()
499 self.tun_if = self.pg0
501 self.multi_params = []
502 self.pg0.generate_remote_hosts(10)
503 self.pg0.configure_ipv6_neighbors()
506 p = copy.copy(self.ipv6_params)
508 p.remote_tun_if_host = "1111::%d" % (ii + 1)
509 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
510 p.scapy_tun_spi = p.scapy_tun_spi + ii
511 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
512 p.vpp_tun_spi = p.vpp_tun_spi + ii
514 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
515 p.scapy_tra_spi = p.scapy_tra_spi + ii
516 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
517 p.vpp_tra_spi = p.vpp_tra_spi + ii
519 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
522 p.crypt_key, p.crypt_key,
523 p.auth_algo_vpp_id, p.auth_key,
524 p.auth_key, is_ip6=True,
525 dst=self.pg0.remote_hosts[ii].ip6)
526 p.tun_if.add_vpp_config()
528 p.tun_if.config_ip6()
529 config_tun_params(p, self.encryption_type, p.tun_if)
530 self.multi_params.append(p)
532 r = VppIpRoute(self, p.remote_tun_if_host, 128,
533 [VppRoutePath(p.tun_if.remote_ip6,
535 proto=DpoProto.DPO_PROTO_IP6)])
539 super(TestIpsec6MultiTunIfEsp, self).tearDown()
541 def test_tun_66(self):
542 """Multiple IPSEC tunnel interfaces """
543 for p in self.multi_params:
544 self.verify_tun_66(p, count=127)
545 c = p.tun_if.get_rx_stats()
546 self.assertEqual(c['packets'], 127)
547 c = p.tun_if.get_tx_stats()
548 self.assertEqual(c['packets'], 127)
551 class TestIpsecGreTebIfEsp(TemplateIpsec,
553 """ Ipsec GRE TEB ESP - TUN tests """
554 tun4_encrypt_node_name = "esp4-encrypt-tun"
555 tun4_decrypt_node_name = "esp4-decrypt-tun"
556 encryption_type = ESP
557 omac = "00:11:22:33:44:55"
559 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
561 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
562 sa.encrypt(IP(src=self.pg0.remote_ip4,
563 dst=self.pg0.local_ip4) /
565 Ether(dst=self.omac) /
566 IP(src="1.1.1.1", dst="1.1.1.2") /
567 UDP(sport=1144, dport=2233) /
568 Raw(b'X' * payload_size))
569 for i in range(count)]
571 def gen_pkts(self, sw_intf, src, dst, count=1,
573 return [Ether(dst=self.omac) /
574 IP(src="1.1.1.1", dst="1.1.1.2") /
575 UDP(sport=1144, dport=2233) /
576 Raw(b'X' * payload_size)
577 for i in range(count)]
579 def verify_decrypted(self, p, rxs):
581 self.assert_equal(rx[Ether].dst, self.omac)
582 self.assert_equal(rx[IP].dst, "1.1.1.2")
584 def verify_encrypted(self, p, sa, rxs):
587 pkt = sa.decrypt(rx[IP])
588 if not pkt.haslayer(IP):
589 pkt = IP(pkt[Raw].load)
590 self.assert_packet_checksums_valid(pkt)
591 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
592 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
593 self.assertTrue(pkt.haslayer(GRE))
595 self.assertEqual(e[Ether].dst, self.omac)
596 self.assertEqual(e[IP].dst, "1.1.1.2")
597 except (IndexError, AssertionError):
598 self.logger.debug(ppp("Unexpected packet:", rx))
600 self.logger.debug(ppp("Decrypted packet:", pkt))
606 super(TestIpsecGreTebIfEsp, self).setUp()
608 self.tun_if = self.pg0
612 bd1 = VppBridgeDomain(self, 1)
615 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
616 p.auth_algo_vpp_id, p.auth_key,
617 p.crypt_algo_vpp_id, p.crypt_key,
618 self.vpp_esp_protocol,
621 p.tun_sa_out.add_vpp_config()
623 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
624 p.auth_algo_vpp_id, p.auth_key,
625 p.crypt_algo_vpp_id, p.crypt_key,
626 self.vpp_esp_protocol,
629 p.tun_sa_in.add_vpp_config()
631 p.tun_if = VppGreInterface(self,
634 type=(VppEnum.vl_api_gre_tunnel_type_t.
635 GRE_API_TUNNEL_TYPE_TEB))
636 p.tun_if.add_vpp_config()
638 p.tun_protect = VppIpsecTunProtect(self,
643 p.tun_protect.add_vpp_config()
646 p.tun_if.config_ip4()
647 config_tun_params(p, self.encryption_type, p.tun_if)
649 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
650 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
652 self.vapi.cli("clear ipsec sa")
656 p.tun_if.unconfig_ip4()
657 super(TestIpsecGreTebIfEsp, self).tearDown()
660 class TestIpsecGreIfEsp(TemplateIpsec,
662 """ Ipsec GRE ESP - TUN tests """
663 tun4_encrypt_node_name = "esp4-encrypt-tun"
664 tun4_decrypt_node_name = "esp4-decrypt-tun"
665 encryption_type = ESP
667 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
669 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
670 sa.encrypt(IP(src=self.pg0.remote_ip4,
671 dst=self.pg0.local_ip4) /
673 IP(src=self.pg1.local_ip4,
674 dst=self.pg1.remote_ip4) /
675 UDP(sport=1144, dport=2233) /
676 Raw(b'X' * payload_size))
677 for i in range(count)]
679 def gen_pkts(self, sw_intf, src, dst, count=1,
681 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
682 IP(src="1.1.1.1", dst="1.1.1.2") /
683 UDP(sport=1144, dport=2233) /
684 Raw(b'X' * payload_size)
685 for i in range(count)]
687 def verify_decrypted(self, p, rxs):
689 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
690 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
692 def verify_encrypted(self, p, sa, rxs):
695 pkt = sa.decrypt(rx[IP])
696 if not pkt.haslayer(IP):
697 pkt = IP(pkt[Raw].load)
698 self.assert_packet_checksums_valid(pkt)
699 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
700 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
701 self.assertTrue(pkt.haslayer(GRE))
703 self.assertEqual(e[IP].dst, "1.1.1.2")
704 except (IndexError, AssertionError):
705 self.logger.debug(ppp("Unexpected packet:", rx))
707 self.logger.debug(ppp("Decrypted packet:", pkt))
713 super(TestIpsecGreIfEsp, self).setUp()
715 self.tun_if = self.pg0
719 bd1 = VppBridgeDomain(self, 1)
722 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
723 p.auth_algo_vpp_id, p.auth_key,
724 p.crypt_algo_vpp_id, p.crypt_key,
725 self.vpp_esp_protocol,
728 p.tun_sa_out.add_vpp_config()
730 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
731 p.auth_algo_vpp_id, p.auth_key,
732 p.crypt_algo_vpp_id, p.crypt_key,
733 self.vpp_esp_protocol,
736 p.tun_sa_in.add_vpp_config()
738 p.tun_if = VppGreInterface(self,
741 p.tun_if.add_vpp_config()
743 p.tun_protect = VppIpsecTunProtect(self,
747 p.tun_protect.add_vpp_config()
750 p.tun_if.config_ip4()
751 config_tun_params(p, self.encryption_type, p.tun_if)
753 VppIpRoute(self, "1.1.1.2", 32,
754 [VppRoutePath(p.tun_if.remote_ip4,
755 0xffffffff)]).add_vpp_config()
759 p.tun_if.unconfig_ip4()
760 super(TestIpsecGreIfEsp, self).tearDown()
763 class TestIpsecGreIfEspTra(TemplateIpsec,
765 """ Ipsec GRE ESP - TRA tests """
766 tun4_encrypt_node_name = "esp4-encrypt-tun"
767 tun4_decrypt_node_name = "esp4-decrypt-tun"
768 encryption_type = ESP
770 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
772 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
773 sa.encrypt(IP(src=self.pg0.remote_ip4,
774 dst=self.pg0.local_ip4) /
776 IP(src=self.pg1.local_ip4,
777 dst=self.pg1.remote_ip4) /
778 UDP(sport=1144, dport=2233) /
779 Raw(b'X' * payload_size))
780 for i in range(count)]
782 def gen_pkts(self, sw_intf, src, dst, count=1,
784 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
785 IP(src="1.1.1.1", dst="1.1.1.2") /
786 UDP(sport=1144, dport=2233) /
787 Raw(b'X' * payload_size)
788 for i in range(count)]
790 def verify_decrypted(self, p, rxs):
792 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
793 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
795 def verify_encrypted(self, p, sa, rxs):
798 pkt = sa.decrypt(rx[IP])
799 if not pkt.haslayer(IP):
800 pkt = IP(pkt[Raw].load)
801 self.assert_packet_checksums_valid(pkt)
802 self.assertTrue(pkt.haslayer(GRE))
804 self.assertEqual(e[IP].dst, "1.1.1.2")
805 except (IndexError, AssertionError):
806 self.logger.debug(ppp("Unexpected packet:", rx))
808 self.logger.debug(ppp("Decrypted packet:", pkt))
814 super(TestIpsecGreIfEspTra, self).setUp()
816 self.tun_if = self.pg0
820 bd1 = VppBridgeDomain(self, 1)
823 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
824 p.auth_algo_vpp_id, p.auth_key,
825 p.crypt_algo_vpp_id, p.crypt_key,
826 self.vpp_esp_protocol)
827 p.tun_sa_out.add_vpp_config()
829 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
830 p.auth_algo_vpp_id, p.auth_key,
831 p.crypt_algo_vpp_id, p.crypt_key,
832 self.vpp_esp_protocol)
833 p.tun_sa_in.add_vpp_config()
835 p.tun_if = VppGreInterface(self,
838 p.tun_if.add_vpp_config()
840 p.tun_protect = VppIpsecTunProtect(self,
844 p.tun_protect.add_vpp_config()
847 p.tun_if.config_ip4()
848 config_tun_params(p, self.encryption_type, p.tun_if)
850 VppIpRoute(self, "1.1.1.2", 32,
851 [VppRoutePath(p.tun_if.remote_ip4,
852 0xffffffff)]).add_vpp_config()
856 p.tun_if.unconfig_ip4()
857 super(TestIpsecGreIfEspTra, self).tearDown()
860 class TemplateIpsec4TunProtect(object):
861 """ IPsec IPv4 Tunnel protect """
863 encryption_type = ESP
864 tun4_encrypt_node_name = "esp4-encrypt-tun"
865 tun4_decrypt_node_name = "esp4-decrypt-tun"
866 tun4_input_node = "ipsec4-tun-input"
868 def config_sa_tra(self, p):
869 config_tun_params(p, self.encryption_type, p.tun_if)
871 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
872 p.auth_algo_vpp_id, p.auth_key,
873 p.crypt_algo_vpp_id, p.crypt_key,
874 self.vpp_esp_protocol,
876 p.tun_sa_out.add_vpp_config()
878 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
879 p.auth_algo_vpp_id, p.auth_key,
880 p.crypt_algo_vpp_id, p.crypt_key,
881 self.vpp_esp_protocol,
883 p.tun_sa_in.add_vpp_config()
885 def config_sa_tun(self, p):
886 config_tun_params(p, self.encryption_type, p.tun_if)
888 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
889 p.auth_algo_vpp_id, p.auth_key,
890 p.crypt_algo_vpp_id, p.crypt_key,
891 self.vpp_esp_protocol,
892 self.tun_if.remote_addr[p.addr_type],
893 self.tun_if.local_addr[p.addr_type],
895 p.tun_sa_out.add_vpp_config()
897 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
898 p.auth_algo_vpp_id, p.auth_key,
899 p.crypt_algo_vpp_id, p.crypt_key,
900 self.vpp_esp_protocol,
901 self.tun_if.remote_addr[p.addr_type],
902 self.tun_if.local_addr[p.addr_type],
904 p.tun_sa_in.add_vpp_config()
906 def config_protect(self, p):
907 p.tun_protect = VppIpsecTunProtect(self,
911 p.tun_protect.add_vpp_config()
913 def config_network(self, p):
914 p.tun_if = VppIpIpTunInterface(self, self.pg0,
917 p.tun_if.add_vpp_config()
919 p.tun_if.config_ip4()
920 p.tun_if.config_ip6()
922 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
923 [VppRoutePath(p.tun_if.remote_ip4,
925 p.route.add_vpp_config()
926 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
927 [VppRoutePath(p.tun_if.remote_ip6,
929 proto=DpoProto.DPO_PROTO_IP6)])
932 def unconfig_network(self, p):
933 p.route.remove_vpp_config()
934 p.tun_if.remove_vpp_config()
936 def unconfig_protect(self, p):
937 p.tun_protect.remove_vpp_config()
939 def unconfig_sa(self, p):
940 p.tun_sa_out.remove_vpp_config()
941 p.tun_sa_in.remove_vpp_config()
944 class TestIpsec4TunProtect(TemplateIpsec,
945 TemplateIpsec4TunProtect,
947 """ IPsec IPv4 Tunnel protect - transport mode"""
950 super(TestIpsec4TunProtect, self).setUp()
952 self.tun_if = self.pg0
955 super(TestIpsec4TunProtect, self).tearDown()
957 def test_tun_44(self):
958 """IPSEC tunnel protect"""
962 self.config_network(p)
963 self.config_sa_tra(p)
964 self.config_protect(p)
966 self.verify_tun_44(p, count=127)
967 c = p.tun_if.get_rx_stats()
968 self.assertEqual(c['packets'], 127)
969 c = p.tun_if.get_tx_stats()
970 self.assertEqual(c['packets'], 127)
972 self.vapi.cli("clear ipsec sa")
973 self.verify_tun_64(p, count=127)
974 c = p.tun_if.get_rx_stats()
975 self.assertEqual(c['packets'], 254)
976 c = p.tun_if.get_tx_stats()
977 self.assertEqual(c['packets'], 254)
979 # rekey - create new SAs and update the tunnel protection
981 np.crypt_key = b'X' + p.crypt_key[1:]
982 np.scapy_tun_spi += 100
983 np.scapy_tun_sa_id += 1
984 np.vpp_tun_spi += 100
985 np.vpp_tun_sa_id += 1
986 np.tun_if.local_spi = p.vpp_tun_spi
987 np.tun_if.remote_spi = p.scapy_tun_spi
989 self.config_sa_tra(np)
990 self.config_protect(np)
993 self.verify_tun_44(np, count=127)
994 c = p.tun_if.get_rx_stats()
995 self.assertEqual(c['packets'], 381)
996 c = p.tun_if.get_tx_stats()
997 self.assertEqual(c['packets'], 381)
1000 self.unconfig_protect(np)
1001 self.unconfig_sa(np)
1002 self.unconfig_network(p)
1005 class TestIpsec4TunProtectUdp(TemplateIpsec,
1006 TemplateIpsec4TunProtect,
1008 """ IPsec IPv4 Tunnel protect - transport mode"""
1011 super(TestIpsec4TunProtectUdp, self).setUp()
1013 self.tun_if = self.pg0
1015 p = self.ipv4_params
1016 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1017 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1018 p.nat_header = UDP(sport=5454, dport=4500)
1019 self.config_network(p)
1020 self.config_sa_tra(p)
1021 self.config_protect(p)
1024 p = self.ipv4_params
1025 self.unconfig_protect(p)
1027 self.unconfig_network(p)
1028 super(TestIpsec4TunProtectUdp, self).tearDown()
1030 def test_tun_44(self):
1031 """IPSEC UDP tunnel protect"""
1033 p = self.ipv4_params
1035 self.verify_tun_44(p, count=127)
1036 c = p.tun_if.get_rx_stats()
1037 self.assertEqual(c['packets'], 127)
1038 c = p.tun_if.get_tx_stats()
1039 self.assertEqual(c['packets'], 127)
1041 def test_keepalive(self):
1042 """ IPSEC NAT Keepalive """
1043 self.verify_keepalive(self.ipv4_params)
1046 class TestIpsec4TunProtectTun(TemplateIpsec,
1047 TemplateIpsec4TunProtect,
1049 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1051 encryption_type = ESP
1052 tun4_encrypt_node_name = "esp4-encrypt-tun"
1053 tun4_decrypt_node_name = "esp4-decrypt-tun"
1056 super(TestIpsec4TunProtectTun, self).setUp()
1058 self.tun_if = self.pg0
1061 super(TestIpsec4TunProtectTun, self).tearDown()
1063 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
1065 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1066 sa.encrypt(IP(src=sw_intf.remote_ip4,
1067 dst=sw_intf.local_ip4) /
1068 IP(src=src, dst=dst) /
1069 UDP(sport=1144, dport=2233) /
1070 Raw(b'X' * payload_size))
1071 for i in range(count)]
1073 def gen_pkts(self, sw_intf, src, dst, count=1,
1075 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1076 IP(src=src, dst=dst) /
1077 UDP(sport=1144, dport=2233) /
1078 Raw(b'X' * payload_size)
1079 for i in range(count)]
1081 def verify_decrypted(self, p, rxs):
1083 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1084 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1085 self.assert_packet_checksums_valid(rx)
1087 def verify_encrypted(self, p, sa, rxs):
1090 pkt = sa.decrypt(rx[IP])
1091 if not pkt.haslayer(IP):
1092 pkt = IP(pkt[Raw].load)
1093 self.assert_packet_checksums_valid(pkt)
1094 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1095 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1096 inner = pkt[IP].payload
1097 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
1099 except (IndexError, AssertionError):
1100 self.logger.debug(ppp("Unexpected packet:", rx))
1102 self.logger.debug(ppp("Decrypted packet:", pkt))
1107 def test_tun_44(self):
1108 """IPSEC tunnel protect """
1110 p = self.ipv4_params
1112 self.config_network(p)
1113 self.config_sa_tun(p)
1114 self.config_protect(p)
1116 self.verify_tun_44(p, count=127)
1118 c = p.tun_if.get_rx_stats()
1119 self.assertEqual(c['packets'], 127)
1120 c = p.tun_if.get_tx_stats()
1121 self.assertEqual(c['packets'], 127)
1123 # rekey - create new SAs and update the tunnel protection
1125 np.crypt_key = b'X' + p.crypt_key[1:]
1126 np.scapy_tun_spi += 100
1127 np.scapy_tun_sa_id += 1
1128 np.vpp_tun_spi += 100
1129 np.vpp_tun_sa_id += 1
1130 np.tun_if.local_spi = p.vpp_tun_spi
1131 np.tun_if.remote_spi = p.scapy_tun_spi
1133 self.config_sa_tun(np)
1134 self.config_protect(np)
1137 self.verify_tun_44(np, count=127)
1138 c = p.tun_if.get_rx_stats()
1139 self.assertEqual(c['packets'], 254)
1140 c = p.tun_if.get_tx_stats()
1141 self.assertEqual(c['packets'], 254)
1144 self.unconfig_protect(np)
1145 self.unconfig_sa(np)
1146 self.unconfig_network(p)
1149 class TemplateIpsec6TunProtect(object):
1150 """ IPsec IPv6 Tunnel protect """
1152 def config_sa_tra(self, p):
1153 config_tun_params(p, self.encryption_type, p.tun_if)
1155 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1156 p.auth_algo_vpp_id, p.auth_key,
1157 p.crypt_algo_vpp_id, p.crypt_key,
1158 self.vpp_esp_protocol)
1159 p.tun_sa_out.add_vpp_config()
1161 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1162 p.auth_algo_vpp_id, p.auth_key,
1163 p.crypt_algo_vpp_id, p.crypt_key,
1164 self.vpp_esp_protocol)
1165 p.tun_sa_in.add_vpp_config()
1167 def config_sa_tun(self, p):
1168 config_tun_params(p, self.encryption_type, p.tun_if)
1170 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1171 p.auth_algo_vpp_id, p.auth_key,
1172 p.crypt_algo_vpp_id, p.crypt_key,
1173 self.vpp_esp_protocol,
1174 self.tun_if.remote_addr[p.addr_type],
1175 self.tun_if.local_addr[p.addr_type])
1176 p.tun_sa_out.add_vpp_config()
1178 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1179 p.auth_algo_vpp_id, p.auth_key,
1180 p.crypt_algo_vpp_id, p.crypt_key,
1181 self.vpp_esp_protocol,
1182 self.tun_if.remote_addr[p.addr_type],
1183 self.tun_if.local_addr[p.addr_type])
1184 p.tun_sa_in.add_vpp_config()
1186 def config_protect(self, p):
1187 p.tun_protect = VppIpsecTunProtect(self,
1191 p.tun_protect.add_vpp_config()
1193 def config_network(self, p):
1194 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1196 self.pg0.remote_ip6)
1197 p.tun_if.add_vpp_config()
1199 p.tun_if.config_ip6()
1200 p.tun_if.config_ip4()
1202 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
1203 [VppRoutePath(p.tun_if.remote_ip6,
1205 proto=DpoProto.DPO_PROTO_IP6)])
1206 p.route.add_vpp_config()
1207 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
1208 [VppRoutePath(p.tun_if.remote_ip4,
1212 def unconfig_network(self, p):
1213 p.route.remove_vpp_config()
1214 p.tun_if.remove_vpp_config()
1216 def unconfig_protect(self, p):
1217 p.tun_protect.remove_vpp_config()
1219 def unconfig_sa(self, p):
1220 p.tun_sa_out.remove_vpp_config()
1221 p.tun_sa_in.remove_vpp_config()
# Test case built on TemplateIpsec and TemplateIpsec6TunProtect; per its
# docstring it covers IPv6 tunnel protection in transport mode.
# NOTE(review): the end of the base-class list is not visible in this
# excerpt - confirm the remaining base(s) against the full file.
1224 class TestIpsec6TunProtect(TemplateIpsec,
1225 TemplateIpsec6TunProtect,
1227 """ IPsec IPv6 Tunnel protect - transport mode"""
# Class-level settings: ESP as the encryption type plus the names of the
# encrypt/decrypt nodes (presumably read by the shared verify helpers -
# TODO confirm where they are consumed).
1229 encryption_type = ESP
1230 tun6_encrypt_node_name = "esp6-encrypt-tun"
1231 tun6_decrypt_node_name = "esp6-decrypt-tun"
# NOTE(review): the `def` lines of the set-up / tear-down methods are not
# visible in this excerpt. The statements below chain to the parent setUp(),
# point self.tun_if at self.pg0, and chain to the parent tearDown().
1234 super(TestIpsec6TunProtect, self).setUp()
1236 self.tun_if = self.pg0
1239 super(TestIpsec6TunProtect, self).tearDown()
# Traffic test that goes through several re-keys of the tunnel protection.
1241 def test_tun_66(self):
1242 """IPSEC tunnel protect"""
1244 p = self.ipv6_params
# Configure network, transport-mode SAs and protection from `p`.
1246 self.config_network(p)
1247 self.config_sa_tra(p)
1248 self.config_protect(p)
# First batch of 127 packets: the interface rx and tx counters must both
# read 127 afterwards.
1250 self.verify_tun_66(p, count=127)
1251 c = p.tun_if.get_rx_stats()
1252 self.assertEqual(c['packets'], 127)
1253 c = p.tun_if.get_tx_stats()
1254 self.assertEqual(c['packets'], 127)
1256 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is modified below but its assignment is not visible in
# this excerpt - presumably a copy of `p`; confirm.
# The new key replaces only the first byte of p's key with b'X'; SPIs are
# bumped by 100 and SA ids by 1.
1258 np.crypt_key = b'X' + p.crypt_key[1:]
1259 np.scapy_tun_spi += 100
1260 np.scapy_tun_sa_id += 1
1261 np.vpp_tun_spi += 100
1262 np.vpp_tun_sa_id += 1
# NOTE(review): the SPIs written to np.tun_if are read from `p` (the
# pre-bump values), not from `np` - confirm this is intended.
1263 np.tun_if.local_spi = p.vpp_tun_spi
1264 np.tun_if.remote_spi = p.scapy_tun_spi
1266 self.config_sa_tra(np)
1267 self.config_protect(np)
# Second batch with the new parameters; the counters read from p.tun_if are
# expected to be cumulative (127 + 127 = 254).
1270 self.verify_tun_66(np, count=127)
1271 c = p.tun_if.get_rx_stats()
1272 self.assertEqual(c['packets'], 254)
1273 c = p.tun_if.get_tx_stats()
1274 self.assertEqual(c['packets'], 254)
1277 # 1) add two input SAs [old, new]
1278 # 2) swap output SA to [new]
1279 # 3) use only [new] input SA
# NOTE(review): `np3` is modified below but its assignment is not visible in
# this excerpt - presumably a copy of `np`; confirm.
# Same scheme as above: the first key byte becomes b'Z', SPIs +100, SA ids +1.
1281 np3.crypt_key = b'Z' + p.crypt_key[1:]
1282 np3.scapy_tun_spi += 100
1283 np3.scapy_tun_sa_id += 1
1284 np3.vpp_tun_spi += 100
1285 np3.vpp_tun_sa_id += 1
1286 np3.tun_if.local_spi = p.vpp_tun_spi
1287 np3.tun_if.remote_spi = p.scapy_tun_spi
1289 self.config_sa_tra(np3)
# Step 1: output SA stays np's, input SAs are [np, np3]. The two-argument
# verify_tun_66 form is used here; presumably (params for traffic sent to
# VPP, params for traffic received from VPP) - TODO confirm against the
# helper's definition.
1292 p.tun_protect.update_vpp_config(np.tun_sa_out,
1293 [np.tun_sa_in, np3.tun_sa_in])
1294 self.verify_tun_66(np, np, count=127)
1295 self.verify_tun_66(np3, np, count=127)
# Step 2: output SA switches to np3's; both input SAs are still listed.
1298 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1299 [np.tun_sa_in, np3.tun_sa_in])
1300 self.verify_tun_66(np, np3, count=127)
1301 self.verify_tun_66(np3, np3, count=127)
# Step 3. NOTE(review): the continuation line carrying the second argument
# of this call is not visible in this excerpt; per the numbered comment
# above it should list only the new input SA - confirm.
1304 p.tun_protect.update_vpp_config(np3.tun_sa_out,
1306 self.verify_tun_66(np3, np3, count=127)
1307 self.verify_drop_tun_66(np, count=127)
# Seven verify_tun_66 batches of 127 packets precede this check, matching
# the expected 127*7. The expected value implies the verify_drop_tun_66
# batch is not counted on the interface - TODO confirm.
1309 c = p.tun_if.get_rx_stats()
1310 self.assertEqual(c['packets'], 127*7)
1311 c = p.tun_if.get_tx_stats()
1312 self.assertEqual(c['packets'], 127*7)
1313 self.unconfig_sa(np)
# Tear down: protection and SAs of np3, then the network created from `p`.
1316 self.unconfig_protect(np3)
1317 self.unconfig_sa(np3)
1318 self.unconfig_network(p)
# Same set-up as the first phase of test_tun_66, but traffic is checked with
# verify_tun_46 instead of verify_tun_66.
1320 def test_tun_46(self):
1321 """IPSEC tunnel protect"""
1323 p = self.ipv6_params
1325 self.config_network(p)
1326 self.config_sa_tra(p)
1327 self.config_protect(p)
1329 self.verify_tun_46(p, count=127)
1330 c = p.tun_if.get_rx_stats()
1331 self.assertEqual(c['packets'], 127)
1332 c = p.tun_if.get_tx_stats()
1333 self.assertEqual(c['packets'], 127)
# NOTE(review): no unconfig_sa(p) is visible between the two calls below,
# although config_sa_tra(p) ran above and test_tun_66 removes its SAs during
# teardown - confirm the SAs are removed in the full file.
1336 self.unconfig_protect(p)
1338 self.unconfig_network(p)
# Test case built on TemplateIpsec and TemplateIpsec6TunProtect; per its
# docstring it covers IPv6 tunnel protection in tunnel mode.
# NOTE(review): the end of the base-class list is not visible in this
# excerpt - confirm the remaining base(s) against the full file.
1341 class TestIpsec6TunProtectTun(TemplateIpsec,
1342 TemplateIpsec6TunProtect,
1344 """ IPsec IPv6 Tunnel protect - tunnel mode"""
# Class-level settings: ESP as the encryption type plus the names of the
# encrypt/decrypt nodes (presumably read by the shared verify helpers -
# TODO confirm where they are consumed).
1346 encryption_type = ESP
1347 tun6_encrypt_node_name = "esp6-encrypt-tun"
1348 tun6_decrypt_node_name = "esp6-decrypt-tun"
# NOTE(review): the `def` lines of the set-up / tear-down methods are not
# visible in this excerpt. The statements below chain to the parent setUp(),
# point self.tun_if at self.pg0, and chain to the parent tearDown().
1351 super(TestIpsec6TunProtectTun, self).setUp()
1353 self.tun_if = self.pg0
1356 super(TestIpsec6TunProtectTun, self).tearDown()
# Returns a list of `count` frames. Each is an Ether header (sw_intf
# remote_mac -> local_mac) followed by sa.encrypt() applied to: an outer IPv6
# header (sw_intf remote_ip6 -> local_ip6) / an inner IPv6 header (src -> dst)
# / UDP 1166 -> 2233 / a payload of payload_size b'X' bytes.
# NOTE(review): `payload_size` is used in the body, but the signature line
# that declares it is not visible in this excerpt.
1358 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
1360 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1361 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
1362 dst=sw_intf.local_ip6) /
1363 IPv6(src=src, dst=dst) /
1364 UDP(sport=1166, dport=2233) /
1365 Raw(b'X' * payload_size))
1366 for i in range(count)]
# Returns a list of `count` unencrypted frames: Ether (sw_intf remote_mac ->
# local_mac) / IPv6 (src -> dst) / UDP 1166 -> 2233 / payload_size b'X' bytes.
# NOTE(review): as above, the signature line declaring `payload_size` is not
# visible in this excerpt.
1368 def gen_pkts6(self, sw_intf, src, dst, count=1,
1370 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1371 IPv6(src=src, dst=dst) /
1372 UDP(sport=1166, dport=2233) /
1373 Raw(b'X' * payload_size)
1374 for i in range(count)]
# Checks a received packet `rx`: IPv6 destination equals pg1.remote_ip6,
# source equals p.remote_tun_if_host, and the checksums validate.
# NOTE(review): the statement binding `rx` (presumably a loop over `rxs`) is
# not visible in this excerpt.
1376 def verify_decrypted6(self, p, rxs):
1378 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1379 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1380 self.assert_packet_checksums_valid(rx)
# Decrypts rx[IPv6] with `sa`; if the result has no IPv6 layer, the Raw load
# is re-parsed as IPv6. Then checks the checksums, that the outer addresses
# are pg0.local_ip6 -> pg0.remote_ip6, and that the destination of the nested
# IPv6 header equals p.remote_tun_if_host.
# NOTE(review): the lines opening the loop over `rxs` and the `try` block,
# and the lines following each debug log call, are not visible here.
1382 def verify_encrypted6(self, p, sa, rxs):
1385 pkt = sa.decrypt(rx[IPv6])
1386 if not pkt.haslayer(IPv6):
1387 pkt = IPv6(pkt[Raw].load)
1388 self.assert_packet_checksums_valid(pkt)
1389 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
1390 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
1391 inner = pkt[IPv6].payload
1392 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
# On an indexing or assertion failure the offending packet is logged at debug
# level, along with its decrypted form.
1394 except (IndexError, AssertionError):
1395 self.logger.debug(ppp("Unexpected packet:", rx))
1397 self.logger.debug(ppp("Decrypted packet:", pkt))
# Traffic test with one re-key, using tunnel-mode SAs (config_sa_tun).
1402 def test_tun_66(self):
1403 """IPSEC tunnel protect """
1405 p = self.ipv6_params
# Configure network, tunnel-mode SAs and protection from `p`.
1407 self.config_network(p)
1408 self.config_sa_tun(p)
1409 self.config_protect(p)
# First batch of 127 packets: the interface rx and tx counters must both
# read 127 afterwards.
1411 self.verify_tun_66(p, count=127)
1413 c = p.tun_if.get_rx_stats()
1414 self.assertEqual(c['packets'], 127)
1415 c = p.tun_if.get_tx_stats()
1416 self.assertEqual(c['packets'], 127)
1418 # rekey - create new SAs and update the tunnel protection
# NOTE(review): `np` is modified below but its assignment is not visible in
# this excerpt - presumably a copy of `p`; confirm.
# The new key replaces only the first byte of p's key with b'X'; SPIs are
# bumped by 100 and SA ids by 1.
1420 np.crypt_key = b'X' + p.crypt_key[1:]
1421 np.scapy_tun_spi += 100
1422 np.scapy_tun_sa_id += 1
1423 np.vpp_tun_spi += 100
1424 np.vpp_tun_sa_id += 1
# NOTE(review): the SPIs written to np.tun_if are read from `p` (the
# pre-bump values), not from `np` - confirm this is intended.
1425 np.tun_if.local_spi = p.vpp_tun_spi
1426 np.tun_if.remote_spi = p.scapy_tun_spi
1428 self.config_sa_tun(np)
1429 self.config_protect(np)
# Second batch with the new parameters; the counters read from p.tun_if are
# expected to be cumulative (127 + 127 = 254).
1432 self.verify_tun_66(np, count=127)
1433 c = p.tun_if.get_rx_stats()
1434 self.assertEqual(c['packets'], 254)
1435 c = p.tun_if.get_tx_stats()
1436 self.assertEqual(c['packets'], 254)
# Tear down: protection and SAs of np, then the network created from `p`.
1439 self.unconfig_protect(np)
1440 self.unconfig_sa(np)
1441 self.unconfig_network(p)
# Script entry point: when the module is executed directly, run its tests
# through unittest using VppTestRunner as the test runner.
1444 if __name__ == '__main__':
1445 unittest.main(testRunner=VppTestRunner)