5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
24 from vpp_acl import AclRule, VppAcl, VppAclInterface
27 def config_tun_params(p, encryption_type, tun_if):
28 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
29 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
30 IPSEC_API_SAD_FLAG_USE_ESN))
31 crypt_key = mk_scapy_crypt_key(p)
32 p.tun_dst = tun_if.remote_ip
33 p.tun_src = tun_if.local_ip
34 p.scapy_tun_sa = SecurityAssociation(
35 encryption_type, spi=p.vpp_tun_spi,
36 crypt_algo=p.crypt_algo,
38 auth_algo=p.auth_algo, auth_key=p.auth_key,
39 tunnel_header=ip_class_by_addr_type[p.addr_type](
42 nat_t_header=p.nat_header,
44 p.vpp_tun_sa = SecurityAssociation(
45 encryption_type, spi=p.scapy_tun_spi,
46 crypt_algo=p.crypt_algo,
48 auth_algo=p.auth_algo, auth_key=p.auth_key,
49 tunnel_header=ip_class_by_addr_type[p.addr_type](
52 nat_t_header=p.nat_header,
56 def config_tra_params(p, encryption_type, tun_if):
57 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
58 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
59 IPSEC_API_SAD_FLAG_USE_ESN))
60 crypt_key = mk_scapy_crypt_key(p)
61 p.tun_dst = tun_if.remote_ip
62 p.tun_src = tun_if.local_ip
63 p.scapy_tun_sa = SecurityAssociation(
64 encryption_type, spi=p.vpp_tun_spi,
65 crypt_algo=p.crypt_algo,
67 auth_algo=p.auth_algo, auth_key=p.auth_key,
69 nat_t_header=p.nat_header)
70 p.vpp_tun_sa = SecurityAssociation(
71 encryption_type, spi=p.scapy_tun_spi,
72 crypt_algo=p.crypt_algo,
74 auth_algo=p.auth_algo, auth_key=p.auth_key,
76 nat_t_header=p.nat_header)
79 class TemplateIpsec4TunIfEsp(TemplateIpsec):
80 """ IPsec tunnel interface tests """
86 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
89 def tearDownClass(cls):
90 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
93 super(TemplateIpsec4TunIfEsp, self).setUp()
95 self.tun_if = self.pg0
99 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
100 p.scapy_tun_spi, p.crypt_algo_vpp_id,
101 p.crypt_key, p.crypt_key,
102 p.auth_algo_vpp_id, p.auth_key,
104 p.tun_if.add_vpp_config()
106 p.tun_if.config_ip4()
107 p.tun_if.config_ip6()
108 config_tun_params(p, self.encryption_type, p.tun_if)
110 r = VppIpRoute(self, p.remote_tun_if_host, 32,
111 [VppRoutePath(p.tun_if.remote_ip4,
114 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
115 [VppRoutePath(p.tun_if.remote_ip6,
117 proto=DpoProto.DPO_PROTO_IP6)])
121 super(TemplateIpsec4TunIfEsp, self).tearDown()
124 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
125 """ IPsec UDP tunnel interface tests """
127 tun4_encrypt_node_name = "esp4-encrypt-tun"
128 tun4_decrypt_node_name = "esp4-decrypt-tun"
129 encryption_type = ESP
133 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
136 def tearDownClass(cls):
137 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
139 def verify_encrypted(self, p, sa, rxs):
142 # ensure the UDP ports are correct before we decrypt
144 self.assertTrue(rx.haslayer(UDP))
145 self.assert_equal(rx[UDP].sport, 4500)
146 self.assert_equal(rx[UDP].dport, 4500)
148 pkt = sa.decrypt(rx[IP])
149 if not pkt.haslayer(IP):
150 pkt = IP(pkt[Raw].load)
152 self.assert_packet_checksums_valid(pkt)
153 self.assert_equal(pkt[IP].dst, "1.1.1.1")
154 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
155 except (IndexError, AssertionError):
156 self.logger.debug(ppp("Unexpected packet:", rx))
158 self.logger.debug(ppp("Decrypted packet:", pkt))
164 super(TemplateIpsec4TunIfEspUdp, self).setUp()
167 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
168 IPSEC_API_SAD_FLAG_UDP_ENCAP)
169 p.nat_header = UDP(sport=5454, dport=4500)
171 def config_network(self):
173 self.tun_if = self.pg0
175 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
176 p.scapy_tun_spi, p.crypt_algo_vpp_id,
177 p.crypt_key, p.crypt_key,
178 p.auth_algo_vpp_id, p.auth_key,
179 p.auth_key, udp_encap=True)
180 p.tun_if.add_vpp_config()
182 p.tun_if.config_ip4()
183 p.tun_if.config_ip6()
184 config_tun_params(p, self.encryption_type, p.tun_if)
186 r = VppIpRoute(self, p.remote_tun_if_host, 32,
187 [VppRoutePath(p.tun_if.remote_ip4,
190 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
191 [VppRoutePath(p.tun_if.remote_ip6,
193 proto=DpoProto.DPO_PROTO_IP6)])
197 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
200 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
201 """ Ipsec ESP - TUN tests """
202 tun4_encrypt_node_name = "esp4-encrypt-tun"
203 tun4_decrypt_node_name = "esp4-decrypt-tun"
205 def test_tun_basic64(self):
206 """ ipsec 6o4 tunnel basic test """
207 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
209 self.verify_tun_64(self.params[socket.AF_INET], count=1)
211 def test_tun_burst64(self):
212 """ ipsec 6o4 tunnel basic test """
213 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
215 self.verify_tun_64(self.params[socket.AF_INET], count=257)
217 def test_tun_basic_frag44(self):
218 """ ipsec 4o4 tunnel frag basic test """
219 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
223 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
225 self.verify_tun_44(self.params[socket.AF_INET],
226 count=1, payload_size=1800, n_rx=2)
227 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
231 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
232 """ Ipsec ESP UDP tests """
234 tun4_input_node = "ipsec4-tun-input"
237 super(TemplateIpsec4TunIfEspUdp, self).setUp()
238 self.config_network()
240 def test_keepalive(self):
241 """ IPSEC NAT Keepalive """
242 self.verify_keepalive(self.ipv4_params)
245 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
246 """ Ipsec ESP UDP GCM tests """
248 tun4_input_node = "ipsec4-tun-input"
251 super(TemplateIpsec4TunIfEspUdp, self).setUp()
253 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
254 IPSEC_API_INTEG_ALG_NONE)
255 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
256 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
257 p.crypt_algo = "AES-GCM"
259 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
261 self.config_network()
264 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
265 """ Ipsec ESP - TCP tests """
269 class TemplateIpsec6TunIfEsp(TemplateIpsec):
270 """ IPsec tunnel interface tests """
272 encryption_type = ESP
275 super(TemplateIpsec6TunIfEsp, self).setUp()
277 self.tun_if = self.pg0
280 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
281 p.scapy_tun_spi, p.crypt_algo_vpp_id,
282 p.crypt_key, p.crypt_key,
283 p.auth_algo_vpp_id, p.auth_key,
284 p.auth_key, is_ip6=True)
285 p.tun_if.add_vpp_config()
287 p.tun_if.config_ip6()
288 p.tun_if.config_ip4()
289 config_tun_params(p, self.encryption_type, p.tun_if)
291 r = VppIpRoute(self, p.remote_tun_if_host, 128,
292 [VppRoutePath(p.tun_if.remote_ip6,
294 proto=DpoProto.DPO_PROTO_IP6)])
296 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
297 [VppRoutePath(p.tun_if.remote_ip4,
302 super(TemplateIpsec6TunIfEsp, self).tearDown()
305 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
307 """ Ipsec ESP - TUN tests """
308 tun6_encrypt_node_name = "esp6-encrypt-tun"
309 tun6_decrypt_node_name = "esp6-decrypt-tun"
311 def test_tun_basic46(self):
312 """ ipsec 4o6 tunnel basic test """
313 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
314 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
316 def test_tun_burst46(self):
317 """ ipsec 4o6 tunnel burst test """
318 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
319 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
322 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
323 IpsecTun6HandoffTests):
324 """ Ipsec ESP 6 Handoff tests """
325 tun6_encrypt_node_name = "esp6-encrypt-tun"
326 tun6_decrypt_node_name = "esp6-decrypt-tun"
329 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
330 IpsecTun4HandoffTests):
331 """ Ipsec ESP 4 Handoff tests """
332 tun4_encrypt_node_name = "esp4-encrypt-tun"
333 tun4_decrypt_node_name = "esp4-decrypt-tun"
336 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
337 """ IPsec IPv4 Multi Tunnel interface """
339 encryption_type = ESP
340 tun4_encrypt_node_name = "esp4-encrypt-tun"
341 tun4_decrypt_node_name = "esp4-decrypt-tun"
344 super(TestIpsec4MultiTunIfEsp, self).setUp()
346 self.tun_if = self.pg0
348 self.multi_params = []
349 self.pg0.generate_remote_hosts(10)
350 self.pg0.configure_ipv4_neighbors()
353 p = copy.copy(self.ipv4_params)
355 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
356 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
357 p.scapy_tun_spi = p.scapy_tun_spi + ii
358 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
359 p.vpp_tun_spi = p.vpp_tun_spi + ii
361 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
362 p.scapy_tra_spi = p.scapy_tra_spi + ii
363 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
364 p.vpp_tra_spi = p.vpp_tra_spi + ii
365 p.tun_dst = self.pg0.remote_hosts[ii].ip4
367 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
370 p.crypt_key, p.crypt_key,
371 p.auth_algo_vpp_id, p.auth_key,
374 p.tun_if.add_vpp_config()
376 p.tun_if.config_ip4()
377 config_tun_params(p, self.encryption_type, p.tun_if)
378 self.multi_params.append(p)
380 VppIpRoute(self, p.remote_tun_if_host, 32,
381 [VppRoutePath(p.tun_if.remote_ip4,
382 0xffffffff)]).add_vpp_config()
385 super(TestIpsec4MultiTunIfEsp, self).tearDown()
387 def test_tun_44(self):
388 """Multiple IPSEC tunnel interfaces """
389 for p in self.multi_params:
390 self.verify_tun_44(p, count=127)
391 c = p.tun_if.get_rx_stats()
392 self.assertEqual(c['packets'], 127)
393 c = p.tun_if.get_tx_stats()
394 self.assertEqual(c['packets'], 127)
396 def test_tun_rr_44(self):
397 """ Round-robin packets acrros multiple interface """
399 for p in self.multi_params:
400 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
401 src=p.remote_tun_if_host,
402 dst=self.pg1.remote_ip4)
403 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
405 for rx, p in zip(rxs, self.multi_params):
406 self.verify_decrypted(p, [rx])
409 for p in self.multi_params:
410 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
411 dst=p.remote_tun_if_host)
412 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
414 for rx, p in zip(rxs, self.multi_params):
415 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
418 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
419 """ IPsec IPv4 Tunnel interface all Algos """
421 encryption_type = ESP
422 tun4_encrypt_node_name = "esp4-encrypt-tun"
423 tun4_decrypt_node_name = "esp4-decrypt-tun"
425 def config_network(self, p):
427 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
430 p.crypt_key, p.crypt_key,
431 p.auth_algo_vpp_id, p.auth_key,
434 p.tun_if.add_vpp_config()
436 p.tun_if.config_ip4()
437 config_tun_params(p, self.encryption_type, p.tun_if)
438 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
439 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
441 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
442 [VppRoutePath(p.tun_if.remote_ip4,
444 p.route.add_vpp_config()
446 def unconfig_network(self, p):
447 p.tun_if.unconfig_ip4()
448 p.tun_if.remove_vpp_config()
449 p.route.remove_vpp_config()
452 super(TestIpsec4TunIfEspAll, self).setUp()
454 self.tun_if = self.pg0
457 super(TestIpsec4TunIfEspAll, self).tearDown()
461 # change the key and the SPI
463 p.crypt_key = b'X' + p.crypt_key[1:]
465 p.scapy_tun_sa_id += 1
468 p.tun_if.local_spi = p.vpp_tun_spi
469 p.tun_if.remote_spi = p.scapy_tun_spi
471 config_tun_params(p, self.encryption_type, p.tun_if)
473 p.tun_sa_in = VppIpsecSA(self,
480 self.vpp_esp_protocol,
483 p.tun_sa_out = VppIpsecSA(self,
490 self.vpp_esp_protocol,
493 p.tun_sa_in.add_vpp_config()
494 p.tun_sa_out.add_vpp_config()
496 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
497 sa_id=p.tun_sa_in.id,
499 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
500 sa_id=p.tun_sa_out.id,
502 self.logger.info(self.vapi.cli("sh ipsec sa"))
504 def test_tun_44(self):
505 """IPSEC tunnel all algos """
507 # foreach VPP crypto engine
508 engines = ["ia32", "ipsecmb", "openssl"]
510 # foreach crypto algorithm
511 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
512 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
513 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
514 IPSEC_API_INTEG_ALG_NONE),
515 'scapy-crypto': "AES-GCM",
516 'scapy-integ': "NULL",
517 'key': b"JPjyOWBeVEQiMe7h",
519 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
520 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
521 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
522 IPSEC_API_INTEG_ALG_NONE),
523 'scapy-crypto': "AES-GCM",
524 'scapy-integ': "NULL",
525 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
527 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
528 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
529 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
530 IPSEC_API_INTEG_ALG_NONE),
531 'scapy-crypto': "AES-GCM",
532 'scapy-integ': "NULL",
533 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
535 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
536 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
537 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
538 IPSEC_API_INTEG_ALG_SHA1_96),
539 'scapy-crypto': "AES-CBC",
540 'scapy-integ': "HMAC-SHA1-96",
542 'key': b"JPjyOWBeVEQiMe7h"},
543 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
544 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
545 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
546 IPSEC_API_INTEG_ALG_SHA1_96),
547 'scapy-crypto': "AES-CBC",
548 'scapy-integ': "HMAC-SHA1-96",
550 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
551 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
552 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
553 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
554 IPSEC_API_INTEG_ALG_SHA1_96),
555 'scapy-crypto': "AES-CBC",
556 'scapy-integ': "HMAC-SHA1-96",
558 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
559 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
560 IPSEC_API_CRYPTO_ALG_NONE),
561 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
562 IPSEC_API_INTEG_ALG_SHA1_96),
563 'scapy-crypto': "NULL",
564 'scapy-integ': "HMAC-SHA1-96",
566 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
568 for engine in engines:
569 self.vapi.cli("set crypto handler all %s" % engine)
572 # loop through each of the algorithms
575 # with self.subTest(algo=algo['scapy']):
577 p = copy.copy(self.ipv4_params)
578 p.auth_algo_vpp_id = algo['vpp-integ']
579 p.crypt_algo_vpp_id = algo['vpp-crypto']
580 p.crypt_algo = algo['scapy-crypto']
581 p.auth_algo = algo['scapy-integ']
582 p.crypt_key = algo['key']
583 p.salt = algo['salt']
585 self.config_network(p)
587 self.verify_tun_44(p, count=127)
588 c = p.tun_if.get_rx_stats()
589 self.assertEqual(c['packets'], 127)
590 c = p.tun_if.get_tx_stats()
591 self.assertEqual(c['packets'], 127)
597 self.verify_tun_44(p, count=127)
599 self.unconfig_network(p)
600 p.tun_sa_out.remove_vpp_config()
601 p.tun_sa_in.remove_vpp_config()
604 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
605 """ IPsec IPv4 Tunnel interface no Algos """
607 encryption_type = ESP
608 tun4_encrypt_node_name = "esp4-encrypt-tun"
609 tun4_decrypt_node_name = "esp4-decrypt-tun"
611 def config_network(self, p):
613 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
614 IPSEC_API_INTEG_ALG_NONE)
618 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
619 IPSEC_API_CRYPTO_ALG_NONE)
620 p.crypt_algo = 'NULL'
623 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
626 p.crypt_key, p.crypt_key,
627 p.auth_algo_vpp_id, p.auth_key,
630 p.tun_if.add_vpp_config()
632 p.tun_if.config_ip4()
633 config_tun_params(p, self.encryption_type, p.tun_if)
634 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
635 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
637 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
638 [VppRoutePath(p.tun_if.remote_ip4,
640 p.route.add_vpp_config()
642 def unconfig_network(self, p):
643 p.tun_if.unconfig_ip4()
644 p.tun_if.remove_vpp_config()
645 p.route.remove_vpp_config()
648 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
650 self.tun_if = self.pg0
653 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
655 def test_tun_44(self):
656 """ IPSec SA with NULL algos """
659 self.config_network(p)
661 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
662 dst=p.remote_tun_if_host)
663 self.send_and_assert_no_replies(self.pg1, tx)
665 self.unconfig_network(p)
668 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
669 """ IPsec IPv6 Multi Tunnel interface """
671 encryption_type = ESP
672 tun6_encrypt_node_name = "esp6-encrypt-tun"
673 tun6_decrypt_node_name = "esp6-decrypt-tun"
676 super(TestIpsec6MultiTunIfEsp, self).setUp()
678 self.tun_if = self.pg0
680 self.multi_params = []
681 self.pg0.generate_remote_hosts(10)
682 self.pg0.configure_ipv6_neighbors()
685 p = copy.copy(self.ipv6_params)
687 p.remote_tun_if_host = "1111::%d" % (ii + 1)
688 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
689 p.scapy_tun_spi = p.scapy_tun_spi + ii
690 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
691 p.vpp_tun_spi = p.vpp_tun_spi + ii
693 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
694 p.scapy_tra_spi = p.scapy_tra_spi + ii
695 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
696 p.vpp_tra_spi = p.vpp_tra_spi + ii
698 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
701 p.crypt_key, p.crypt_key,
702 p.auth_algo_vpp_id, p.auth_key,
703 p.auth_key, is_ip6=True,
704 dst=self.pg0.remote_hosts[ii].ip6)
705 p.tun_if.add_vpp_config()
707 p.tun_if.config_ip6()
708 config_tun_params(p, self.encryption_type, p.tun_if)
709 self.multi_params.append(p)
711 r = VppIpRoute(self, p.remote_tun_if_host, 128,
712 [VppRoutePath(p.tun_if.remote_ip6,
714 proto=DpoProto.DPO_PROTO_IP6)])
718 super(TestIpsec6MultiTunIfEsp, self).tearDown()
720 def test_tun_66(self):
721 """Multiple IPSEC tunnel interfaces """
722 for p in self.multi_params:
723 self.verify_tun_66(p, count=127)
724 c = p.tun_if.get_rx_stats()
725 self.assertEqual(c['packets'], 127)
726 c = p.tun_if.get_tx_stats()
727 self.assertEqual(c['packets'], 127)
730 class TestIpsecGreTebIfEsp(TemplateIpsec,
732 """ Ipsec GRE TEB ESP - TUN tests """
733 tun4_encrypt_node_name = "esp4-encrypt-tun"
734 tun4_decrypt_node_name = "esp4-decrypt-tun"
735 encryption_type = ESP
736 omac = "00:11:22:33:44:55"
738 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
740 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
741 sa.encrypt(IP(src=self.pg0.remote_ip4,
742 dst=self.pg0.local_ip4) /
744 Ether(dst=self.omac) /
745 IP(src="1.1.1.1", dst="1.1.1.2") /
746 UDP(sport=1144, dport=2233) /
747 Raw(b'X' * payload_size))
748 for i in range(count)]
750 def gen_pkts(self, sw_intf, src, dst, count=1,
752 return [Ether(dst=self.omac) /
753 IP(src="1.1.1.1", dst="1.1.1.2") /
754 UDP(sport=1144, dport=2233) /
755 Raw(b'X' * payload_size)
756 for i in range(count)]
758 def verify_decrypted(self, p, rxs):
760 self.assert_equal(rx[Ether].dst, self.omac)
761 self.assert_equal(rx[IP].dst, "1.1.1.2")
763 def verify_encrypted(self, p, sa, rxs):
766 pkt = sa.decrypt(rx[IP])
767 if not pkt.haslayer(IP):
768 pkt = IP(pkt[Raw].load)
769 self.assert_packet_checksums_valid(pkt)
770 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
771 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
772 self.assertTrue(pkt.haslayer(GRE))
774 self.assertEqual(e[Ether].dst, self.omac)
775 self.assertEqual(e[IP].dst, "1.1.1.2")
776 except (IndexError, AssertionError):
777 self.logger.debug(ppp("Unexpected packet:", rx))
779 self.logger.debug(ppp("Decrypted packet:", pkt))
785 super(TestIpsecGreTebIfEsp, self).setUp()
787 self.tun_if = self.pg0
791 bd1 = VppBridgeDomain(self, 1)
794 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
795 p.auth_algo_vpp_id, p.auth_key,
796 p.crypt_algo_vpp_id, p.crypt_key,
797 self.vpp_esp_protocol,
800 p.tun_sa_out.add_vpp_config()
802 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
803 p.auth_algo_vpp_id, p.auth_key,
804 p.crypt_algo_vpp_id, p.crypt_key,
805 self.vpp_esp_protocol,
808 p.tun_sa_in.add_vpp_config()
810 p.tun_if = VppGreInterface(self,
813 type=(VppEnum.vl_api_gre_tunnel_type_t.
814 GRE_API_TUNNEL_TYPE_TEB))
815 p.tun_if.add_vpp_config()
817 p.tun_protect = VppIpsecTunProtect(self,
822 p.tun_protect.add_vpp_config()
825 p.tun_if.config_ip4()
826 config_tun_params(p, self.encryption_type, p.tun_if)
828 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
829 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
831 self.vapi.cli("clear ipsec sa")
832 self.vapi.cli("sh adj")
833 self.vapi.cli("sh ipsec tun")
837 p.tun_if.unconfig_ip4()
838 super(TestIpsecGreTebIfEsp, self).tearDown()
841 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
843 """ Ipsec GRE TEB ESP - TUN tests """
844 tun4_encrypt_node_name = "esp4-encrypt-tun"
845 tun4_decrypt_node_name = "esp4-decrypt-tun"
846 encryption_type = ESP
847 omac = "00:11:22:33:44:55"
849 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
851 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
852 sa.encrypt(IP(src=self.pg0.remote_ip4,
853 dst=self.pg0.local_ip4) /
855 Ether(dst=self.omac) /
856 IP(src="1.1.1.1", dst="1.1.1.2") /
857 UDP(sport=1144, dport=2233) /
858 Raw(b'X' * payload_size))
859 for i in range(count)]
861 def gen_pkts(self, sw_intf, src, dst, count=1,
863 return [Ether(dst=self.omac) /
865 IP(src="1.1.1.1", dst="1.1.1.2") /
866 UDP(sport=1144, dport=2233) /
867 Raw(b'X' * payload_size)
868 for i in range(count)]
870 def verify_decrypted(self, p, rxs):
872 self.assert_equal(rx[Ether].dst, self.omac)
873 self.assert_equal(rx[Dot1Q].vlan, 11)
874 self.assert_equal(rx[IP].dst, "1.1.1.2")
876 def verify_encrypted(self, p, sa, rxs):
879 pkt = sa.decrypt(rx[IP])
880 if not pkt.haslayer(IP):
881 pkt = IP(pkt[Raw].load)
882 self.assert_packet_checksums_valid(pkt)
883 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
884 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
885 self.assertTrue(pkt.haslayer(GRE))
887 self.assertEqual(e[Ether].dst, self.omac)
888 self.assertFalse(e.haslayer(Dot1Q))
889 self.assertEqual(e[IP].dst, "1.1.1.2")
890 except (IndexError, AssertionError):
891 self.logger.debug(ppp("Unexpected packet:", rx))
893 self.logger.debug(ppp("Decrypted packet:", pkt))
899 super(TestIpsecGreTebVlanIfEsp, self).setUp()
901 self.tun_if = self.pg0
905 bd1 = VppBridgeDomain(self, 1)
908 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
909 self.vapi.l2_interface_vlan_tag_rewrite(
910 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
912 self.pg1_11.admin_up()
914 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
915 p.auth_algo_vpp_id, p.auth_key,
916 p.crypt_algo_vpp_id, p.crypt_key,
917 self.vpp_esp_protocol,
920 p.tun_sa_out.add_vpp_config()
922 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
923 p.auth_algo_vpp_id, p.auth_key,
924 p.crypt_algo_vpp_id, p.crypt_key,
925 self.vpp_esp_protocol,
928 p.tun_sa_in.add_vpp_config()
930 p.tun_if = VppGreInterface(self,
933 type=(VppEnum.vl_api_gre_tunnel_type_t.
934 GRE_API_TUNNEL_TYPE_TEB))
935 p.tun_if.add_vpp_config()
937 p.tun_protect = VppIpsecTunProtect(self,
942 p.tun_protect.add_vpp_config()
945 p.tun_if.config_ip4()
946 config_tun_params(p, self.encryption_type, p.tun_if)
948 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
949 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
951 self.vapi.cli("clear ipsec sa")
955 p.tun_if.unconfig_ip4()
956 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
957 self.pg1_11.admin_down()
958 self.pg1_11.remove_vpp_config()
961 class TestIpsecGreTebIfEspTra(TemplateIpsec,
963 """ Ipsec GRE TEB ESP - Tra tests """
964 tun4_encrypt_node_name = "esp4-encrypt-tun"
965 tun4_decrypt_node_name = "esp4-decrypt-tun"
966 encryption_type = ESP
967 omac = "00:11:22:33:44:55"
969 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
971 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
972 sa.encrypt(IP(src=self.pg0.remote_ip4,
973 dst=self.pg0.local_ip4) /
975 Ether(dst=self.omac) /
976 IP(src="1.1.1.1", dst="1.1.1.2") /
977 UDP(sport=1144, dport=2233) /
978 Raw(b'X' * payload_size))
979 for i in range(count)]
981 def gen_pkts(self, sw_intf, src, dst, count=1,
983 return [Ether(dst=self.omac) /
984 IP(src="1.1.1.1", dst="1.1.1.2") /
985 UDP(sport=1144, dport=2233) /
986 Raw(b'X' * payload_size)
987 for i in range(count)]
989 def verify_decrypted(self, p, rxs):
991 self.assert_equal(rx[Ether].dst, self.omac)
992 self.assert_equal(rx[IP].dst, "1.1.1.2")
994 def verify_encrypted(self, p, sa, rxs):
997 pkt = sa.decrypt(rx[IP])
998 if not pkt.haslayer(IP):
999 pkt = IP(pkt[Raw].load)
1000 self.assert_packet_checksums_valid(pkt)
1001 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1002 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1003 self.assertTrue(pkt.haslayer(GRE))
1005 self.assertEqual(e[Ether].dst, self.omac)
1006 self.assertEqual(e[IP].dst, "1.1.1.2")
1007 except (IndexError, AssertionError):
1008 self.logger.debug(ppp("Unexpected packet:", rx))
1010 self.logger.debug(ppp("Decrypted packet:", pkt))
1016 super(TestIpsecGreTebIfEspTra, self).setUp()
1018 self.tun_if = self.pg0
1020 p = self.ipv4_params
1022 bd1 = VppBridgeDomain(self, 1)
1023 bd1.add_vpp_config()
1025 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1026 p.auth_algo_vpp_id, p.auth_key,
1027 p.crypt_algo_vpp_id, p.crypt_key,
1028 self.vpp_esp_protocol)
1029 p.tun_sa_out.add_vpp_config()
1031 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1032 p.auth_algo_vpp_id, p.auth_key,
1033 p.crypt_algo_vpp_id, p.crypt_key,
1034 self.vpp_esp_protocol)
1035 p.tun_sa_in.add_vpp_config()
1037 p.tun_if = VppGreInterface(self,
1039 self.pg0.remote_ip4,
1040 type=(VppEnum.vl_api_gre_tunnel_type_t.
1041 GRE_API_TUNNEL_TYPE_TEB))
1042 p.tun_if.add_vpp_config()
1044 p.tun_protect = VppIpsecTunProtect(self,
1049 p.tun_protect.add_vpp_config()
1052 p.tun_if.config_ip4()
1053 config_tra_params(p, self.encryption_type, p.tun_if)
1055 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1056 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1058 self.vapi.cli("clear ipsec sa")
1061 p = self.ipv4_params
1062 p.tun_if.unconfig_ip4()
1063 super(TestIpsecGreTebIfEspTra, self).tearDown()
1066 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1068 """ Ipsec GRE TEB UDP ESP - Tra tests """
1069 tun4_encrypt_node_name = "esp4-encrypt-tun"
1070 tun4_decrypt_node_name = "esp4-decrypt-tun"
1071 encryption_type = ESP
1072 omac = "00:11:22:33:44:55"
1074 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1076 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1077 sa.encrypt(IP(src=self.pg0.remote_ip4,
1078 dst=self.pg0.local_ip4) /
1080 Ether(dst=self.omac) /
1081 IP(src="1.1.1.1", dst="1.1.1.2") /
1082 UDP(sport=1144, dport=2233) /
1083 Raw(b'X' * payload_size))
1084 for i in range(count)]
1086 def gen_pkts(self, sw_intf, src, dst, count=1,
1088 return [Ether(dst=self.omac) /
1089 IP(src="1.1.1.1", dst="1.1.1.2") /
1090 UDP(sport=1144, dport=2233) /
1091 Raw(b'X' * payload_size)
1092 for i in range(count)]
1094 def verify_decrypted(self, p, rxs):
1096 self.assert_equal(rx[Ether].dst, self.omac)
1097 self.assert_equal(rx[IP].dst, "1.1.1.2")
1099 def verify_encrypted(self, p, sa, rxs):
1101 self.assertTrue(rx.haslayer(UDP))
1102 self.assertEqual(rx[UDP].dport, 4545)
1103 self.assertEqual(rx[UDP].sport, 5454)
1105 pkt = sa.decrypt(rx[IP])
1106 if not pkt.haslayer(IP):
1107 pkt = IP(pkt[Raw].load)
1108 self.assert_packet_checksums_valid(pkt)
1109 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1110 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1111 self.assertTrue(pkt.haslayer(GRE))
1113 self.assertEqual(e[Ether].dst, self.omac)
1114 self.assertEqual(e[IP].dst, "1.1.1.2")
1115 except (IndexError, AssertionError):
1116 self.logger.debug(ppp("Unexpected packet:", rx))
1118 self.logger.debug(ppp("Decrypted packet:", pkt))
1124 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1126 self.tun_if = self.pg0
1128 p = self.ipv4_params
1129 p = self.ipv4_params
1130 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1131 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1132 p.nat_header = UDP(sport=5454, dport=4545)
1134 bd1 = VppBridgeDomain(self, 1)
1135 bd1.add_vpp_config()
1137 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1138 p.auth_algo_vpp_id, p.auth_key,
1139 p.crypt_algo_vpp_id, p.crypt_key,
1140 self.vpp_esp_protocol,
1144 p.tun_sa_out.add_vpp_config()
1146 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1147 p.auth_algo_vpp_id, p.auth_key,
1148 p.crypt_algo_vpp_id, p.crypt_key,
1149 self.vpp_esp_protocol,
1151 VppEnum.vl_api_ipsec_sad_flags_t.
1152 IPSEC_API_SAD_FLAG_IS_INBOUND),
1155 p.tun_sa_in.add_vpp_config()
1157 p.tun_if = VppGreInterface(self,
1159 self.pg0.remote_ip4,
1160 type=(VppEnum.vl_api_gre_tunnel_type_t.
1161 GRE_API_TUNNEL_TYPE_TEB))
1162 p.tun_if.add_vpp_config()
1164 p.tun_protect = VppIpsecTunProtect(self,
1169 p.tun_protect.add_vpp_config()
1172 p.tun_if.config_ip4()
1173 config_tra_params(p, self.encryption_type, p.tun_if)
1175 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1176 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1178 self.vapi.cli("clear ipsec sa")
1179 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1182 p = self.ipv4_params
1183 p.tun_if.unconfig_ip4()
1184 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1187 class TestIpsecGreIfEsp(TemplateIpsec,
1189 """ Ipsec GRE ESP - TUN tests """
1190 tun4_encrypt_node_name = "esp4-encrypt-tun"
1191 tun4_decrypt_node_name = "esp4-decrypt-tun"
1192 encryption_type = ESP
1194 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1196 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1197 sa.encrypt(IP(src=self.pg0.remote_ip4,
1198 dst=self.pg0.local_ip4) /
1200 IP(src=self.pg1.local_ip4,
1201 dst=self.pg1.remote_ip4) /
1202 UDP(sport=1144, dport=2233) /
1203 Raw(b'X' * payload_size))
1204 for i in range(count)]
1206 def gen_pkts(self, sw_intf, src, dst, count=1,
1208 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1209 IP(src="1.1.1.1", dst="1.1.1.2") /
1210 UDP(sport=1144, dport=2233) /
1211 Raw(b'X' * payload_size)
1212 for i in range(count)]
1214 def verify_decrypted(self, p, rxs):
1216 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1217 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1219 def verify_encrypted(self, p, sa, rxs):
1222 pkt = sa.decrypt(rx[IP])
1223 if not pkt.haslayer(IP):
1224 pkt = IP(pkt[Raw].load)
1225 self.assert_packet_checksums_valid(pkt)
1226 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1227 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1228 self.assertTrue(pkt.haslayer(GRE))
1230 self.assertEqual(e[IP].dst, "1.1.1.2")
1231 except (IndexError, AssertionError):
1232 self.logger.debug(ppp("Unexpected packet:", rx))
1234 self.logger.debug(ppp("Decrypted packet:", pkt))
1240 super(TestIpsecGreIfEsp, self).setUp()
1242 self.tun_if = self.pg0
1244 p = self.ipv4_params
1246 bd1 = VppBridgeDomain(self, 1)
1247 bd1.add_vpp_config()
1249 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1250 p.auth_algo_vpp_id, p.auth_key,
1251 p.crypt_algo_vpp_id, p.crypt_key,
1252 self.vpp_esp_protocol,
1254 self.pg0.remote_ip4)
1255 p.tun_sa_out.add_vpp_config()
1257 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1258 p.auth_algo_vpp_id, p.auth_key,
1259 p.crypt_algo_vpp_id, p.crypt_key,
1260 self.vpp_esp_protocol,
1261 self.pg0.remote_ip4,
1263 p.tun_sa_in.add_vpp_config()
1265 p.tun_if = VppGreInterface(self,
1267 self.pg0.remote_ip4)
1268 p.tun_if.add_vpp_config()
1270 p.tun_protect = VppIpsecTunProtect(self,
1274 p.tun_protect.add_vpp_config()
1277 p.tun_if.config_ip4()
1278 config_tun_params(p, self.encryption_type, p.tun_if)
1280 VppIpRoute(self, "1.1.1.2", 32,
1281 [VppRoutePath(p.tun_if.remote_ip4,
1282 0xffffffff)]).add_vpp_config()
1285 p = self.ipv4_params
1286 p.tun_if.unconfig_ip4()
1287 super(TestIpsecGreIfEsp, self).tearDown()
1290 class TestIpsecGreIfEspTra(TemplateIpsec,
1292 """ Ipsec GRE ESP - TRA tests """
1293 tun4_encrypt_node_name = "esp4-encrypt-tun"
1294 tun4_decrypt_node_name = "esp4-decrypt-tun"
1295 encryption_type = ESP
1297 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1299 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1300 sa.encrypt(IP(src=self.pg0.remote_ip4,
1301 dst=self.pg0.local_ip4) /
1303 IP(src=self.pg1.local_ip4,
1304 dst=self.pg1.remote_ip4) /
1305 UDP(sport=1144, dport=2233) /
1306 Raw(b'X' * payload_size))
1307 for i in range(count)]
1309 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1311 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1312 sa.encrypt(IP(src=self.pg0.remote_ip4,
1313 dst=self.pg0.local_ip4) /
1315 UDP(sport=1144, dport=2233) /
1316 Raw(b'X' * payload_size))
1317 for i in range(count)]
1319 def gen_pkts(self, sw_intf, src, dst, count=1,
1321 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1322 IP(src="1.1.1.1", dst="1.1.1.2") /
1323 UDP(sport=1144, dport=2233) /
1324 Raw(b'X' * payload_size)
1325 for i in range(count)]
1327 def verify_decrypted(self, p, rxs):
1329 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1330 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1332 def verify_encrypted(self, p, sa, rxs):
1335 pkt = sa.decrypt(rx[IP])
1336 if not pkt.haslayer(IP):
1337 pkt = IP(pkt[Raw].load)
1338 self.assert_packet_checksums_valid(pkt)
1339 self.assertTrue(pkt.haslayer(GRE))
1341 self.assertEqual(e[IP].dst, "1.1.1.2")
1342 except (IndexError, AssertionError):
1343 self.logger.debug(ppp("Unexpected packet:", rx))
1345 self.logger.debug(ppp("Decrypted packet:", pkt))
1351 super(TestIpsecGreIfEspTra, self).setUp()
1353 self.tun_if = self.pg0
1355 p = self.ipv4_params
1357 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1358 p.auth_algo_vpp_id, p.auth_key,
1359 p.crypt_algo_vpp_id, p.crypt_key,
1360 self.vpp_esp_protocol)
1361 p.tun_sa_out.add_vpp_config()
1363 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1364 p.auth_algo_vpp_id, p.auth_key,
1365 p.crypt_algo_vpp_id, p.crypt_key,
1366 self.vpp_esp_protocol)
1367 p.tun_sa_in.add_vpp_config()
1369 p.tun_if = VppGreInterface(self,
1371 self.pg0.remote_ip4)
1372 p.tun_if.add_vpp_config()
1374 p.tun_protect = VppIpsecTunProtect(self,
1378 p.tun_protect.add_vpp_config()
1381 p.tun_if.config_ip4()
1382 config_tra_params(p, self.encryption_type, p.tun_if)
1384 VppIpRoute(self, "1.1.1.2", 32,
1385 [VppRoutePath(p.tun_if.remote_ip4,
1386 0xffffffff)]).add_vpp_config()
1389 p = self.ipv4_params
1390 p.tun_if.unconfig_ip4()
1391 super(TestIpsecGreIfEspTra, self).tearDown()
1393 def test_gre_non_ip(self):
1394 p = self.ipv4_params
1395 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1396 src=p.remote_tun_if_host,
1397 dst=self.pg1.remote_ip6)
1398 self.send_and_assert_no_replies(self.tun_if, tx)
1399 node_name = ('/err/%s/unsupported payload' %
1400 self.tun4_decrypt_node_name)
1401 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1404 class TestIpsecGre6IfEspTra(TemplateIpsec,
1406 """ Ipsec GRE ESP - TRA tests """
1407 tun6_encrypt_node_name = "esp6-encrypt-tun"
1408 tun6_decrypt_node_name = "esp6-decrypt-tun"
1409 encryption_type = ESP
1411 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1413 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1414 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1415 dst=self.pg0.local_ip6) /
1417 IPv6(src=self.pg1.local_ip6,
1418 dst=self.pg1.remote_ip6) /
1419 UDP(sport=1144, dport=2233) /
1420 Raw(b'X' * payload_size))
1421 for i in range(count)]
1423 def gen_pkts6(self, sw_intf, src, dst, count=1,
1425 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1426 IPv6(src="1::1", dst="1::2") /
1427 UDP(sport=1144, dport=2233) /
1428 Raw(b'X' * payload_size)
1429 for i in range(count)]
1431 def verify_decrypted6(self, p, rxs):
1433 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1434 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1436 def verify_encrypted6(self, p, sa, rxs):
1439 pkt = sa.decrypt(rx[IPv6])
1440 if not pkt.haslayer(IPv6):
1441 pkt = IPv6(pkt[Raw].load)
1442 self.assert_packet_checksums_valid(pkt)
1443 self.assertTrue(pkt.haslayer(GRE))
1445 self.assertEqual(e[IPv6].dst, "1::2")
1446 except (IndexError, AssertionError):
1447 self.logger.debug(ppp("Unexpected packet:", rx))
1449 self.logger.debug(ppp("Decrypted packet:", pkt))
1455 super(TestIpsecGre6IfEspTra, self).setUp()
1457 self.tun_if = self.pg0
1459 p = self.ipv6_params
1461 bd1 = VppBridgeDomain(self, 1)
1462 bd1.add_vpp_config()
1464 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1465 p.auth_algo_vpp_id, p.auth_key,
1466 p.crypt_algo_vpp_id, p.crypt_key,
1467 self.vpp_esp_protocol)
1468 p.tun_sa_out.add_vpp_config()
1470 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1471 p.auth_algo_vpp_id, p.auth_key,
1472 p.crypt_algo_vpp_id, p.crypt_key,
1473 self.vpp_esp_protocol)
1474 p.tun_sa_in.add_vpp_config()
1476 p.tun_if = VppGreInterface(self,
1478 self.pg0.remote_ip6)
1479 p.tun_if.add_vpp_config()
1481 p.tun_protect = VppIpsecTunProtect(self,
1485 p.tun_protect.add_vpp_config()
1488 p.tun_if.config_ip6()
1489 config_tra_params(p, self.encryption_type, p.tun_if)
1491 r = VppIpRoute(self, "1::2", 128,
1492 [VppRoutePath(p.tun_if.remote_ip6,
1494 proto=DpoProto.DPO_PROTO_IP6)])
1498 p = self.ipv6_params
1499 p.tun_if.unconfig_ip6()
1500 super(TestIpsecGre6IfEspTra, self).tearDown()
1503 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1504 """ Ipsec mGRE ESP v4 TRA tests """
1505 tun4_encrypt_node_name = "esp4-encrypt-tun"
1506 tun4_decrypt_node_name = "esp4-decrypt-tun"
1507 encryption_type = ESP
1509 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1511 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1512 sa.encrypt(IP(src=p.tun_dst,
1513 dst=self.pg0.local_ip4) /
1515 IP(src=self.pg1.local_ip4,
1516 dst=self.pg1.remote_ip4) /
1517 UDP(sport=1144, dport=2233) /
1518 Raw(b'X' * payload_size))
1519 for i in range(count)]
1521 def gen_pkts(self, sw_intf, src, dst, count=1,
1523 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1524 IP(src="1.1.1.1", dst=dst) /
1525 UDP(sport=1144, dport=2233) /
1526 Raw(b'X' * payload_size)
1527 for i in range(count)]
1529 def verify_decrypted(self, p, rxs):
1531 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1532 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1534 def verify_encrypted(self, p, sa, rxs):
1537 pkt = sa.decrypt(rx[IP])
1538 if not pkt.haslayer(IP):
1539 pkt = IP(pkt[Raw].load)
1540 self.assert_packet_checksums_valid(pkt)
1541 self.assertTrue(pkt.haslayer(GRE))
1543 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1544 except (IndexError, AssertionError):
1545 self.logger.debug(ppp("Unexpected packet:", rx))
1547 self.logger.debug(ppp("Decrypted packet:", pkt))
1553 super(TestIpsecMGreIfEspTra4, self).setUp()
1556 self.tun_if = self.pg0
1557 p = self.ipv4_params
1558 p.tun_if = VppGreInterface(self,
1561 mode=(VppEnum.vl_api_tunnel_mode_t.
1562 TUNNEL_API_MODE_MP))
1563 p.tun_if.add_vpp_config()
1565 p.tun_if.config_ip4()
1566 p.tun_if.generate_remote_hosts(N_NHS)
1567 self.pg0.generate_remote_hosts(N_NHS)
1568 self.pg0.configure_ipv4_neighbors()
1570 # setup some SAs for several next-hops on the interface
1571 self.multi_params = []
1574 p = copy.copy(self.ipv4_params)
1576 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1577 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1578 p.scapy_tun_spi = p.scapy_tun_spi + ii
1579 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1580 p.vpp_tun_spi = p.vpp_tun_spi + ii
1582 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1583 p.scapy_tra_spi = p.scapy_tra_spi + ii
1584 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1585 p.vpp_tra_spi = p.vpp_tra_spi + ii
1586 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1587 p.auth_algo_vpp_id, p.auth_key,
1588 p.crypt_algo_vpp_id, p.crypt_key,
1589 self.vpp_esp_protocol)
1590 p.tun_sa_out.add_vpp_config()
1592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1593 p.auth_algo_vpp_id, p.auth_key,
1594 p.crypt_algo_vpp_id, p.crypt_key,
1595 self.vpp_esp_protocol)
1596 p.tun_sa_in.add_vpp_config()
1598 p.tun_protect = VppIpsecTunProtect(
1603 nh=p.tun_if.remote_hosts[ii].ip4)
1604 p.tun_protect.add_vpp_config()
1605 config_tra_params(p, self.encryption_type, p.tun_if)
1606 self.multi_params.append(p)
1608 VppIpRoute(self, p.remote_tun_if_host, 32,
1609 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1610 p.tun_if.sw_if_index)]).add_vpp_config()
1612 # in this v4 variant add the teibs after the protect
1613 p.teib = VppTeib(self, p.tun_if,
1614 p.tun_if.remote_hosts[ii].ip4,
1615 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1616 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1617 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1620 p = self.ipv4_params
1621 p.tun_if.unconfig_ip4()
1622 super(TestIpsecMGreIfEspTra4, self).tearDown()
1624 def test_tun_44(self):
1627 for p in self.multi_params:
1628 self.verify_tun_44(p, count=N_PKTS)
1629 p.teib.remove_vpp_config()
1630 self.verify_tun_dropped_44(p, count=N_PKTS)
1631 p.teib.add_vpp_config()
1632 self.verify_tun_44(p, count=N_PKTS)
1635 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1636 """ Ipsec mGRE ESP v6 TRA tests """
1637 tun6_encrypt_node_name = "esp6-encrypt-tun"
1638 tun6_decrypt_node_name = "esp6-decrypt-tun"
1639 encryption_type = ESP
1641 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1643 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1644 sa.encrypt(IPv6(src=p.tun_dst,
1645 dst=self.pg0.local_ip6) /
1647 IPv6(src=self.pg1.local_ip6,
1648 dst=self.pg1.remote_ip6) /
1649 UDP(sport=1144, dport=2233) /
1650 Raw(b'X' * payload_size))
1651 for i in range(count)]
1653 def gen_pkts6(self, sw_intf, src, dst, count=1,
1655 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1656 IPv6(src="1::1", dst=dst) /
1657 UDP(sport=1144, dport=2233) /
1658 Raw(b'X' * payload_size)
1659 for i in range(count)]
1661 def verify_decrypted6(self, p, rxs):
1663 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1664 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1666 def verify_encrypted6(self, p, sa, rxs):
1669 pkt = sa.decrypt(rx[IPv6])
1670 if not pkt.haslayer(IPv6):
1671 pkt = IPv6(pkt[Raw].load)
1672 self.assert_packet_checksums_valid(pkt)
1673 self.assertTrue(pkt.haslayer(GRE))
1675 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1676 except (IndexError, AssertionError):
1677 self.logger.debug(ppp("Unexpected packet:", rx))
1679 self.logger.debug(ppp("Decrypted packet:", pkt))
1685 super(TestIpsecMGreIfEspTra6, self).setUp()
1687 self.vapi.cli("set logging class ipsec level debug")
1690 self.tun_if = self.pg0
1691 p = self.ipv6_params
1692 p.tun_if = VppGreInterface(self,
1695 mode=(VppEnum.vl_api_tunnel_mode_t.
1696 TUNNEL_API_MODE_MP))
1697 p.tun_if.add_vpp_config()
1699 p.tun_if.config_ip6()
1700 p.tun_if.generate_remote_hosts(N_NHS)
1701 self.pg0.generate_remote_hosts(N_NHS)
1702 self.pg0.configure_ipv6_neighbors()
1704 # setup some SAs for several next-hops on the interface
1705 self.multi_params = []
1707 for ii in range(N_NHS):
1708 p = copy.copy(self.ipv6_params)
1710 p.remote_tun_if_host = "1::%d" % (ii + 1)
1711 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1712 p.scapy_tun_spi = p.scapy_tun_spi + ii
1713 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1714 p.vpp_tun_spi = p.vpp_tun_spi + ii
1716 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1717 p.scapy_tra_spi = p.scapy_tra_spi + ii
1718 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1719 p.vpp_tra_spi = p.vpp_tra_spi + ii
1720 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1721 p.auth_algo_vpp_id, p.auth_key,
1722 p.crypt_algo_vpp_id, p.crypt_key,
1723 self.vpp_esp_protocol)
1724 p.tun_sa_out.add_vpp_config()
1726 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1727 p.auth_algo_vpp_id, p.auth_key,
1728 p.crypt_algo_vpp_id, p.crypt_key,
1729 self.vpp_esp_protocol)
1730 p.tun_sa_in.add_vpp_config()
1732 # in this v6 variant add the teibs first then the protection
1733 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1734 VppTeib(self, p.tun_if,
1735 p.tun_if.remote_hosts[ii].ip6,
1736 p.tun_dst).add_vpp_config()
1738 p.tun_protect = VppIpsecTunProtect(
1743 nh=p.tun_if.remote_hosts[ii].ip6)
1744 p.tun_protect.add_vpp_config()
1745 config_tra_params(p, self.encryption_type, p.tun_if)
1746 self.multi_params.append(p)
1748 VppIpRoute(self, p.remote_tun_if_host, 128,
1749 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1750 p.tun_if.sw_if_index)]).add_vpp_config()
1751 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1753 self.logger.info(self.vapi.cli("sh log"))
1754 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1755 self.logger.info(self.vapi.cli("sh adj 41"))
1758 p = self.ipv6_params
1759 p.tun_if.unconfig_ip6()
1760 super(TestIpsecMGreIfEspTra6, self).tearDown()
1762 def test_tun_66(self):
1764 for p in self.multi_params:
1765 self.verify_tun_66(p, count=63)
1768 class TemplateIpsec4TunProtect(object):
1769 """ IPsec IPv4 Tunnel protect """
1771 encryption_type = ESP
1772 tun4_encrypt_node_name = "esp4-encrypt-tun"
1773 tun4_decrypt_node_name = "esp4-decrypt-tun"
1774 tun4_input_node = "ipsec4-tun-input"
1776 def config_sa_tra(self, p):
1777 config_tun_params(p, self.encryption_type, p.tun_if)
1779 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1780 p.auth_algo_vpp_id, p.auth_key,
1781 p.crypt_algo_vpp_id, p.crypt_key,
1782 self.vpp_esp_protocol,
1784 p.tun_sa_out.add_vpp_config()
1786 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1787 p.auth_algo_vpp_id, p.auth_key,
1788 p.crypt_algo_vpp_id, p.crypt_key,
1789 self.vpp_esp_protocol,
1791 p.tun_sa_in.add_vpp_config()
1793 def config_sa_tun(self, p):
1794 config_tun_params(p, self.encryption_type, p.tun_if)
1796 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1797 p.auth_algo_vpp_id, p.auth_key,
1798 p.crypt_algo_vpp_id, p.crypt_key,
1799 self.vpp_esp_protocol,
1800 self.tun_if.local_addr[p.addr_type],
1801 self.tun_if.remote_addr[p.addr_type],
1803 p.tun_sa_out.add_vpp_config()
1805 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1806 p.auth_algo_vpp_id, p.auth_key,
1807 p.crypt_algo_vpp_id, p.crypt_key,
1808 self.vpp_esp_protocol,
1809 self.tun_if.remote_addr[p.addr_type],
1810 self.tun_if.local_addr[p.addr_type],
1812 p.tun_sa_in.add_vpp_config()
1814 def config_protect(self, p):
1815 p.tun_protect = VppIpsecTunProtect(self,
1819 p.tun_protect.add_vpp_config()
1821 def config_network(self, p):
1822 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1824 self.pg0.remote_ip4)
1825 p.tun_if.add_vpp_config()
1827 p.tun_if.config_ip4()
1828 p.tun_if.config_ip6()
1830 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1831 [VppRoutePath(p.tun_if.remote_ip4,
1833 p.route.add_vpp_config()
1834 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1835 [VppRoutePath(p.tun_if.remote_ip6,
1837 proto=DpoProto.DPO_PROTO_IP6)])
1840 def unconfig_network(self, p):
1841 p.route.remove_vpp_config()
1842 p.tun_if.remove_vpp_config()
1844 def unconfig_protect(self, p):
1845 p.tun_protect.remove_vpp_config()
1847 def unconfig_sa(self, p):
1848 p.tun_sa_out.remove_vpp_config()
1849 p.tun_sa_in.remove_vpp_config()
1852 class TestIpsec4TunProtect(TemplateIpsec,
1853 TemplateIpsec4TunProtect,
1855 """ IPsec IPv4 Tunnel protect - transport mode"""
1858 super(TestIpsec4TunProtect, self).setUp()
1860 self.tun_if = self.pg0
1863 super(TestIpsec4TunProtect, self).tearDown()
1865 def test_tun_44(self):
1866 """IPSEC tunnel protect"""
1868 p = self.ipv4_params
1870 self.config_network(p)
1871 self.config_sa_tra(p)
1872 self.config_protect(p)
1874 self.verify_tun_44(p, count=127)
1875 c = p.tun_if.get_rx_stats()
1876 self.assertEqual(c['packets'], 127)
1877 c = p.tun_if.get_tx_stats()
1878 self.assertEqual(c['packets'], 127)
1880 self.vapi.cli("clear ipsec sa")
1881 self.verify_tun_64(p, count=127)
1882 c = p.tun_if.get_rx_stats()
1883 self.assertEqual(c['packets'], 254)
1884 c = p.tun_if.get_tx_stats()
1885 self.assertEqual(c['packets'], 254)
1887 # rekey - create new SAs and update the tunnel protection
1889 np.crypt_key = b'X' + p.crypt_key[1:]
1890 np.scapy_tun_spi += 100
1891 np.scapy_tun_sa_id += 1
1892 np.vpp_tun_spi += 100
1893 np.vpp_tun_sa_id += 1
1894 np.tun_if.local_spi = p.vpp_tun_spi
1895 np.tun_if.remote_spi = p.scapy_tun_spi
1897 self.config_sa_tra(np)
1898 self.config_protect(np)
1901 self.verify_tun_44(np, count=127)
1902 c = p.tun_if.get_rx_stats()
1903 self.assertEqual(c['packets'], 381)
1904 c = p.tun_if.get_tx_stats()
1905 self.assertEqual(c['packets'], 381)
1908 self.unconfig_protect(np)
1909 self.unconfig_sa(np)
1910 self.unconfig_network(p)
1913 class TestIpsec4TunProtectUdp(TemplateIpsec,
1914 TemplateIpsec4TunProtect,
1916 """ IPsec IPv4 Tunnel protect - transport mode"""
1919 super(TestIpsec4TunProtectUdp, self).setUp()
1921 self.tun_if = self.pg0
1923 p = self.ipv4_params
1924 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1925 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1926 p.nat_header = UDP(sport=4500, dport=4500)
1927 self.config_network(p)
1928 self.config_sa_tra(p)
1929 self.config_protect(p)
1932 p = self.ipv4_params
1933 self.unconfig_protect(p)
1935 self.unconfig_network(p)
1936 super(TestIpsec4TunProtectUdp, self).tearDown()
1938 def verify_encrypted(self, p, sa, rxs):
1939 # ensure encrypted packets are recieved with the default UDP ports
1941 self.assertEqual(rx[UDP].sport, 4500)
1942 self.assertEqual(rx[UDP].dport, 4500)
1943 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1945 def test_tun_44(self):
1946 """IPSEC UDP tunnel protect"""
1948 p = self.ipv4_params
1950 self.verify_tun_44(p, count=127)
1951 c = p.tun_if.get_rx_stats()
1952 self.assertEqual(c['packets'], 127)
1953 c = p.tun_if.get_tx_stats()
1954 self.assertEqual(c['packets'], 127)
1956 def test_keepalive(self):
1957 """ IPSEC NAT Keepalive """
1958 self.verify_keepalive(self.ipv4_params)
1961 class TestIpsec4TunProtectTun(TemplateIpsec,
1962 TemplateIpsec4TunProtect,
1964 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1966 encryption_type = ESP
1967 tun4_encrypt_node_name = "esp4-encrypt-tun"
1968 tun4_decrypt_node_name = "esp4-decrypt-tun"
1971 super(TestIpsec4TunProtectTun, self).setUp()
1973 self.tun_if = self.pg0
1976 super(TestIpsec4TunProtectTun, self).tearDown()
1978 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1980 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1981 sa.encrypt(IP(src=sw_intf.remote_ip4,
1982 dst=sw_intf.local_ip4) /
1983 IP(src=src, dst=dst) /
1984 UDP(sport=1144, dport=2233) /
1985 Raw(b'X' * payload_size))
1986 for i in range(count)]
1988 def gen_pkts(self, sw_intf, src, dst, count=1,
1990 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1991 IP(src=src, dst=dst) /
1992 UDP(sport=1144, dport=2233) /
1993 Raw(b'X' * payload_size)
1994 for i in range(count)]
1996 def verify_decrypted(self, p, rxs):
1998 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1999 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2000 self.assert_packet_checksums_valid(rx)
2002 def verify_encrypted(self, p, sa, rxs):
2005 pkt = sa.decrypt(rx[IP])
2006 if not pkt.haslayer(IP):
2007 pkt = IP(pkt[Raw].load)
2008 self.assert_packet_checksums_valid(pkt)
2009 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2010 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2011 inner = pkt[IP].payload
2012 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2014 except (IndexError, AssertionError):
2015 self.logger.debug(ppp("Unexpected packet:", rx))
2017 self.logger.debug(ppp("Decrypted packet:", pkt))
2022 def test_tun_44(self):
2023 """IPSEC tunnel protect """
2025 p = self.ipv4_params
2027 self.config_network(p)
2028 self.config_sa_tun(p)
2029 self.config_protect(p)
2031 # also add an output features on the tunnel and physical interface
2032 # so we test they still work
2033 r_all = AclRule(True,
2034 src_prefix="0.0.0.0/0",
2035 dst_prefix="0.0.0.0/0",
2037 a = VppAcl(self, [r_all]).add_vpp_config()
2039 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2040 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2042 self.verify_tun_44(p, count=127)
2044 c = p.tun_if.get_rx_stats()
2045 self.assertEqual(c['packets'], 127)
2046 c = p.tun_if.get_tx_stats()
2047 self.assertEqual(c['packets'], 127)
2049 # rekey - create new SAs and update the tunnel protection
2051 np.crypt_key = b'X' + p.crypt_key[1:]
2052 np.scapy_tun_spi += 100
2053 np.scapy_tun_sa_id += 1
2054 np.vpp_tun_spi += 100
2055 np.vpp_tun_sa_id += 1
2056 np.tun_if.local_spi = p.vpp_tun_spi
2057 np.tun_if.remote_spi = p.scapy_tun_spi
2059 self.config_sa_tun(np)
2060 self.config_protect(np)
2063 self.verify_tun_44(np, count=127)
2064 c = p.tun_if.get_rx_stats()
2065 self.assertEqual(c['packets'], 254)
2066 c = p.tun_if.get_tx_stats()
2067 self.assertEqual(c['packets'], 254)
2070 self.unconfig_protect(np)
2071 self.unconfig_sa(np)
2072 self.unconfig_network(p)
2075 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2076 TemplateIpsec4TunProtect,
2078 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2080 encryption_type = ESP
2081 tun4_encrypt_node_name = "esp4-encrypt-tun"
2082 tun4_decrypt_node_name = "esp4-decrypt-tun"
2085 super(TestIpsec4TunProtectTunDrop, self).setUp()
2087 self.tun_if = self.pg0
2090 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2092 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2094 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2095 sa.encrypt(IP(src=sw_intf.remote_ip4,
2097 IP(src=src, dst=dst) /
2098 UDP(sport=1144, dport=2233) /
2099 Raw(b'X' * payload_size))
2100 for i in range(count)]
2102 def test_tun_drop_44(self):
2103 """IPSEC tunnel protect bogus tunnel header """
2105 p = self.ipv4_params
2107 self.config_network(p)
2108 self.config_sa_tun(p)
2109 self.config_protect(p)
2111 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2112 src=p.remote_tun_if_host,
2113 dst=self.pg1.remote_ip4,
2115 self.send_and_assert_no_replies(self.tun_if, tx)
2118 self.unconfig_protect(p)
2120 self.unconfig_network(p)
2123 class TemplateIpsec6TunProtect(object):
2124 """ IPsec IPv6 Tunnel protect """
2126 def config_sa_tra(self, p):
2127 config_tun_params(p, self.encryption_type, p.tun_if)
2129 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2130 p.auth_algo_vpp_id, p.auth_key,
2131 p.crypt_algo_vpp_id, p.crypt_key,
2132 self.vpp_esp_protocol)
2133 p.tun_sa_out.add_vpp_config()
2135 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2136 p.auth_algo_vpp_id, p.auth_key,
2137 p.crypt_algo_vpp_id, p.crypt_key,
2138 self.vpp_esp_protocol)
2139 p.tun_sa_in.add_vpp_config()
2141 def config_sa_tun(self, p):
2142 config_tun_params(p, self.encryption_type, p.tun_if)
2144 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2145 p.auth_algo_vpp_id, p.auth_key,
2146 p.crypt_algo_vpp_id, p.crypt_key,
2147 self.vpp_esp_protocol,
2148 self.tun_if.local_addr[p.addr_type],
2149 self.tun_if.remote_addr[p.addr_type])
2150 p.tun_sa_out.add_vpp_config()
2152 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2153 p.auth_algo_vpp_id, p.auth_key,
2154 p.crypt_algo_vpp_id, p.crypt_key,
2155 self.vpp_esp_protocol,
2156 self.tun_if.remote_addr[p.addr_type],
2157 self.tun_if.local_addr[p.addr_type])
2158 p.tun_sa_in.add_vpp_config()
2160 def config_protect(self, p):
2161 p.tun_protect = VppIpsecTunProtect(self,
2165 p.tun_protect.add_vpp_config()
2167 def config_network(self, p):
2168 p.tun_if = VppIpIpTunInterface(self, self.pg0,
2170 self.pg0.remote_ip6)
2171 p.tun_if.add_vpp_config()
2173 p.tun_if.config_ip6()
2174 p.tun_if.config_ip4()
2176 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2177 [VppRoutePath(p.tun_if.remote_ip6,
2179 proto=DpoProto.DPO_PROTO_IP6)])
2180 p.route.add_vpp_config()
2181 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2182 [VppRoutePath(p.tun_if.remote_ip4,
2186 def unconfig_network(self, p):
2187 p.route.remove_vpp_config()
2188 p.tun_if.remove_vpp_config()
2190 def unconfig_protect(self, p):
2191 p.tun_protect.remove_vpp_config()
2193 def unconfig_sa(self, p):
2194 p.tun_sa_out.remove_vpp_config()
2195 p.tun_sa_in.remove_vpp_config()
2198 class TestIpsec6TunProtect(TemplateIpsec,
2199 TemplateIpsec6TunProtect,
2201 """ IPsec IPv6 Tunnel protect - transport mode"""
2203 encryption_type = ESP
2204 tun6_encrypt_node_name = "esp6-encrypt-tun"
2205 tun6_decrypt_node_name = "esp6-decrypt-tun"
2208 super(TestIpsec6TunProtect, self).setUp()
2210 self.tun_if = self.pg0
2213 super(TestIpsec6TunProtect, self).tearDown()
2215 def test_tun_66(self):
2216 """IPSEC tunnel protect 6o6"""
2218 p = self.ipv6_params
2220 self.config_network(p)
2221 self.config_sa_tra(p)
2222 self.config_protect(p)
2224 self.verify_tun_66(p, count=127)
2225 c = p.tun_if.get_rx_stats()
2226 self.assertEqual(c['packets'], 127)
2227 c = p.tun_if.get_tx_stats()
2228 self.assertEqual(c['packets'], 127)
2230 # rekey - create new SAs and update the tunnel protection
2232 np.crypt_key = b'X' + p.crypt_key[1:]
2233 np.scapy_tun_spi += 100
2234 np.scapy_tun_sa_id += 1
2235 np.vpp_tun_spi += 100
2236 np.vpp_tun_sa_id += 1
2237 np.tun_if.local_spi = p.vpp_tun_spi
2238 np.tun_if.remote_spi = p.scapy_tun_spi
2240 self.config_sa_tra(np)
2241 self.config_protect(np)
2244 self.verify_tun_66(np, count=127)
2245 c = p.tun_if.get_rx_stats()
2246 self.assertEqual(c['packets'], 254)
2247 c = p.tun_if.get_tx_stats()
2248 self.assertEqual(c['packets'], 254)
2250 # bounce the interface state
2251 p.tun_if.admin_down()
2252 self.verify_drop_tun_66(np, count=127)
2253 node = ('/err/ipsec6-tun-input/%s' %
2254 'ipsec packets received on disabled interface')
2255 self.assertEqual(127, self.statistics.get_err_counter(node))
2257 self.verify_tun_66(np, count=127)
2260 # 1) add two input SAs [old, new]
2261 # 2) swap output SA to [new]
2262 # 3) use only [new] input SA
2264 np3.crypt_key = b'Z' + p.crypt_key[1:]
2265 np3.scapy_tun_spi += 100
2266 np3.scapy_tun_sa_id += 1
2267 np3.vpp_tun_spi += 100
2268 np3.vpp_tun_sa_id += 1
2269 np3.tun_if.local_spi = p.vpp_tun_spi
2270 np3.tun_if.remote_spi = p.scapy_tun_spi
2272 self.config_sa_tra(np3)
2275 p.tun_protect.update_vpp_config(np.tun_sa_out,
2276 [np.tun_sa_in, np3.tun_sa_in])
2277 self.verify_tun_66(np, np, count=127)
2278 self.verify_tun_66(np3, np, count=127)
2281 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2282 [np.tun_sa_in, np3.tun_sa_in])
2283 self.verify_tun_66(np, np3, count=127)
2284 self.verify_tun_66(np3, np3, count=127)
2287 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2289 self.verify_tun_66(np3, np3, count=127)
2290 self.verify_drop_tun_66(np, count=127)
2292 c = p.tun_if.get_rx_stats()
2293 self.assertEqual(c['packets'], 127*9)
2294 c = p.tun_if.get_tx_stats()
2295 self.assertEqual(c['packets'], 127*8)
2296 self.unconfig_sa(np)
2299 self.unconfig_protect(np3)
2300 self.unconfig_sa(np3)
2301 self.unconfig_network(p)
2303 def test_tun_46(self):
2304 """IPSEC tunnel protect 4o6"""
2306 p = self.ipv6_params
2308 self.config_network(p)
2309 self.config_sa_tra(p)
2310 self.config_protect(p)
2312 self.verify_tun_46(p, count=127)
2313 c = p.tun_if.get_rx_stats()
2314 self.assertEqual(c['packets'], 127)
2315 c = p.tun_if.get_tx_stats()
2316 self.assertEqual(c['packets'], 127)
2319 self.unconfig_protect(p)
2321 self.unconfig_network(p)
2324 class TestIpsec6TunProtectTun(TemplateIpsec,
2325 TemplateIpsec6TunProtect,
2327 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2329 encryption_type = ESP
2330 tun6_encrypt_node_name = "esp6-encrypt-tun"
2331 tun6_decrypt_node_name = "esp6-decrypt-tun"
2334 super(TestIpsec6TunProtectTun, self).setUp()
2336 self.tun_if = self.pg0
2339 super(TestIpsec6TunProtectTun, self).tearDown()
2341 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2343 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2344 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2345 dst=sw_intf.local_ip6) /
2346 IPv6(src=src, dst=dst) /
2347 UDP(sport=1166, dport=2233) /
2348 Raw(b'X' * payload_size))
2349 for i in range(count)]
2351 def gen_pkts6(self, sw_intf, src, dst, count=1,
2353 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2354 IPv6(src=src, dst=dst) /
2355 UDP(sport=1166, dport=2233) /
2356 Raw(b'X' * payload_size)
2357 for i in range(count)]
2359 def verify_decrypted6(self, p, rxs):
2361 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2362 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2363 self.assert_packet_checksums_valid(rx)
2365 def verify_encrypted6(self, p, sa, rxs):
2368 pkt = sa.decrypt(rx[IPv6])
2369 if not pkt.haslayer(IPv6):
2370 pkt = IPv6(pkt[Raw].load)
2371 self.assert_packet_checksums_valid(pkt)
2372 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2373 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2374 inner = pkt[IPv6].payload
2375 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2377 except (IndexError, AssertionError):
2378 self.logger.debug(ppp("Unexpected packet:", rx))
2380 self.logger.debug(ppp("Decrypted packet:", pkt))
2385 def test_tun_66(self):
2386 """IPSEC tunnel protect """
2388 p = self.ipv6_params
2390 self.config_network(p)
2391 self.config_sa_tun(p)
2392 self.config_protect(p)
2394 self.verify_tun_66(p, count=127)
2396 c = p.tun_if.get_rx_stats()
2397 self.assertEqual(c['packets'], 127)
2398 c = p.tun_if.get_tx_stats()
2399 self.assertEqual(c['packets'], 127)
2401 # rekey - create new SAs and update the tunnel protection
2403 np.crypt_key = b'X' + p.crypt_key[1:]
2404 np.scapy_tun_spi += 100
2405 np.scapy_tun_sa_id += 1
2406 np.vpp_tun_spi += 100
2407 np.vpp_tun_sa_id += 1
2408 np.tun_if.local_spi = p.vpp_tun_spi
2409 np.tun_if.remote_spi = p.scapy_tun_spi
2411 self.config_sa_tun(np)
2412 self.config_protect(np)
2415 self.verify_tun_66(np, count=127)
2416 c = p.tun_if.get_rx_stats()
2417 self.assertEqual(c['packets'], 254)
2418 c = p.tun_if.get_tx_stats()
2419 self.assertEqual(c['packets'], 254)
2422 self.unconfig_protect(np)
2423 self.unconfig_sa(np)
2424 self.unconfig_network(p)
2427 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2428 TemplateIpsec6TunProtect,
2430 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2432 encryption_type = ESP
2433 tun6_encrypt_node_name = "esp6-encrypt-tun"
2434 tun6_decrypt_node_name = "esp6-decrypt-tun"
2437 super(TestIpsec6TunProtectTunDrop, self).setUp()
2439 self.tun_if = self.pg0
2442 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2444 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2446 # the IP destination of the revelaed packet does not match
2447 # that assigned to the tunnel
2448 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2449 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2451 IPv6(src=src, dst=dst) /
2452 UDP(sport=1144, dport=2233) /
2453 Raw(b'X' * payload_size))
2454 for i in range(count)]
2456 def test_tun_drop_66(self):
2457 """IPSEC 6 tunnel protect bogus tunnel header """
2459 p = self.ipv6_params
2461 self.config_network(p)
2462 self.config_sa_tun(p)
2463 self.config_protect(p)
2465 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2466 src=p.remote_tun_if_host,
2467 dst=self.pg1.remote_ip6,
2469 self.send_and_assert_no_replies(self.tun_if, tx)
2471 self.unconfig_protect(p)
2473 self.unconfig_network(p)
2476 if __name__ == '__main__':
2477 unittest.main(testRunner=VppTestRunner)