5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
12 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
13 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
14 from vpp_ipsec_tun_interface import VppIpsecTunInterface
15 from vpp_gre_interface import VppGreInterface
16 from vpp_ipip_tun_interface import VppIpIpTunInterface
17 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
18 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
19 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
20 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
21 from vpp_teib import VppTeib
23 from vpp_papi import VppEnum
26 def config_tun_params(p, encryption_type, tun_if):
27 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
28 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
29 IPSEC_API_SAD_FLAG_USE_ESN))
30 crypt_key = mk_scapy_crypt_key(p)
31 p.tun_dst = tun_if.remote_ip
32 p.tun_src = tun_if.local_ip
33 p.scapy_tun_sa = SecurityAssociation(
34 encryption_type, spi=p.vpp_tun_spi,
35 crypt_algo=p.crypt_algo,
37 auth_algo=p.auth_algo, auth_key=p.auth_key,
38 tunnel_header=ip_class_by_addr_type[p.addr_type](
41 nat_t_header=p.nat_header,
43 p.vpp_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.scapy_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
55 def config_tra_params(p, encryption_type, tun_if):
56 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
57 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
58 IPSEC_API_SAD_FLAG_USE_ESN))
59 crypt_key = mk_scapy_crypt_key(p)
60 p.tun_dst = tun_if.remote_ip
61 p.tun_src = tun_if.local_ip
62 p.scapy_tun_sa = SecurityAssociation(
63 encryption_type, spi=p.vpp_tun_spi,
64 crypt_algo=p.crypt_algo,
66 auth_algo=p.auth_algo, auth_key=p.auth_key,
68 nat_t_header=p.nat_header)
69 p.vpp_tun_sa = SecurityAssociation(
70 encryption_type, spi=p.scapy_tun_spi,
71 crypt_algo=p.crypt_algo,
73 auth_algo=p.auth_algo, auth_key=p.auth_key,
75 nat_t_header=p.nat_header)
78 class TemplateIpsec4TunIfEsp(TemplateIpsec):
79 """ IPsec tunnel interface tests """
85 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
88 def tearDownClass(cls):
89 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
92 super(TemplateIpsec4TunIfEsp, self).setUp()
94 self.tun_if = self.pg0
98 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
99 p.scapy_tun_spi, p.crypt_algo_vpp_id,
100 p.crypt_key, p.crypt_key,
101 p.auth_algo_vpp_id, p.auth_key,
103 p.tun_if.add_vpp_config()
105 p.tun_if.config_ip4()
106 p.tun_if.config_ip6()
107 config_tun_params(p, self.encryption_type, p.tun_if)
109 r = VppIpRoute(self, p.remote_tun_if_host, 32,
110 [VppRoutePath(p.tun_if.remote_ip4,
113 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
114 [VppRoutePath(p.tun_if.remote_ip6,
116 proto=DpoProto.DPO_PROTO_IP6)])
120 super(TemplateIpsec4TunIfEsp, self).tearDown()
123 class TemplateIpsec4TunIfEspUdp(TemplateIpsec):
124 """ IPsec UDP tunnel interface tests """
126 tun4_encrypt_node_name = "esp4-encrypt-tun"
127 tun4_decrypt_node_name = "esp4-decrypt-tun"
128 encryption_type = ESP
132 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
135 def tearDownClass(cls):
136 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
138 def verify_encrypted(self, p, sa, rxs):
141 # ensure the UDP ports are correct before we decrypt
143 self.assertTrue(rx.haslayer(UDP))
144 self.assert_equal(rx[UDP].sport, 4500)
145 self.assert_equal(rx[UDP].dport, 4500)
147 pkt = sa.decrypt(rx[IP])
148 if not pkt.haslayer(IP):
149 pkt = IP(pkt[Raw].load)
151 self.assert_packet_checksums_valid(pkt)
152 self.assert_equal(pkt[IP].dst, "1.1.1.1")
153 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
154 except (IndexError, AssertionError):
155 self.logger.debug(ppp("Unexpected packet:", rx))
157 self.logger.debug(ppp("Decrypted packet:", pkt))
163 super(TemplateIpsec4TunIfEspUdp, self).setUp()
166 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
167 IPSEC_API_SAD_FLAG_UDP_ENCAP)
168 p.nat_header = UDP(sport=5454, dport=4500)
170 def config_network(self):
172 self.tun_if = self.pg0
174 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
175 p.scapy_tun_spi, p.crypt_algo_vpp_id,
176 p.crypt_key, p.crypt_key,
177 p.auth_algo_vpp_id, p.auth_key,
178 p.auth_key, udp_encap=True)
179 p.tun_if.add_vpp_config()
181 p.tun_if.config_ip4()
182 p.tun_if.config_ip6()
183 config_tun_params(p, self.encryption_type, p.tun_if)
185 r = VppIpRoute(self, p.remote_tun_if_host, 32,
186 [VppRoutePath(p.tun_if.remote_ip4,
189 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
190 [VppRoutePath(p.tun_if.remote_ip6,
192 proto=DpoProto.DPO_PROTO_IP6)])
196 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
199 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
200 """ Ipsec ESP - TUN tests """
201 tun4_encrypt_node_name = "esp4-encrypt-tun"
202 tun4_decrypt_node_name = "esp4-decrypt-tun"
204 def test_tun_basic64(self):
205 """ ipsec 6o4 tunnel basic test """
206 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
208 self.verify_tun_64(self.params[socket.AF_INET], count=1)
210 def test_tun_burst64(self):
211 """ ipsec 6o4 tunnel basic test """
212 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
214 self.verify_tun_64(self.params[socket.AF_INET], count=257)
216 def test_tun_basic_frag44(self):
217 """ ipsec 4o4 tunnel frag basic test """
218 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
222 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
224 self.verify_tun_44(self.params[socket.AF_INET],
225 count=1, payload_size=1800, n_rx=2)
226 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
230 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
231 """ Ipsec ESP UDP tests """
233 tun4_input_node = "ipsec4-tun-input"
236 super(TemplateIpsec4TunIfEspUdp, self).setUp()
237 self.config_network()
239 def test_keepalive(self):
240 """ IPSEC NAT Keepalive """
241 self.verify_keepalive(self.ipv4_params)
244 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
245 """ Ipsec ESP UDP GCM tests """
247 tun4_input_node = "ipsec4-tun-input"
250 super(TemplateIpsec4TunIfEspUdp, self).setUp()
252 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
253 IPSEC_API_INTEG_ALG_NONE)
254 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
255 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
256 p.crypt_algo = "AES-GCM"
258 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
260 self.config_network()
263 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
264 """ Ipsec ESP - TCP tests """
268 class TemplateIpsec6TunIfEsp(TemplateIpsec):
269 """ IPsec tunnel interface tests """
271 encryption_type = ESP
274 super(TemplateIpsec6TunIfEsp, self).setUp()
276 self.tun_if = self.pg0
279 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
280 p.scapy_tun_spi, p.crypt_algo_vpp_id,
281 p.crypt_key, p.crypt_key,
282 p.auth_algo_vpp_id, p.auth_key,
283 p.auth_key, is_ip6=True)
284 p.tun_if.add_vpp_config()
286 p.tun_if.config_ip6()
287 p.tun_if.config_ip4()
288 config_tun_params(p, self.encryption_type, p.tun_if)
290 r = VppIpRoute(self, p.remote_tun_if_host, 128,
291 [VppRoutePath(p.tun_if.remote_ip6,
293 proto=DpoProto.DPO_PROTO_IP6)])
295 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
296 [VppRoutePath(p.tun_if.remote_ip4,
301 super(TemplateIpsec6TunIfEsp, self).tearDown()
304 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
306 """ Ipsec ESP - TUN tests """
307 tun6_encrypt_node_name = "esp6-encrypt-tun"
308 tun6_decrypt_node_name = "esp6-decrypt-tun"
310 def test_tun_basic46(self):
311 """ ipsec 4o6 tunnel basic test """
312 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
313 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
315 def test_tun_burst46(self):
316 """ ipsec 4o6 tunnel burst test """
317 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
318 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
321 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
322 IpsecTun6HandoffTests):
323 """ Ipsec ESP 6 Handoff tests """
324 tun6_encrypt_node_name = "esp6-encrypt-tun"
325 tun6_decrypt_node_name = "esp6-decrypt-tun"
328 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
329 IpsecTun4HandoffTests):
330 """ Ipsec ESP 4 Handoff tests """
331 tun4_encrypt_node_name = "esp4-encrypt-tun"
332 tun4_decrypt_node_name = "esp4-decrypt-tun"
335 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
336 """ IPsec IPv4 Multi Tunnel interface """
338 encryption_type = ESP
339 tun4_encrypt_node_name = "esp4-encrypt-tun"
340 tun4_decrypt_node_name = "esp4-decrypt-tun"
343 super(TestIpsec4MultiTunIfEsp, self).setUp()
345 self.tun_if = self.pg0
347 self.multi_params = []
348 self.pg0.generate_remote_hosts(10)
349 self.pg0.configure_ipv4_neighbors()
352 p = copy.copy(self.ipv4_params)
354 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
355 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
356 p.scapy_tun_spi = p.scapy_tun_spi + ii
357 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
358 p.vpp_tun_spi = p.vpp_tun_spi + ii
360 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
361 p.scapy_tra_spi = p.scapy_tra_spi + ii
362 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
363 p.vpp_tra_spi = p.vpp_tra_spi + ii
364 p.tun_dst = self.pg0.remote_hosts[ii].ip4
366 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
369 p.crypt_key, p.crypt_key,
370 p.auth_algo_vpp_id, p.auth_key,
373 p.tun_if.add_vpp_config()
375 p.tun_if.config_ip4()
376 config_tun_params(p, self.encryption_type, p.tun_if)
377 self.multi_params.append(p)
379 VppIpRoute(self, p.remote_tun_if_host, 32,
380 [VppRoutePath(p.tun_if.remote_ip4,
381 0xffffffff)]).add_vpp_config()
384 super(TestIpsec4MultiTunIfEsp, self).tearDown()
386 def test_tun_44(self):
387 """Multiple IPSEC tunnel interfaces """
388 for p in self.multi_params:
389 self.verify_tun_44(p, count=127)
390 c = p.tun_if.get_rx_stats()
391 self.assertEqual(c['packets'], 127)
392 c = p.tun_if.get_tx_stats()
393 self.assertEqual(c['packets'], 127)
395 def test_tun_rr_44(self):
396 """ Round-robin packets acrros multiple interface """
398 for p in self.multi_params:
399 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
400 src=p.remote_tun_if_host,
401 dst=self.pg1.remote_ip4)
402 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
404 for rx, p in zip(rxs, self.multi_params):
405 self.verify_decrypted(p, [rx])
408 for p in self.multi_params:
409 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
410 dst=p.remote_tun_if_host)
411 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
413 for rx, p in zip(rxs, self.multi_params):
414 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
417 class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4):
418 """ IPsec IPv4 Tunnel interface all Algos """
420 encryption_type = ESP
421 tun4_encrypt_node_name = "esp4-encrypt-tun"
422 tun4_decrypt_node_name = "esp4-decrypt-tun"
424 def config_network(self, p):
426 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
429 p.crypt_key, p.crypt_key,
430 p.auth_algo_vpp_id, p.auth_key,
433 p.tun_if.add_vpp_config()
435 p.tun_if.config_ip4()
436 config_tun_params(p, self.encryption_type, p.tun_if)
437 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
438 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
440 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
441 [VppRoutePath(p.tun_if.remote_ip4,
443 p.route.add_vpp_config()
445 def unconfig_network(self, p):
446 p.tun_if.unconfig_ip4()
447 p.tun_if.remove_vpp_config()
448 p.route.remove_vpp_config()
451 super(TestIpsec4TunIfEspAll, self).setUp()
453 self.tun_if = self.pg0
456 super(TestIpsec4TunIfEspAll, self).tearDown()
460 # change the key and the SPI
462 p.crypt_key = b'X' + p.crypt_key[1:]
464 p.scapy_tun_sa_id += 1
467 p.tun_if.local_spi = p.vpp_tun_spi
468 p.tun_if.remote_spi = p.scapy_tun_spi
470 config_tun_params(p, self.encryption_type, p.tun_if)
472 p.tun_sa_in = VppIpsecSA(self,
479 self.vpp_esp_protocol,
482 p.tun_sa_out = VppIpsecSA(self,
489 self.vpp_esp_protocol,
492 p.tun_sa_in.add_vpp_config()
493 p.tun_sa_out.add_vpp_config()
495 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
496 sa_id=p.tun_sa_in.id,
498 self.vapi.ipsec_tunnel_if_set_sa(sw_if_index=p.tun_if.sw_if_index,
499 sa_id=p.tun_sa_out.id,
501 self.logger.info(self.vapi.cli("sh ipsec sa"))
503 def test_tun_44(self):
504 """IPSEC tunnel all algos """
506 # foreach VPP crypto engine
507 engines = ["ia32", "ipsecmb", "openssl"]
509 # foreach crypto algorithm
510 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
511 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
512 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
513 IPSEC_API_INTEG_ALG_NONE),
514 'scapy-crypto': "AES-GCM",
515 'scapy-integ': "NULL",
516 'key': b"JPjyOWBeVEQiMe7h",
518 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
519 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
520 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
521 IPSEC_API_INTEG_ALG_NONE),
522 'scapy-crypto': "AES-GCM",
523 'scapy-integ': "NULL",
524 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
526 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
527 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
528 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
529 IPSEC_API_INTEG_ALG_NONE),
530 'scapy-crypto': "AES-GCM",
531 'scapy-integ': "NULL",
532 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
534 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
535 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
536 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
537 IPSEC_API_INTEG_ALG_SHA1_96),
538 'scapy-crypto': "AES-CBC",
539 'scapy-integ': "HMAC-SHA1-96",
541 'key': b"JPjyOWBeVEQiMe7h"},
542 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
543 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
544 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
545 IPSEC_API_INTEG_ALG_SHA1_96),
546 'scapy-crypto': "AES-CBC",
547 'scapy-integ': "HMAC-SHA1-96",
549 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
550 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
551 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
552 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
553 IPSEC_API_INTEG_ALG_SHA1_96),
554 'scapy-crypto': "AES-CBC",
555 'scapy-integ': "HMAC-SHA1-96",
557 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
558 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
559 IPSEC_API_CRYPTO_ALG_NONE),
560 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
561 IPSEC_API_INTEG_ALG_SHA1_96),
562 'scapy-crypto': "NULL",
563 'scapy-integ': "HMAC-SHA1-96",
565 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
567 for engine in engines:
568 self.vapi.cli("set crypto handler all %s" % engine)
571 # loop through each of the algorithms
574 # with self.subTest(algo=algo['scapy']):
576 p = copy.copy(self.ipv4_params)
577 p.auth_algo_vpp_id = algo['vpp-integ']
578 p.crypt_algo_vpp_id = algo['vpp-crypto']
579 p.crypt_algo = algo['scapy-crypto']
580 p.auth_algo = algo['scapy-integ']
581 p.crypt_key = algo['key']
582 p.salt = algo['salt']
584 self.config_network(p)
586 self.verify_tun_44(p, count=127)
587 c = p.tun_if.get_rx_stats()
588 self.assertEqual(c['packets'], 127)
589 c = p.tun_if.get_tx_stats()
590 self.assertEqual(c['packets'], 127)
596 self.verify_tun_44(p, count=127)
598 self.unconfig_network(p)
599 p.tun_sa_out.remove_vpp_config()
600 p.tun_sa_in.remove_vpp_config()
603 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4):
604 """ IPsec IPv4 Tunnel interface no Algos """
606 encryption_type = ESP
607 tun4_encrypt_node_name = "esp4-encrypt-tun"
608 tun4_decrypt_node_name = "esp4-decrypt-tun"
610 def config_network(self, p):
612 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
613 IPSEC_API_INTEG_ALG_NONE)
617 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
618 IPSEC_API_CRYPTO_ALG_NONE)
619 p.crypt_algo = 'NULL'
622 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
625 p.crypt_key, p.crypt_key,
626 p.auth_algo_vpp_id, p.auth_key,
629 p.tun_if.add_vpp_config()
631 p.tun_if.config_ip4()
632 config_tun_params(p, self.encryption_type, p.tun_if)
633 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
634 self.logger.info(self.vapi.cli("sh ipsec sa 1"))
636 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
637 [VppRoutePath(p.tun_if.remote_ip4,
639 p.route.add_vpp_config()
641 def unconfig_network(self, p):
642 p.tun_if.unconfig_ip4()
643 p.tun_if.remove_vpp_config()
644 p.route.remove_vpp_config()
647 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
649 self.tun_if = self.pg0
652 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
654 def test_tun_44(self):
655 """ IPSec SA with NULL algos """
658 self.config_network(p)
660 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
661 dst=p.remote_tun_if_host)
662 self.send_and_assert_no_replies(self.pg1, tx)
664 self.unconfig_network(p)
667 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
668 """ IPsec IPv6 Multi Tunnel interface """
670 encryption_type = ESP
671 tun6_encrypt_node_name = "esp6-encrypt-tun"
672 tun6_decrypt_node_name = "esp6-decrypt-tun"
675 super(TestIpsec6MultiTunIfEsp, self).setUp()
677 self.tun_if = self.pg0
679 self.multi_params = []
680 self.pg0.generate_remote_hosts(10)
681 self.pg0.configure_ipv6_neighbors()
684 p = copy.copy(self.ipv6_params)
686 p.remote_tun_if_host = "1111::%d" % (ii + 1)
687 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
688 p.scapy_tun_spi = p.scapy_tun_spi + ii
689 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
690 p.vpp_tun_spi = p.vpp_tun_spi + ii
692 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
693 p.scapy_tra_spi = p.scapy_tra_spi + ii
694 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
695 p.vpp_tra_spi = p.vpp_tra_spi + ii
697 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
700 p.crypt_key, p.crypt_key,
701 p.auth_algo_vpp_id, p.auth_key,
702 p.auth_key, is_ip6=True,
703 dst=self.pg0.remote_hosts[ii].ip6)
704 p.tun_if.add_vpp_config()
706 p.tun_if.config_ip6()
707 config_tun_params(p, self.encryption_type, p.tun_if)
708 self.multi_params.append(p)
710 r = VppIpRoute(self, p.remote_tun_if_host, 128,
711 [VppRoutePath(p.tun_if.remote_ip6,
713 proto=DpoProto.DPO_PROTO_IP6)])
717 super(TestIpsec6MultiTunIfEsp, self).tearDown()
719 def test_tun_66(self):
720 """Multiple IPSEC tunnel interfaces """
721 for p in self.multi_params:
722 self.verify_tun_66(p, count=127)
723 c = p.tun_if.get_rx_stats()
724 self.assertEqual(c['packets'], 127)
725 c = p.tun_if.get_tx_stats()
726 self.assertEqual(c['packets'], 127)
729 class TestIpsecGreTebIfEsp(TemplateIpsec,
731 """ Ipsec GRE TEB ESP - TUN tests """
732 tun4_encrypt_node_name = "esp4-encrypt-tun"
733 tun4_decrypt_node_name = "esp4-decrypt-tun"
734 encryption_type = ESP
735 omac = "00:11:22:33:44:55"
737 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
739 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
740 sa.encrypt(IP(src=self.pg0.remote_ip4,
741 dst=self.pg0.local_ip4) /
743 Ether(dst=self.omac) /
744 IP(src="1.1.1.1", dst="1.1.1.2") /
745 UDP(sport=1144, dport=2233) /
746 Raw(b'X' * payload_size))
747 for i in range(count)]
749 def gen_pkts(self, sw_intf, src, dst, count=1,
751 return [Ether(dst=self.omac) /
752 IP(src="1.1.1.1", dst="1.1.1.2") /
753 UDP(sport=1144, dport=2233) /
754 Raw(b'X' * payload_size)
755 for i in range(count)]
757 def verify_decrypted(self, p, rxs):
759 self.assert_equal(rx[Ether].dst, self.omac)
760 self.assert_equal(rx[IP].dst, "1.1.1.2")
762 def verify_encrypted(self, p, sa, rxs):
765 pkt = sa.decrypt(rx[IP])
766 if not pkt.haslayer(IP):
767 pkt = IP(pkt[Raw].load)
768 self.assert_packet_checksums_valid(pkt)
769 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
770 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
771 self.assertTrue(pkt.haslayer(GRE))
773 self.assertEqual(e[Ether].dst, self.omac)
774 self.assertEqual(e[IP].dst, "1.1.1.2")
775 except (IndexError, AssertionError):
776 self.logger.debug(ppp("Unexpected packet:", rx))
778 self.logger.debug(ppp("Decrypted packet:", pkt))
784 super(TestIpsecGreTebIfEsp, self).setUp()
786 self.tun_if = self.pg0
790 bd1 = VppBridgeDomain(self, 1)
793 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
794 p.auth_algo_vpp_id, p.auth_key,
795 p.crypt_algo_vpp_id, p.crypt_key,
796 self.vpp_esp_protocol,
799 p.tun_sa_out.add_vpp_config()
801 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
802 p.auth_algo_vpp_id, p.auth_key,
803 p.crypt_algo_vpp_id, p.crypt_key,
804 self.vpp_esp_protocol,
807 p.tun_sa_in.add_vpp_config()
809 p.tun_if = VppGreInterface(self,
812 type=(VppEnum.vl_api_gre_tunnel_type_t.
813 GRE_API_TUNNEL_TYPE_TEB))
814 p.tun_if.add_vpp_config()
816 p.tun_protect = VppIpsecTunProtect(self,
821 p.tun_protect.add_vpp_config()
824 p.tun_if.config_ip4()
825 config_tun_params(p, self.encryption_type, p.tun_if)
827 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
828 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
830 self.vapi.cli("clear ipsec sa")
831 self.vapi.cli("sh adj")
832 self.vapi.cli("sh ipsec tun")
836 p.tun_if.unconfig_ip4()
837 super(TestIpsecGreTebIfEsp, self).tearDown()
840 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
842 """ Ipsec GRE TEB ESP - TUN tests """
843 tun4_encrypt_node_name = "esp4-encrypt-tun"
844 tun4_decrypt_node_name = "esp4-decrypt-tun"
845 encryption_type = ESP
846 omac = "00:11:22:33:44:55"
848 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
850 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
851 sa.encrypt(IP(src=self.pg0.remote_ip4,
852 dst=self.pg0.local_ip4) /
854 Ether(dst=self.omac) /
855 IP(src="1.1.1.1", dst="1.1.1.2") /
856 UDP(sport=1144, dport=2233) /
857 Raw(b'X' * payload_size))
858 for i in range(count)]
860 def gen_pkts(self, sw_intf, src, dst, count=1,
862 return [Ether(dst=self.omac) /
864 IP(src="1.1.1.1", dst="1.1.1.2") /
865 UDP(sport=1144, dport=2233) /
866 Raw(b'X' * payload_size)
867 for i in range(count)]
869 def verify_decrypted(self, p, rxs):
871 self.assert_equal(rx[Ether].dst, self.omac)
872 self.assert_equal(rx[Dot1Q].vlan, 11)
873 self.assert_equal(rx[IP].dst, "1.1.1.2")
875 def verify_encrypted(self, p, sa, rxs):
878 pkt = sa.decrypt(rx[IP])
879 if not pkt.haslayer(IP):
880 pkt = IP(pkt[Raw].load)
881 self.assert_packet_checksums_valid(pkt)
882 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
883 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
884 self.assertTrue(pkt.haslayer(GRE))
886 self.assertEqual(e[Ether].dst, self.omac)
887 self.assertFalse(e.haslayer(Dot1Q))
888 self.assertEqual(e[IP].dst, "1.1.1.2")
889 except (IndexError, AssertionError):
890 self.logger.debug(ppp("Unexpected packet:", rx))
892 self.logger.debug(ppp("Decrypted packet:", pkt))
898 super(TestIpsecGreTebVlanIfEsp, self).setUp()
900 self.tun_if = self.pg0
904 bd1 = VppBridgeDomain(self, 1)
907 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
908 self.vapi.l2_interface_vlan_tag_rewrite(
909 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
911 self.pg1_11.admin_up()
913 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
914 p.auth_algo_vpp_id, p.auth_key,
915 p.crypt_algo_vpp_id, p.crypt_key,
916 self.vpp_esp_protocol,
919 p.tun_sa_out.add_vpp_config()
921 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
922 p.auth_algo_vpp_id, p.auth_key,
923 p.crypt_algo_vpp_id, p.crypt_key,
924 self.vpp_esp_protocol,
927 p.tun_sa_in.add_vpp_config()
929 p.tun_if = VppGreInterface(self,
932 type=(VppEnum.vl_api_gre_tunnel_type_t.
933 GRE_API_TUNNEL_TYPE_TEB))
934 p.tun_if.add_vpp_config()
936 p.tun_protect = VppIpsecTunProtect(self,
941 p.tun_protect.add_vpp_config()
944 p.tun_if.config_ip4()
945 config_tun_params(p, self.encryption_type, p.tun_if)
947 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
948 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
950 self.vapi.cli("clear ipsec sa")
954 p.tun_if.unconfig_ip4()
955 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
956 self.pg1_11.admin_down()
957 self.pg1_11.remove_vpp_config()
960 class TestIpsecGreTebIfEspTra(TemplateIpsec,
962 """ Ipsec GRE TEB ESP - Tra tests """
963 tun4_encrypt_node_name = "esp4-encrypt-tun"
964 tun4_decrypt_node_name = "esp4-decrypt-tun"
965 encryption_type = ESP
966 omac = "00:11:22:33:44:55"
968 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
970 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
971 sa.encrypt(IP(src=self.pg0.remote_ip4,
972 dst=self.pg0.local_ip4) /
974 Ether(dst=self.omac) /
975 IP(src="1.1.1.1", dst="1.1.1.2") /
976 UDP(sport=1144, dport=2233) /
977 Raw(b'X' * payload_size))
978 for i in range(count)]
980 def gen_pkts(self, sw_intf, src, dst, count=1,
982 return [Ether(dst=self.omac) /
983 IP(src="1.1.1.1", dst="1.1.1.2") /
984 UDP(sport=1144, dport=2233) /
985 Raw(b'X' * payload_size)
986 for i in range(count)]
988 def verify_decrypted(self, p, rxs):
990 self.assert_equal(rx[Ether].dst, self.omac)
991 self.assert_equal(rx[IP].dst, "1.1.1.2")
993 def verify_encrypted(self, p, sa, rxs):
996 pkt = sa.decrypt(rx[IP])
997 if not pkt.haslayer(IP):
998 pkt = IP(pkt[Raw].load)
999 self.assert_packet_checksums_valid(pkt)
1000 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1001 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1002 self.assertTrue(pkt.haslayer(GRE))
1004 self.assertEqual(e[Ether].dst, self.omac)
1005 self.assertEqual(e[IP].dst, "1.1.1.2")
1006 except (IndexError, AssertionError):
1007 self.logger.debug(ppp("Unexpected packet:", rx))
1009 self.logger.debug(ppp("Decrypted packet:", pkt))
1015 super(TestIpsecGreTebIfEspTra, self).setUp()
1017 self.tun_if = self.pg0
1019 p = self.ipv4_params
1021 bd1 = VppBridgeDomain(self, 1)
1022 bd1.add_vpp_config()
1024 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1025 p.auth_algo_vpp_id, p.auth_key,
1026 p.crypt_algo_vpp_id, p.crypt_key,
1027 self.vpp_esp_protocol)
1028 p.tun_sa_out.add_vpp_config()
1030 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1031 p.auth_algo_vpp_id, p.auth_key,
1032 p.crypt_algo_vpp_id, p.crypt_key,
1033 self.vpp_esp_protocol)
1034 p.tun_sa_in.add_vpp_config()
1036 p.tun_if = VppGreInterface(self,
1038 self.pg0.remote_ip4,
1039 type=(VppEnum.vl_api_gre_tunnel_type_t.
1040 GRE_API_TUNNEL_TYPE_TEB))
1041 p.tun_if.add_vpp_config()
1043 p.tun_protect = VppIpsecTunProtect(self,
1048 p.tun_protect.add_vpp_config()
1051 p.tun_if.config_ip4()
1052 config_tra_params(p, self.encryption_type, p.tun_if)
1054 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1055 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1057 self.vapi.cli("clear ipsec sa")
1060 p = self.ipv4_params
1061 p.tun_if.unconfig_ip4()
1062 super(TestIpsecGreTebIfEspTra, self).tearDown()
1065 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1067 """ Ipsec GRE TEB UDP ESP - Tra tests """
1068 tun4_encrypt_node_name = "esp4-encrypt-tun"
1069 tun4_decrypt_node_name = "esp4-decrypt-tun"
1070 encryption_type = ESP
1071 omac = "00:11:22:33:44:55"
1073 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1075 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1076 sa.encrypt(IP(src=self.pg0.remote_ip4,
1077 dst=self.pg0.local_ip4) /
1079 Ether(dst=self.omac) /
1080 IP(src="1.1.1.1", dst="1.1.1.2") /
1081 UDP(sport=1144, dport=2233) /
1082 Raw(b'X' * payload_size))
1083 for i in range(count)]
1085 def gen_pkts(self, sw_intf, src, dst, count=1,
1087 return [Ether(dst=self.omac) /
1088 IP(src="1.1.1.1", dst="1.1.1.2") /
1089 UDP(sport=1144, dport=2233) /
1090 Raw(b'X' * payload_size)
1091 for i in range(count)]
1093 def verify_decrypted(self, p, rxs):
1095 self.assert_equal(rx[Ether].dst, self.omac)
1096 self.assert_equal(rx[IP].dst, "1.1.1.2")
1098 def verify_encrypted(self, p, sa, rxs):
1100 self.assertTrue(rx.haslayer(UDP))
1101 self.assertEqual(rx[UDP].dport, 4545)
1102 self.assertEqual(rx[UDP].sport, 5454)
1104 pkt = sa.decrypt(rx[IP])
1105 if not pkt.haslayer(IP):
1106 pkt = IP(pkt[Raw].load)
1107 self.assert_packet_checksums_valid(pkt)
1108 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1109 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1110 self.assertTrue(pkt.haslayer(GRE))
1112 self.assertEqual(e[Ether].dst, self.omac)
1113 self.assertEqual(e[IP].dst, "1.1.1.2")
1114 except (IndexError, AssertionError):
1115 self.logger.debug(ppp("Unexpected packet:", rx))
1117 self.logger.debug(ppp("Decrypted packet:", pkt))
1123 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1125 self.tun_if = self.pg0
1127 p = self.ipv4_params
1128 p = self.ipv4_params
1129 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1130 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1131 p.nat_header = UDP(sport=5454, dport=4545)
1133 bd1 = VppBridgeDomain(self, 1)
1134 bd1.add_vpp_config()
1136 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1137 p.auth_algo_vpp_id, p.auth_key,
1138 p.crypt_algo_vpp_id, p.crypt_key,
1139 self.vpp_esp_protocol,
1143 p.tun_sa_out.add_vpp_config()
1145 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1146 p.auth_algo_vpp_id, p.auth_key,
1147 p.crypt_algo_vpp_id, p.crypt_key,
1148 self.vpp_esp_protocol,
1150 VppEnum.vl_api_ipsec_sad_flags_t.
1151 IPSEC_API_SAD_FLAG_IS_INBOUND),
1154 p.tun_sa_in.add_vpp_config()
1156 p.tun_if = VppGreInterface(self,
1158 self.pg0.remote_ip4,
1159 type=(VppEnum.vl_api_gre_tunnel_type_t.
1160 GRE_API_TUNNEL_TYPE_TEB))
1161 p.tun_if.add_vpp_config()
1163 p.tun_protect = VppIpsecTunProtect(self,
1168 p.tun_protect.add_vpp_config()
1171 p.tun_if.config_ip4()
1172 config_tra_params(p, self.encryption_type, p.tun_if)
1174 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1175 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1177 self.vapi.cli("clear ipsec sa")
1178 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1181 p = self.ipv4_params
1182 p.tun_if.unconfig_ip4()
1183 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1186 class TestIpsecGreIfEsp(TemplateIpsec,
1188 """ Ipsec GRE ESP - TUN tests """
1189 tun4_encrypt_node_name = "esp4-encrypt-tun"
1190 tun4_decrypt_node_name = "esp4-decrypt-tun"
1191 encryption_type = ESP
1193 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1195 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1196 sa.encrypt(IP(src=self.pg0.remote_ip4,
1197 dst=self.pg0.local_ip4) /
1199 IP(src=self.pg1.local_ip4,
1200 dst=self.pg1.remote_ip4) /
1201 UDP(sport=1144, dport=2233) /
1202 Raw(b'X' * payload_size))
1203 for i in range(count)]
1205 def gen_pkts(self, sw_intf, src, dst, count=1,
1207 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1208 IP(src="1.1.1.1", dst="1.1.1.2") /
1209 UDP(sport=1144, dport=2233) /
1210 Raw(b'X' * payload_size)
1211 for i in range(count)]
1213 def verify_decrypted(self, p, rxs):
1215 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1216 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1218 def verify_encrypted(self, p, sa, rxs):
1221 pkt = sa.decrypt(rx[IP])
1222 if not pkt.haslayer(IP):
1223 pkt = IP(pkt[Raw].load)
1224 self.assert_packet_checksums_valid(pkt)
1225 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1226 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1227 self.assertTrue(pkt.haslayer(GRE))
1229 self.assertEqual(e[IP].dst, "1.1.1.2")
1230 except (IndexError, AssertionError):
1231 self.logger.debug(ppp("Unexpected packet:", rx))
1233 self.logger.debug(ppp("Decrypted packet:", pkt))
1239 super(TestIpsecGreIfEsp, self).setUp()
1241 self.tun_if = self.pg0
1243 p = self.ipv4_params
1245 bd1 = VppBridgeDomain(self, 1)
1246 bd1.add_vpp_config()
1248 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1249 p.auth_algo_vpp_id, p.auth_key,
1250 p.crypt_algo_vpp_id, p.crypt_key,
1251 self.vpp_esp_protocol,
1253 self.pg0.remote_ip4)
1254 p.tun_sa_out.add_vpp_config()
1256 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1257 p.auth_algo_vpp_id, p.auth_key,
1258 p.crypt_algo_vpp_id, p.crypt_key,
1259 self.vpp_esp_protocol,
1260 self.pg0.remote_ip4,
1262 p.tun_sa_in.add_vpp_config()
1264 p.tun_if = VppGreInterface(self,
1266 self.pg0.remote_ip4)
1267 p.tun_if.add_vpp_config()
1269 p.tun_protect = VppIpsecTunProtect(self,
1273 p.tun_protect.add_vpp_config()
1276 p.tun_if.config_ip4()
1277 config_tun_params(p, self.encryption_type, p.tun_if)
1279 VppIpRoute(self, "1.1.1.2", 32,
1280 [VppRoutePath(p.tun_if.remote_ip4,
1281 0xffffffff)]).add_vpp_config()
1284 p = self.ipv4_params
1285 p.tun_if.unconfig_ip4()
1286 super(TestIpsecGreIfEsp, self).tearDown()
1289 class TestIpsecGreIfEspTra(TemplateIpsec,
1291 """ Ipsec GRE ESP - TRA tests """
1292 tun4_encrypt_node_name = "esp4-encrypt-tun"
1293 tun4_decrypt_node_name = "esp4-decrypt-tun"
1294 encryption_type = ESP
1296 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1298 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1299 sa.encrypt(IP(src=self.pg0.remote_ip4,
1300 dst=self.pg0.local_ip4) /
1302 IP(src=self.pg1.local_ip4,
1303 dst=self.pg1.remote_ip4) /
1304 UDP(sport=1144, dport=2233) /
1305 Raw(b'X' * payload_size))
1306 for i in range(count)]
1308 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1310 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1311 sa.encrypt(IP(src=self.pg0.remote_ip4,
1312 dst=self.pg0.local_ip4) /
1314 UDP(sport=1144, dport=2233) /
1315 Raw(b'X' * payload_size))
1316 for i in range(count)]
1318 def gen_pkts(self, sw_intf, src, dst, count=1,
1320 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1321 IP(src="1.1.1.1", dst="1.1.1.2") /
1322 UDP(sport=1144, dport=2233) /
1323 Raw(b'X' * payload_size)
1324 for i in range(count)]
1326 def verify_decrypted(self, p, rxs):
1328 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1329 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1331 def verify_encrypted(self, p, sa, rxs):
1334 pkt = sa.decrypt(rx[IP])
1335 if not pkt.haslayer(IP):
1336 pkt = IP(pkt[Raw].load)
1337 self.assert_packet_checksums_valid(pkt)
1338 self.assertTrue(pkt.haslayer(GRE))
1340 self.assertEqual(e[IP].dst, "1.1.1.2")
1341 except (IndexError, AssertionError):
1342 self.logger.debug(ppp("Unexpected packet:", rx))
1344 self.logger.debug(ppp("Decrypted packet:", pkt))
1350 super(TestIpsecGreIfEspTra, self).setUp()
1352 self.tun_if = self.pg0
1354 p = self.ipv4_params
1356 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1357 p.auth_algo_vpp_id, p.auth_key,
1358 p.crypt_algo_vpp_id, p.crypt_key,
1359 self.vpp_esp_protocol)
1360 p.tun_sa_out.add_vpp_config()
1362 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1363 p.auth_algo_vpp_id, p.auth_key,
1364 p.crypt_algo_vpp_id, p.crypt_key,
1365 self.vpp_esp_protocol)
1366 p.tun_sa_in.add_vpp_config()
1368 p.tun_if = VppGreInterface(self,
1370 self.pg0.remote_ip4)
1371 p.tun_if.add_vpp_config()
1373 p.tun_protect = VppIpsecTunProtect(self,
1377 p.tun_protect.add_vpp_config()
1380 p.tun_if.config_ip4()
1381 config_tra_params(p, self.encryption_type, p.tun_if)
1383 VppIpRoute(self, "1.1.1.2", 32,
1384 [VppRoutePath(p.tun_if.remote_ip4,
1385 0xffffffff)]).add_vpp_config()
1388 p = self.ipv4_params
1389 p.tun_if.unconfig_ip4()
1390 super(TestIpsecGreIfEspTra, self).tearDown()
1392 def test_gre_non_ip(self):
1393 p = self.ipv4_params
1394 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1395 src=p.remote_tun_if_host,
1396 dst=self.pg1.remote_ip6)
1397 self.send_and_assert_no_replies(self.tun_if, tx)
1398 node_name = ('/err/%s/unsupported payload' %
1399 self.tun4_decrypt_node_name)
1400 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1403 class TestIpsecGre6IfEspTra(TemplateIpsec,
1405 """ Ipsec GRE ESP - TRA tests """
1406 tun6_encrypt_node_name = "esp6-encrypt-tun"
1407 tun6_decrypt_node_name = "esp6-decrypt-tun"
1408 encryption_type = ESP
1410 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1412 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1413 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1414 dst=self.pg0.local_ip6) /
1416 IPv6(src=self.pg1.local_ip6,
1417 dst=self.pg1.remote_ip6) /
1418 UDP(sport=1144, dport=2233) /
1419 Raw(b'X' * payload_size))
1420 for i in range(count)]
1422 def gen_pkts6(self, sw_intf, src, dst, count=1,
1424 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1425 IPv6(src="1::1", dst="1::2") /
1426 UDP(sport=1144, dport=2233) /
1427 Raw(b'X' * payload_size)
1428 for i in range(count)]
1430 def verify_decrypted6(self, p, rxs):
1432 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1433 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1435 def verify_encrypted6(self, p, sa, rxs):
1438 pkt = sa.decrypt(rx[IPv6])
1439 if not pkt.haslayer(IPv6):
1440 pkt = IPv6(pkt[Raw].load)
1441 self.assert_packet_checksums_valid(pkt)
1442 self.assertTrue(pkt.haslayer(GRE))
1444 self.assertEqual(e[IPv6].dst, "1::2")
1445 except (IndexError, AssertionError):
1446 self.logger.debug(ppp("Unexpected packet:", rx))
1448 self.logger.debug(ppp("Decrypted packet:", pkt))
1454 super(TestIpsecGre6IfEspTra, self).setUp()
1456 self.tun_if = self.pg0
1458 p = self.ipv6_params
1460 bd1 = VppBridgeDomain(self, 1)
1461 bd1.add_vpp_config()
1463 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1464 p.auth_algo_vpp_id, p.auth_key,
1465 p.crypt_algo_vpp_id, p.crypt_key,
1466 self.vpp_esp_protocol)
1467 p.tun_sa_out.add_vpp_config()
1469 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1470 p.auth_algo_vpp_id, p.auth_key,
1471 p.crypt_algo_vpp_id, p.crypt_key,
1472 self.vpp_esp_protocol)
1473 p.tun_sa_in.add_vpp_config()
1475 p.tun_if = VppGreInterface(self,
1477 self.pg0.remote_ip6)
1478 p.tun_if.add_vpp_config()
1480 p.tun_protect = VppIpsecTunProtect(self,
1484 p.tun_protect.add_vpp_config()
1487 p.tun_if.config_ip6()
1488 config_tra_params(p, self.encryption_type, p.tun_if)
1490 r = VppIpRoute(self, "1::2", 128,
1491 [VppRoutePath(p.tun_if.remote_ip6,
1493 proto=DpoProto.DPO_PROTO_IP6)])
1497 p = self.ipv6_params
1498 p.tun_if.unconfig_ip6()
1499 super(TestIpsecGre6IfEspTra, self).tearDown()
1502 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1503 """ Ipsec mGRE ESP v4 TRA tests """
1504 tun4_encrypt_node_name = "esp4-encrypt-tun"
1505 tun4_decrypt_node_name = "esp4-decrypt-tun"
1506 encryption_type = ESP
1508 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1510 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1511 sa.encrypt(IP(src=p.tun_dst,
1512 dst=self.pg0.local_ip4) /
1514 IP(src=self.pg1.local_ip4,
1515 dst=self.pg1.remote_ip4) /
1516 UDP(sport=1144, dport=2233) /
1517 Raw(b'X' * payload_size))
1518 for i in range(count)]
1520 def gen_pkts(self, sw_intf, src, dst, count=1,
1522 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1523 IP(src="1.1.1.1", dst=dst) /
1524 UDP(sport=1144, dport=2233) /
1525 Raw(b'X' * payload_size)
1526 for i in range(count)]
1528 def verify_decrypted(self, p, rxs):
1530 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1531 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1533 def verify_encrypted(self, p, sa, rxs):
1536 pkt = sa.decrypt(rx[IP])
1537 if not pkt.haslayer(IP):
1538 pkt = IP(pkt[Raw].load)
1539 self.assert_packet_checksums_valid(pkt)
1540 self.assertTrue(pkt.haslayer(GRE))
1542 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1543 except (IndexError, AssertionError):
1544 self.logger.debug(ppp("Unexpected packet:", rx))
1546 self.logger.debug(ppp("Decrypted packet:", pkt))
1552 super(TestIpsecMGreIfEspTra4, self).setUp()
1555 self.tun_if = self.pg0
1556 p = self.ipv4_params
1557 p.tun_if = VppGreInterface(self,
1560 mode=(VppEnum.vl_api_tunnel_mode_t.
1561 TUNNEL_API_MODE_MP))
1562 p.tun_if.add_vpp_config()
1564 p.tun_if.config_ip4()
1565 p.tun_if.generate_remote_hosts(N_NHS)
1566 self.pg0.generate_remote_hosts(N_NHS)
1567 self.pg0.configure_ipv4_neighbors()
1569 # setup some SAs for several next-hops on the interface
1570 self.multi_params = []
1573 p = copy.copy(self.ipv4_params)
1575 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1576 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1577 p.scapy_tun_spi = p.scapy_tun_spi + ii
1578 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1579 p.vpp_tun_spi = p.vpp_tun_spi + ii
1581 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1582 p.scapy_tra_spi = p.scapy_tra_spi + ii
1583 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1584 p.vpp_tra_spi = p.vpp_tra_spi + ii
1585 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1586 p.auth_algo_vpp_id, p.auth_key,
1587 p.crypt_algo_vpp_id, p.crypt_key,
1588 self.vpp_esp_protocol)
1589 p.tun_sa_out.add_vpp_config()
1591 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1592 p.auth_algo_vpp_id, p.auth_key,
1593 p.crypt_algo_vpp_id, p.crypt_key,
1594 self.vpp_esp_protocol)
1595 p.tun_sa_in.add_vpp_config()
1597 p.tun_protect = VppIpsecTunProtect(
1602 nh=p.tun_if.remote_hosts[ii].ip4)
1603 p.tun_protect.add_vpp_config()
1604 config_tra_params(p, self.encryption_type, p.tun_if)
1605 self.multi_params.append(p)
1607 VppIpRoute(self, p.remote_tun_if_host, 32,
1608 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1609 p.tun_if.sw_if_index)]).add_vpp_config()
1611 # in this v4 variant add the teibs after the protect
1612 p.teib = VppTeib(self, p.tun_if,
1613 p.tun_if.remote_hosts[ii].ip4,
1614 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1615 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1616 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1619 p = self.ipv4_params
1620 p.tun_if.unconfig_ip4()
1621 super(TestIpsecMGreIfEspTra4, self).tearDown()
1623 def test_tun_44(self):
1626 for p in self.multi_params:
1627 self.verify_tun_44(p, count=N_PKTS)
1628 p.teib.remove_vpp_config()
1629 self.verify_tun_dropped_44(p, count=N_PKTS)
1630 p.teib.add_vpp_config()
1631 self.verify_tun_44(p, count=N_PKTS)
1634 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1635 """ Ipsec mGRE ESP v6 TRA tests """
1636 tun6_encrypt_node_name = "esp6-encrypt-tun"
1637 tun6_decrypt_node_name = "esp6-decrypt-tun"
1638 encryption_type = ESP
1640 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1642 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1643 sa.encrypt(IPv6(src=p.tun_dst,
1644 dst=self.pg0.local_ip6) /
1646 IPv6(src=self.pg1.local_ip6,
1647 dst=self.pg1.remote_ip6) /
1648 UDP(sport=1144, dport=2233) /
1649 Raw(b'X' * payload_size))
1650 for i in range(count)]
1652 def gen_pkts6(self, sw_intf, src, dst, count=1,
1654 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1655 IPv6(src="1::1", dst=dst) /
1656 UDP(sport=1144, dport=2233) /
1657 Raw(b'X' * payload_size)
1658 for i in range(count)]
1660 def verify_decrypted6(self, p, rxs):
1662 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1663 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1665 def verify_encrypted6(self, p, sa, rxs):
1668 pkt = sa.decrypt(rx[IPv6])
1669 if not pkt.haslayer(IPv6):
1670 pkt = IPv6(pkt[Raw].load)
1671 self.assert_packet_checksums_valid(pkt)
1672 self.assertTrue(pkt.haslayer(GRE))
1674 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1675 except (IndexError, AssertionError):
1676 self.logger.debug(ppp("Unexpected packet:", rx))
1678 self.logger.debug(ppp("Decrypted packet:", pkt))
1684 super(TestIpsecMGreIfEspTra6, self).setUp()
1686 self.vapi.cli("set logging class ipsec level debug")
1689 self.tun_if = self.pg0
1690 p = self.ipv6_params
1691 p.tun_if = VppGreInterface(self,
1694 mode=(VppEnum.vl_api_tunnel_mode_t.
1695 TUNNEL_API_MODE_MP))
1696 p.tun_if.add_vpp_config()
1698 p.tun_if.config_ip6()
1699 p.tun_if.generate_remote_hosts(N_NHS)
1700 self.pg0.generate_remote_hosts(N_NHS)
1701 self.pg0.configure_ipv6_neighbors()
1703 # setup some SAs for several next-hops on the interface
1704 self.multi_params = []
1706 for ii in range(N_NHS):
1707 p = copy.copy(self.ipv6_params)
1709 p.remote_tun_if_host = "1::%d" % (ii + 1)
1710 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1711 p.scapy_tun_spi = p.scapy_tun_spi + ii
1712 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1713 p.vpp_tun_spi = p.vpp_tun_spi + ii
1715 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1716 p.scapy_tra_spi = p.scapy_tra_spi + ii
1717 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1718 p.vpp_tra_spi = p.vpp_tra_spi + ii
1719 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1720 p.auth_algo_vpp_id, p.auth_key,
1721 p.crypt_algo_vpp_id, p.crypt_key,
1722 self.vpp_esp_protocol)
1723 p.tun_sa_out.add_vpp_config()
1725 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1726 p.auth_algo_vpp_id, p.auth_key,
1727 p.crypt_algo_vpp_id, p.crypt_key,
1728 self.vpp_esp_protocol)
1729 p.tun_sa_in.add_vpp_config()
1731 # in this v6 variant add the teibs first then the protection
1732 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1733 VppTeib(self, p.tun_if,
1734 p.tun_if.remote_hosts[ii].ip6,
1735 p.tun_dst).add_vpp_config()
1737 p.tun_protect = VppIpsecTunProtect(
1742 nh=p.tun_if.remote_hosts[ii].ip6)
1743 p.tun_protect.add_vpp_config()
1744 config_tra_params(p, self.encryption_type, p.tun_if)
1745 self.multi_params.append(p)
1747 VppIpRoute(self, p.remote_tun_if_host, 128,
1748 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1749 p.tun_if.sw_if_index)]).add_vpp_config()
1750 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1752 self.logger.info(self.vapi.cli("sh log"))
1753 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1754 self.logger.info(self.vapi.cli("sh adj 41"))
1757 p = self.ipv6_params
1758 p.tun_if.unconfig_ip6()
1759 super(TestIpsecMGreIfEspTra6, self).tearDown()
1761 def test_tun_66(self):
1763 for p in self.multi_params:
1764 self.verify_tun_66(p, count=63)
1767 class TemplateIpsec4TunProtect(object):
1768 """ IPsec IPv4 Tunnel protect """
1770 encryption_type = ESP
1771 tun4_encrypt_node_name = "esp4-encrypt-tun"
1772 tun4_decrypt_node_name = "esp4-decrypt-tun"
1773 tun4_input_node = "ipsec4-tun-input"
1775 def config_sa_tra(self, p):
1776 config_tun_params(p, self.encryption_type, p.tun_if)
1778 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1779 p.auth_algo_vpp_id, p.auth_key,
1780 p.crypt_algo_vpp_id, p.crypt_key,
1781 self.vpp_esp_protocol,
1783 p.tun_sa_out.add_vpp_config()
1785 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1786 p.auth_algo_vpp_id, p.auth_key,
1787 p.crypt_algo_vpp_id, p.crypt_key,
1788 self.vpp_esp_protocol,
1790 p.tun_sa_in.add_vpp_config()
1792 def config_sa_tun(self, p):
1793 config_tun_params(p, self.encryption_type, p.tun_if)
1795 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1796 p.auth_algo_vpp_id, p.auth_key,
1797 p.crypt_algo_vpp_id, p.crypt_key,
1798 self.vpp_esp_protocol,
1799 self.tun_if.local_addr[p.addr_type],
1800 self.tun_if.remote_addr[p.addr_type],
1802 p.tun_sa_out.add_vpp_config()
1804 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1805 p.auth_algo_vpp_id, p.auth_key,
1806 p.crypt_algo_vpp_id, p.crypt_key,
1807 self.vpp_esp_protocol,
1808 self.tun_if.remote_addr[p.addr_type],
1809 self.tun_if.local_addr[p.addr_type],
1811 p.tun_sa_in.add_vpp_config()
1813 def config_protect(self, p):
1814 p.tun_protect = VppIpsecTunProtect(self,
1818 p.tun_protect.add_vpp_config()
1820 def config_network(self, p):
1821 p.tun_if = VppIpIpTunInterface(self, self.pg0,
1823 self.pg0.remote_ip4)
1824 p.tun_if.add_vpp_config()
1826 p.tun_if.config_ip4()
1827 p.tun_if.config_ip6()
1829 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
1830 [VppRoutePath(p.tun_if.remote_ip4,
1832 p.route.add_vpp_config()
1833 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
1834 [VppRoutePath(p.tun_if.remote_ip6,
1836 proto=DpoProto.DPO_PROTO_IP6)])
1839 def unconfig_network(self, p):
1840 p.route.remove_vpp_config()
1841 p.tun_if.remove_vpp_config()
1843 def unconfig_protect(self, p):
1844 p.tun_protect.remove_vpp_config()
1846 def unconfig_sa(self, p):
1847 p.tun_sa_out.remove_vpp_config()
1848 p.tun_sa_in.remove_vpp_config()
1851 class TestIpsec4TunProtect(TemplateIpsec,
1852 TemplateIpsec4TunProtect,
1854 """ IPsec IPv4 Tunnel protect - transport mode"""
1857 super(TestIpsec4TunProtect, self).setUp()
1859 self.tun_if = self.pg0
1862 super(TestIpsec4TunProtect, self).tearDown()
1864 def test_tun_44(self):
1865 """IPSEC tunnel protect"""
1867 p = self.ipv4_params
1869 self.config_network(p)
1870 self.config_sa_tra(p)
1871 self.config_protect(p)
1873 self.verify_tun_44(p, count=127)
1874 c = p.tun_if.get_rx_stats()
1875 self.assertEqual(c['packets'], 127)
1876 c = p.tun_if.get_tx_stats()
1877 self.assertEqual(c['packets'], 127)
1879 self.vapi.cli("clear ipsec sa")
1880 self.verify_tun_64(p, count=127)
1881 c = p.tun_if.get_rx_stats()
1882 self.assertEqual(c['packets'], 254)
1883 c = p.tun_if.get_tx_stats()
1884 self.assertEqual(c['packets'], 254)
1886 # rekey - create new SAs and update the tunnel protection
1888 np.crypt_key = b'X' + p.crypt_key[1:]
1889 np.scapy_tun_spi += 100
1890 np.scapy_tun_sa_id += 1
1891 np.vpp_tun_spi += 100
1892 np.vpp_tun_sa_id += 1
1893 np.tun_if.local_spi = p.vpp_tun_spi
1894 np.tun_if.remote_spi = p.scapy_tun_spi
1896 self.config_sa_tra(np)
1897 self.config_protect(np)
1900 self.verify_tun_44(np, count=127)
1901 c = p.tun_if.get_rx_stats()
1902 self.assertEqual(c['packets'], 381)
1903 c = p.tun_if.get_tx_stats()
1904 self.assertEqual(c['packets'], 381)
1907 self.unconfig_protect(np)
1908 self.unconfig_sa(np)
1909 self.unconfig_network(p)
1912 class TestIpsec4TunProtectUdp(TemplateIpsec,
1913 TemplateIpsec4TunProtect,
1915 """ IPsec IPv4 Tunnel protect - transport mode"""
1918 super(TestIpsec4TunProtectUdp, self).setUp()
1920 self.tun_if = self.pg0
1922 p = self.ipv4_params
1923 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1924 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1925 p.nat_header = UDP(sport=4500, dport=4500)
1926 self.config_network(p)
1927 self.config_sa_tra(p)
1928 self.config_protect(p)
1931 p = self.ipv4_params
1932 self.unconfig_protect(p)
1934 self.unconfig_network(p)
1935 super(TestIpsec4TunProtectUdp, self).tearDown()
1937 def verify_encrypted(self, p, sa, rxs):
1938 # ensure encrypted packets are recieved with the default UDP ports
1940 self.assertEqual(rx[UDP].sport, 4500)
1941 self.assertEqual(rx[UDP].dport, 4500)
1942 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
1944 def test_tun_44(self):
1945 """IPSEC UDP tunnel protect"""
1947 p = self.ipv4_params
1949 self.verify_tun_44(p, count=127)
1950 c = p.tun_if.get_rx_stats()
1951 self.assertEqual(c['packets'], 127)
1952 c = p.tun_if.get_tx_stats()
1953 self.assertEqual(c['packets'], 127)
1955 def test_keepalive(self):
1956 """ IPSEC NAT Keepalive """
1957 self.verify_keepalive(self.ipv4_params)
1960 class TestIpsec4TunProtectTun(TemplateIpsec,
1961 TemplateIpsec4TunProtect,
1963 """ IPsec IPv4 Tunnel protect - tunnel mode"""
1965 encryption_type = ESP
1966 tun4_encrypt_node_name = "esp4-encrypt-tun"
1967 tun4_decrypt_node_name = "esp4-decrypt-tun"
1970 super(TestIpsec4TunProtectTun, self).setUp()
1972 self.tun_if = self.pg0
1975 super(TestIpsec4TunProtectTun, self).tearDown()
1977 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1979 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1980 sa.encrypt(IP(src=sw_intf.remote_ip4,
1981 dst=sw_intf.local_ip4) /
1982 IP(src=src, dst=dst) /
1983 UDP(sport=1144, dport=2233) /
1984 Raw(b'X' * payload_size))
1985 for i in range(count)]
1987 def gen_pkts(self, sw_intf, src, dst, count=1,
1989 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1990 IP(src=src, dst=dst) /
1991 UDP(sport=1144, dport=2233) /
1992 Raw(b'X' * payload_size)
1993 for i in range(count)]
1995 def verify_decrypted(self, p, rxs):
1997 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1998 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
1999 self.assert_packet_checksums_valid(rx)
2001 def verify_encrypted(self, p, sa, rxs):
2004 pkt = sa.decrypt(rx[IP])
2005 if not pkt.haslayer(IP):
2006 pkt = IP(pkt[Raw].load)
2007 self.assert_packet_checksums_valid(pkt)
2008 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2009 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2010 inner = pkt[IP].payload
2011 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2013 except (IndexError, AssertionError):
2014 self.logger.debug(ppp("Unexpected packet:", rx))
2016 self.logger.debug(ppp("Decrypted packet:", pkt))
2021 def test_tun_44(self):
2022 """IPSEC tunnel protect """
2024 p = self.ipv4_params
2026 self.config_network(p)
2027 self.config_sa_tun(p)
2028 self.config_protect(p)
2030 self.verify_tun_44(p, count=127)
2032 c = p.tun_if.get_rx_stats()
2033 self.assertEqual(c['packets'], 127)
2034 c = p.tun_if.get_tx_stats()
2035 self.assertEqual(c['packets'], 127)
2037 # rekey - create new SAs and update the tunnel protection
2039 np.crypt_key = b'X' + p.crypt_key[1:]
2040 np.scapy_tun_spi += 100
2041 np.scapy_tun_sa_id += 1
2042 np.vpp_tun_spi += 100
2043 np.vpp_tun_sa_id += 1
2044 np.tun_if.local_spi = p.vpp_tun_spi
2045 np.tun_if.remote_spi = p.scapy_tun_spi
2047 self.config_sa_tun(np)
2048 self.config_protect(np)
2051 self.verify_tun_44(np, count=127)
2052 c = p.tun_if.get_rx_stats()
2053 self.assertEqual(c['packets'], 254)
2054 c = p.tun_if.get_tx_stats()
2055 self.assertEqual(c['packets'], 254)
2058 self.unconfig_protect(np)
2059 self.unconfig_sa(np)
2060 self.unconfig_network(p)
2063 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2064 TemplateIpsec4TunProtect,
2066 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2068 encryption_type = ESP
2069 tun4_encrypt_node_name = "esp4-encrypt-tun"
2070 tun4_decrypt_node_name = "esp4-decrypt-tun"
2073 super(TestIpsec4TunProtectTunDrop, self).setUp()
2075 self.tun_if = self.pg0
2078 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2080 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2082 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2083 sa.encrypt(IP(src=sw_intf.remote_ip4,
2085 IP(src=src, dst=dst) /
2086 UDP(sport=1144, dport=2233) /
2087 Raw(b'X' * payload_size))
2088 for i in range(count)]
2090 def test_tun_drop_44(self):
2091 """IPSEC tunnel protect bogus tunnel header """
2093 p = self.ipv4_params
2095 self.config_network(p)
2096 self.config_sa_tun(p)
2097 self.config_protect(p)
2099 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2100 src=p.remote_tun_if_host,
2101 dst=self.pg1.remote_ip4,
2103 self.send_and_assert_no_replies(self.tun_if, tx)
2106 self.unconfig_protect(p)
2108 self.unconfig_network(p)
2111 class TemplateIpsec6TunProtect(object):
2112 """ IPsec IPv6 Tunnel protect """
2114 def config_sa_tra(self, p):
2115 config_tun_params(p, self.encryption_type, p.tun_if)
2117 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2118 p.auth_algo_vpp_id, p.auth_key,
2119 p.crypt_algo_vpp_id, p.crypt_key,
2120 self.vpp_esp_protocol)
2121 p.tun_sa_out.add_vpp_config()
2123 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2124 p.auth_algo_vpp_id, p.auth_key,
2125 p.crypt_algo_vpp_id, p.crypt_key,
2126 self.vpp_esp_protocol)
2127 p.tun_sa_in.add_vpp_config()
2129 def config_sa_tun(self, p):
2130 config_tun_params(p, self.encryption_type, p.tun_if)
2132 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2133 p.auth_algo_vpp_id, p.auth_key,
2134 p.crypt_algo_vpp_id, p.crypt_key,
2135 self.vpp_esp_protocol,
2136 self.tun_if.local_addr[p.addr_type],
2137 self.tun_if.remote_addr[p.addr_type])
2138 p.tun_sa_out.add_vpp_config()
2140 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2141 p.auth_algo_vpp_id, p.auth_key,
2142 p.crypt_algo_vpp_id, p.crypt_key,
2143 self.vpp_esp_protocol,
2144 self.tun_if.remote_addr[p.addr_type],
2145 self.tun_if.local_addr[p.addr_type])
2146 p.tun_sa_in.add_vpp_config()
2148 def config_protect(self, p):
2149 p.tun_protect = VppIpsecTunProtect(self,
2153 p.tun_protect.add_vpp_config()
2155 def config_network(self, p):
2156 p.tun_if = VppIpIpTunInterface(self, self.pg0,
2158 self.pg0.remote_ip6)
2159 p.tun_if.add_vpp_config()
2161 p.tun_if.config_ip6()
2162 p.tun_if.config_ip4()
2164 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2165 [VppRoutePath(p.tun_if.remote_ip6,
2167 proto=DpoProto.DPO_PROTO_IP6)])
2168 p.route.add_vpp_config()
2169 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2170 [VppRoutePath(p.tun_if.remote_ip4,
2174 def unconfig_network(self, p):
2175 p.route.remove_vpp_config()
2176 p.tun_if.remove_vpp_config()
2178 def unconfig_protect(self, p):
2179 p.tun_protect.remove_vpp_config()
2181 def unconfig_sa(self, p):
2182 p.tun_sa_out.remove_vpp_config()
2183 p.tun_sa_in.remove_vpp_config()
2186 class TestIpsec6TunProtect(TemplateIpsec,
2187 TemplateIpsec6TunProtect,
2189 """ IPsec IPv6 Tunnel protect - transport mode"""
2191 encryption_type = ESP
2192 tun6_encrypt_node_name = "esp6-encrypt-tun"
2193 tun6_decrypt_node_name = "esp6-decrypt-tun"
2196 super(TestIpsec6TunProtect, self).setUp()
2198 self.tun_if = self.pg0
2201 super(TestIpsec6TunProtect, self).tearDown()
2203 def test_tun_66(self):
2204 """IPSEC tunnel protect 6o6"""
2206 p = self.ipv6_params
2208 self.config_network(p)
2209 self.config_sa_tra(p)
2210 self.config_protect(p)
2212 self.verify_tun_66(p, count=127)
2213 c = p.tun_if.get_rx_stats()
2214 self.assertEqual(c['packets'], 127)
2215 c = p.tun_if.get_tx_stats()
2216 self.assertEqual(c['packets'], 127)
2218 # rekey - create new SAs and update the tunnel protection
2220 np.crypt_key = b'X' + p.crypt_key[1:]
2221 np.scapy_tun_spi += 100
2222 np.scapy_tun_sa_id += 1
2223 np.vpp_tun_spi += 100
2224 np.vpp_tun_sa_id += 1
2225 np.tun_if.local_spi = p.vpp_tun_spi
2226 np.tun_if.remote_spi = p.scapy_tun_spi
2228 self.config_sa_tra(np)
2229 self.config_protect(np)
2232 self.verify_tun_66(np, count=127)
2233 c = p.tun_if.get_rx_stats()
2234 self.assertEqual(c['packets'], 254)
2235 c = p.tun_if.get_tx_stats()
2236 self.assertEqual(c['packets'], 254)
2238 # bounce the interface state
2239 p.tun_if.admin_down()
2240 self.verify_drop_tun_66(np, count=127)
2241 node = ('/err/ipsec6-tun-input/%s' %
2242 'ipsec packets received on disabled interface')
2243 self.assertEqual(127, self.statistics.get_err_counter(node))
2245 self.verify_tun_66(np, count=127)
2248 # 1) add two input SAs [old, new]
2249 # 2) swap output SA to [new]
2250 # 3) use only [new] input SA
2252 np3.crypt_key = b'Z' + p.crypt_key[1:]
2253 np3.scapy_tun_spi += 100
2254 np3.scapy_tun_sa_id += 1
2255 np3.vpp_tun_spi += 100
2256 np3.vpp_tun_sa_id += 1
2257 np3.tun_if.local_spi = p.vpp_tun_spi
2258 np3.tun_if.remote_spi = p.scapy_tun_spi
2260 self.config_sa_tra(np3)
2263 p.tun_protect.update_vpp_config(np.tun_sa_out,
2264 [np.tun_sa_in, np3.tun_sa_in])
2265 self.verify_tun_66(np, np, count=127)
2266 self.verify_tun_66(np3, np, count=127)
2269 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2270 [np.tun_sa_in, np3.tun_sa_in])
2271 self.verify_tun_66(np, np3, count=127)
2272 self.verify_tun_66(np3, np3, count=127)
2275 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2277 self.verify_tun_66(np3, np3, count=127)
2278 self.verify_drop_tun_66(np, count=127)
2280 c = p.tun_if.get_rx_stats()
2281 self.assertEqual(c['packets'], 127*9)
2282 c = p.tun_if.get_tx_stats()
2283 self.assertEqual(c['packets'], 127*8)
2284 self.unconfig_sa(np)
2287 self.unconfig_protect(np3)
2288 self.unconfig_sa(np3)
2289 self.unconfig_network(p)
2291 def test_tun_46(self):
2292 """IPSEC tunnel protect 4o6"""
2294 p = self.ipv6_params
2296 self.config_network(p)
2297 self.config_sa_tra(p)
2298 self.config_protect(p)
2300 self.verify_tun_46(p, count=127)
2301 c = p.tun_if.get_rx_stats()
2302 self.assertEqual(c['packets'], 127)
2303 c = p.tun_if.get_tx_stats()
2304 self.assertEqual(c['packets'], 127)
2307 self.unconfig_protect(p)
2309 self.unconfig_network(p)
2312 class TestIpsec6TunProtectTun(TemplateIpsec,
2313 TemplateIpsec6TunProtect,
2315 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2317 encryption_type = ESP
2318 tun6_encrypt_node_name = "esp6-encrypt-tun"
2319 tun6_decrypt_node_name = "esp6-decrypt-tun"
2322 super(TestIpsec6TunProtectTun, self).setUp()
2324 self.tun_if = self.pg0
2327 super(TestIpsec6TunProtectTun, self).tearDown()
2329 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2331 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2332 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2333 dst=sw_intf.local_ip6) /
2334 IPv6(src=src, dst=dst) /
2335 UDP(sport=1166, dport=2233) /
2336 Raw(b'X' * payload_size))
2337 for i in range(count)]
2339 def gen_pkts6(self, sw_intf, src, dst, count=1,
2341 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2342 IPv6(src=src, dst=dst) /
2343 UDP(sport=1166, dport=2233) /
2344 Raw(b'X' * payload_size)
2345 for i in range(count)]
2347 def verify_decrypted6(self, p, rxs):
2349 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2350 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2351 self.assert_packet_checksums_valid(rx)
2353 def verify_encrypted6(self, p, sa, rxs):
2356 pkt = sa.decrypt(rx[IPv6])
2357 if not pkt.haslayer(IPv6):
2358 pkt = IPv6(pkt[Raw].load)
2359 self.assert_packet_checksums_valid(pkt)
2360 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2361 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2362 inner = pkt[IPv6].payload
2363 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2365 except (IndexError, AssertionError):
2366 self.logger.debug(ppp("Unexpected packet:", rx))
2368 self.logger.debug(ppp("Decrypted packet:", pkt))
2373 def test_tun_66(self):
2374 """IPSEC tunnel protect """
2376 p = self.ipv6_params
2378 self.config_network(p)
2379 self.config_sa_tun(p)
2380 self.config_protect(p)
2382 self.verify_tun_66(p, count=127)
2384 c = p.tun_if.get_rx_stats()
2385 self.assertEqual(c['packets'], 127)
2386 c = p.tun_if.get_tx_stats()
2387 self.assertEqual(c['packets'], 127)
2389 # rekey - create new SAs and update the tunnel protection
2391 np.crypt_key = b'X' + p.crypt_key[1:]
2392 np.scapy_tun_spi += 100
2393 np.scapy_tun_sa_id += 1
2394 np.vpp_tun_spi += 100
2395 np.vpp_tun_sa_id += 1
2396 np.tun_if.local_spi = p.vpp_tun_spi
2397 np.tun_if.remote_spi = p.scapy_tun_spi
2399 self.config_sa_tun(np)
2400 self.config_protect(np)
2403 self.verify_tun_66(np, count=127)
2404 c = p.tun_if.get_rx_stats()
2405 self.assertEqual(c['packets'], 254)
2406 c = p.tun_if.get_tx_stats()
2407 self.assertEqual(c['packets'], 254)
2410 self.unconfig_protect(np)
2411 self.unconfig_sa(np)
2412 self.unconfig_network(p)
2415 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2416 TemplateIpsec6TunProtect,
2418 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2420 encryption_type = ESP
2421 tun6_encrypt_node_name = "esp6-encrypt-tun"
2422 tun6_decrypt_node_name = "esp6-decrypt-tun"
2425 super(TestIpsec6TunProtectTunDrop, self).setUp()
2427 self.tun_if = self.pg0
2430 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2432 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2434 # the IP destination of the revelaed packet does not match
2435 # that assigned to the tunnel
2436 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2437 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2439 IPv6(src=src, dst=dst) /
2440 UDP(sport=1144, dport=2233) /
2441 Raw(b'X' * payload_size))
2442 for i in range(count)]
2444 def test_tun_drop_66(self):
2445 """IPSEC 6 tunnel protect bogus tunnel header """
2447 p = self.ipv6_params
2449 self.config_network(p)
2450 self.config_sa_tun(p)
2451 self.config_protect(p)
2453 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2454 src=p.remote_tun_if_host,
2455 dst=self.pg1.remote_ip6,
2457 self.send_and_assert_no_replies(self.tun_if, tx)
2459 self.unconfig_protect(p)
2461 self.unconfig_network(p)
2464 if __name__ == '__main__':
2465 unittest.main(testRunner=VppTestRunner)