5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
15 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19 VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
32 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
33 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34 IPSEC_API_SAD_FLAG_USE_ESN))
35 crypt_key = mk_scapy_crypt_key(p)
37 p.tun_dst = tun_if.remote_ip
38 p.tun_src = tun_if.local_ip
43 p.scapy_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.vpp_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
53 p.vpp_tun_sa = SecurityAssociation(
54 encryption_type, spi=p.scapy_tun_spi,
55 crypt_algo=p.crypt_algo,
57 auth_algo=p.auth_algo, auth_key=p.auth_key,
58 tunnel_header=ip_class_by_addr_type[p.addr_type](
61 nat_t_header=p.nat_header,
65 def config_tra_params(p, encryption_type, tun_if):
66 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
67 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
68 IPSEC_API_SAD_FLAG_USE_ESN))
69 crypt_key = mk_scapy_crypt_key(p)
70 p.tun_dst = tun_if.remote_ip
71 p.tun_src = tun_if.local_ip
72 p.scapy_tun_sa = SecurityAssociation(
73 encryption_type, spi=p.vpp_tun_spi,
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo, auth_key=p.auth_key,
78 nat_t_header=p.nat_header)
79 p.vpp_tun_sa = SecurityAssociation(
80 encryption_type, spi=p.scapy_tun_spi,
81 crypt_algo=p.crypt_algo,
83 auth_algo=p.auth_algo, auth_key=p.auth_key,
85 nat_t_header=p.nat_header)
88 class TemplateIpsec4TunProtect(object):
89 """ IPsec IPv4 Tunnel protect """
92 tun4_encrypt_node_name = "esp4-encrypt-tun"
93 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
94 tun4_input_node = "ipsec4-tun-input"
96 def config_sa_tra(self, p):
97 config_tun_params(p, self.encryption_type, p.tun_if)
99 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
100 p.auth_algo_vpp_id, p.auth_key,
101 p.crypt_algo_vpp_id, p.crypt_key,
102 self.vpp_esp_protocol,
104 p.tun_sa_out.add_vpp_config()
106 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
107 p.auth_algo_vpp_id, p.auth_key,
108 p.crypt_algo_vpp_id, p.crypt_key,
109 self.vpp_esp_protocol,
111 p.tun_sa_in.add_vpp_config()
113 def config_sa_tun(self, p):
114 config_tun_params(p, self.encryption_type, p.tun_if)
116 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
117 p.auth_algo_vpp_id, p.auth_key,
118 p.crypt_algo_vpp_id, p.crypt_key,
119 self.vpp_esp_protocol,
120 self.tun_if.local_addr[p.addr_type],
121 self.tun_if.remote_addr[p.addr_type],
123 p.tun_sa_out.add_vpp_config()
125 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
126 p.auth_algo_vpp_id, p.auth_key,
127 p.crypt_algo_vpp_id, p.crypt_key,
128 self.vpp_esp_protocol,
129 self.tun_if.remote_addr[p.addr_type],
130 self.tun_if.local_addr[p.addr_type],
132 p.tun_sa_in.add_vpp_config()
134 def config_protect(self, p):
135 p.tun_protect = VppIpsecTunProtect(self,
139 p.tun_protect.add_vpp_config()
141 def config_network(self, p):
142 if hasattr(p, 'tun_dst'):
145 tun_dst = self.pg0.remote_ip4
146 p.tun_if = VppIpIpTunInterface(self, self.pg0,
149 p.tun_if.add_vpp_config()
151 p.tun_if.config_ip4()
152 p.tun_if.config_ip6()
154 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
155 [VppRoutePath(p.tun_if.remote_ip4,
157 p.route.add_vpp_config()
158 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
159 [VppRoutePath(p.tun_if.remote_ip6,
161 proto=DpoProto.DPO_PROTO_IP6)])
164 def unconfig_network(self, p):
165 p.route.remove_vpp_config()
166 p.tun_if.remove_vpp_config()
168 def unconfig_protect(self, p):
169 p.tun_protect.remove_vpp_config()
171 def unconfig_sa(self, p):
172 p.tun_sa_out.remove_vpp_config()
173 p.tun_sa_in.remove_vpp_config()
176 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
178 """ IPsec tunnel interface tests """
180 encryption_type = ESP
184 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
187 def tearDownClass(cls):
188 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
191 super(TemplateIpsec4TunIfEsp, self).setUp()
193 self.tun_if = self.pg0
197 self.config_network(p)
198 self.config_sa_tra(p)
199 self.config_protect(p)
202 super(TemplateIpsec4TunIfEsp, self).tearDown()
205 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
207 """ IPsec UDP tunnel interface tests """
209 tun4_encrypt_node_name = "esp4-encrypt-tun"
210 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
211 encryption_type = ESP
215 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
218 def tearDownClass(cls):
219 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
221 def verify_encrypted(self, p, sa, rxs):
224 # ensure the UDP ports are correct before we decrypt
226 self.assertTrue(rx.haslayer(UDP))
227 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
228 self.assert_equal(rx[UDP].dport, 4500)
230 pkt = sa.decrypt(rx[IP])
231 if not pkt.haslayer(IP):
232 pkt = IP(pkt[Raw].load)
234 self.assert_packet_checksums_valid(pkt)
235 self.assert_equal(pkt[IP].dst, "1.1.1.1")
236 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
237 except (IndexError, AssertionError):
238 self.logger.debug(ppp("Unexpected packet:", rx))
240 self.logger.debug(ppp("Decrypted packet:", pkt))
245 def config_sa_tra(self, p):
246 config_tun_params(p, self.encryption_type, p.tun_if)
248 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
249 p.auth_algo_vpp_id, p.auth_key,
250 p.crypt_algo_vpp_id, p.crypt_key,
251 self.vpp_esp_protocol,
253 udp_src=p.nat_header.sport,
254 udp_dst=p.nat_header.dport)
255 p.tun_sa_out.add_vpp_config()
257 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
258 p.auth_algo_vpp_id, p.auth_key,
259 p.crypt_algo_vpp_id, p.crypt_key,
260 self.vpp_esp_protocol,
262 udp_src=p.nat_header.sport,
263 udp_dst=p.nat_header.dport)
264 p.tun_sa_in.add_vpp_config()
267 super(TemplateIpsec4TunIfEspUdp, self).setUp()
270 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
271 IPSEC_API_SAD_FLAG_UDP_ENCAP)
272 p.nat_header = UDP(sport=5454, dport=4500)
274 self.tun_if = self.pg0
276 self.config_network(p)
277 self.config_sa_tra(p)
278 self.config_protect(p)
281 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
284 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
285 """ Ipsec ESP - TUN tests """
286 tun4_encrypt_node_name = "esp4-encrypt-tun"
287 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
289 def test_tun_basic64(self):
290 """ ipsec 6o4 tunnel basic test """
291 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
293 self.verify_tun_64(self.params[socket.AF_INET], count=1)
295 def test_tun_burst64(self):
296 """ ipsec 6o4 tunnel basic test """
297 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
299 self.verify_tun_64(self.params[socket.AF_INET], count=257)
301 def test_tun_basic_frag44(self):
302 """ ipsec 4o4 tunnel frag basic test """
303 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
307 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
309 self.verify_tun_44(self.params[socket.AF_INET],
310 count=1, payload_size=1800, n_rx=2)
311 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
315 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
316 """ Ipsec ESP UDP tests """
318 tun4_input_node = "ipsec4-tun-input"
321 super(TestIpsec4TunIfEspUdp, self).setUp()
323 def test_keepalive(self):
324 """ IPSEC NAT Keepalive """
325 self.verify_keepalive(self.ipv4_params)
328 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
329 """ Ipsec ESP UDP GCM tests """
331 tun4_input_node = "ipsec4-tun-input"
334 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
336 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
337 IPSEC_API_INTEG_ALG_NONE)
338 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
339 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
340 p.crypt_algo = "AES-GCM"
342 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
346 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
347 """ Ipsec ESP - TCP tests """
351 class TemplateIpsec6TunProtect(object):
352 """ IPsec IPv6 Tunnel protect """
354 def config_sa_tra(self, p):
355 config_tun_params(p, self.encryption_type, p.tun_if)
357 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
358 p.auth_algo_vpp_id, p.auth_key,
359 p.crypt_algo_vpp_id, p.crypt_key,
360 self.vpp_esp_protocol)
361 p.tun_sa_out.add_vpp_config()
363 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
364 p.auth_algo_vpp_id, p.auth_key,
365 p.crypt_algo_vpp_id, p.crypt_key,
366 self.vpp_esp_protocol)
367 p.tun_sa_in.add_vpp_config()
369 def config_sa_tun(self, p):
370 config_tun_params(p, self.encryption_type, p.tun_if)
372 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
373 p.auth_algo_vpp_id, p.auth_key,
374 p.crypt_algo_vpp_id, p.crypt_key,
375 self.vpp_esp_protocol,
376 self.tun_if.local_addr[p.addr_type],
377 self.tun_if.remote_addr[p.addr_type])
378 p.tun_sa_out.add_vpp_config()
380 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
381 p.auth_algo_vpp_id, p.auth_key,
382 p.crypt_algo_vpp_id, p.crypt_key,
383 self.vpp_esp_protocol,
384 self.tun_if.remote_addr[p.addr_type],
385 self.tun_if.local_addr[p.addr_type])
386 p.tun_sa_in.add_vpp_config()
388 def config_protect(self, p):
389 p.tun_protect = VppIpsecTunProtect(self,
393 p.tun_protect.add_vpp_config()
395 def config_network(self, p):
396 if hasattr(p, 'tun_dst'):
399 tun_dst = self.pg0.remote_ip6
400 p.tun_if = VppIpIpTunInterface(self, self.pg0,
403 p.tun_if.add_vpp_config()
405 p.tun_if.config_ip6()
406 p.tun_if.config_ip4()
408 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
409 [VppRoutePath(p.tun_if.remote_ip6,
411 proto=DpoProto.DPO_PROTO_IP6)])
412 p.route.add_vpp_config()
413 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
414 [VppRoutePath(p.tun_if.remote_ip4,
418 def unconfig_network(self, p):
419 p.route.remove_vpp_config()
420 p.tun_if.remove_vpp_config()
422 def unconfig_protect(self, p):
423 p.tun_protect.remove_vpp_config()
425 def unconfig_sa(self, p):
426 p.tun_sa_out.remove_vpp_config()
427 p.tun_sa_in.remove_vpp_config()
430 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
432 """ IPsec tunnel interface tests """
434 encryption_type = ESP
437 super(TemplateIpsec6TunIfEsp, self).setUp()
439 self.tun_if = self.pg0
442 self.config_network(p)
443 self.config_sa_tra(p)
444 self.config_protect(p)
447 super(TemplateIpsec6TunIfEsp, self).tearDown()
450 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
452 """ Ipsec ESP - TUN tests """
453 tun6_encrypt_node_name = "esp6-encrypt-tun"
454 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
456 def test_tun_basic46(self):
457 """ ipsec 4o6 tunnel basic test """
458 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
459 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
461 def test_tun_burst46(self):
462 """ ipsec 4o6 tunnel burst test """
463 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
464 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
467 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
468 IpsecTun6HandoffTests):
469 """ Ipsec ESP 6 Handoff tests """
470 tun6_encrypt_node_name = "esp6-encrypt-tun"
471 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
473 def test_tun_handoff_66_police(self):
474 """ ESP 6o6 tunnel with policer worker hand-off test """
475 self.vapi.cli("clear errors")
476 self.vapi.cli("clear ipsec sa")
479 p = self.params[socket.AF_INET6]
481 action_tx = PolicerAction(
482 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
484 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
485 conform_action=action_tx,
486 exceed_action=action_tx,
487 violate_action=action_tx)
488 policer.add_vpp_config()
490 # Start policing on tun
491 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
493 for pol_bind in [1, 0]:
494 policer.bind_vpp_config(pol_bind, True)
496 # inject alternately on worker 0 and 1.
497 for worker in [0, 1, 0, 1]:
498 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
500 src=p.remote_tun_if_host,
501 dst=self.pg1.remote_ip6,
503 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
504 self.pg1, worker=worker)
505 self.verify_decrypted6(p, recv_pkts)
506 self.logger.debug(self.vapi.cli("show trace max 100"))
508 stats = policer.get_stats()
509 stats0 = policer.get_stats(worker=0)
510 stats1 = policer.get_stats(worker=1)
513 # First pass: Worker 1, should have done all the policing
514 self.assertEqual(stats, stats1)
516 # Worker 0, should have handed everything off
517 self.assertEqual(stats0['conform_packets'], 0)
518 self.assertEqual(stats0['exceed_packets'], 0)
519 self.assertEqual(stats0['violate_packets'], 0)
521 # Second pass: both workers should have policed equal amounts
522 self.assertGreater(stats1['conform_packets'], 0)
523 self.assertEqual(stats1['exceed_packets'], 0)
524 self.assertGreater(stats1['violate_packets'], 0)
526 self.assertGreater(stats0['conform_packets'], 0)
527 self.assertEqual(stats0['exceed_packets'], 0)
528 self.assertGreater(stats0['violate_packets'], 0)
530 self.assertEqual(stats0['conform_packets'] +
531 stats0['violate_packets'],
532 stats1['conform_packets'] +
533 stats1['violate_packets'])
535 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
536 policer.remove_vpp_config()
539 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
540 IpsecTun4HandoffTests):
541 """ Ipsec ESP 4 Handoff tests """
542 tun4_encrypt_node_name = "esp4-encrypt-tun"
543 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
545 def test_tun_handoff_44_police(self):
546 """ ESP 4o4 tunnel with policer worker hand-off test """
547 self.vapi.cli("clear errors")
548 self.vapi.cli("clear ipsec sa")
551 p = self.params[socket.AF_INET]
553 action_tx = PolicerAction(
554 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
556 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
557 conform_action=action_tx,
558 exceed_action=action_tx,
559 violate_action=action_tx)
560 policer.add_vpp_config()
562 # Start policing on tun
563 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
565 for pol_bind in [1, 0]:
566 policer.bind_vpp_config(pol_bind, True)
568 # inject alternately on worker 0 and 1.
569 for worker in [0, 1, 0, 1]:
570 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
572 src=p.remote_tun_if_host,
573 dst=self.pg1.remote_ip4,
575 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
576 self.pg1, worker=worker)
577 self.verify_decrypted(p, recv_pkts)
578 self.logger.debug(self.vapi.cli("show trace max 100"))
580 stats = policer.get_stats()
581 stats0 = policer.get_stats(worker=0)
582 stats1 = policer.get_stats(worker=1)
585 # First pass: Worker 1, should have done all the policing
586 self.assertEqual(stats, stats1)
588 # Worker 0, should have handed everything off
589 self.assertEqual(stats0['conform_packets'], 0)
590 self.assertEqual(stats0['exceed_packets'], 0)
591 self.assertEqual(stats0['violate_packets'], 0)
593 # Second pass: both workers should have policed equal amounts
594 self.assertGreater(stats1['conform_packets'], 0)
595 self.assertEqual(stats1['exceed_packets'], 0)
596 self.assertGreater(stats1['violate_packets'], 0)
598 self.assertGreater(stats0['conform_packets'], 0)
599 self.assertEqual(stats0['exceed_packets'], 0)
600 self.assertGreater(stats0['violate_packets'], 0)
602 self.assertEqual(stats0['conform_packets'] +
603 stats0['violate_packets'],
604 stats1['conform_packets'] +
605 stats1['violate_packets'])
607 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
608 policer.remove_vpp_config()
611 @tag_fixme_vpp_workers
612 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
615 """ IPsec IPv4 Multi Tunnel interface """
617 encryption_type = ESP
618 tun4_encrypt_node_name = "esp4-encrypt-tun"
619 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
622 super(TestIpsec4MultiTunIfEsp, self).setUp()
624 self.tun_if = self.pg0
626 self.multi_params = []
627 self.pg0.generate_remote_hosts(10)
628 self.pg0.configure_ipv4_neighbors()
631 p = copy.copy(self.ipv4_params)
633 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
634 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
635 p.scapy_tun_spi = p.scapy_tun_spi + ii
636 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
637 p.vpp_tun_spi = p.vpp_tun_spi + ii
639 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
640 p.scapy_tra_spi = p.scapy_tra_spi + ii
641 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
642 p.vpp_tra_spi = p.vpp_tra_spi + ii
643 p.tun_dst = self.pg0.remote_hosts[ii].ip4
645 self.multi_params.append(p)
646 self.config_network(p)
647 self.config_sa_tra(p)
648 self.config_protect(p)
651 super(TestIpsec4MultiTunIfEsp, self).tearDown()
653 def test_tun_44(self):
654 """Multiple IPSEC tunnel interfaces """
655 for p in self.multi_params:
656 self.verify_tun_44(p, count=127)
657 self.assertEqual(p.tun_if.get_rx_stats(), 127)
658 self.assertEqual(p.tun_if.get_tx_stats(), 127)
660 def test_tun_rr_44(self):
661 """ Round-robin packets acrros multiple interface """
663 for p in self.multi_params:
664 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
665 src=p.remote_tun_if_host,
666 dst=self.pg1.remote_ip4)
667 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
669 for rx, p in zip(rxs, self.multi_params):
670 self.verify_decrypted(p, [rx])
673 for p in self.multi_params:
674 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
675 dst=p.remote_tun_if_host)
676 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
678 for rx, p in zip(rxs, self.multi_params):
679 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
682 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
685 """ IPsec IPv4 Tunnel interface all Algos """
687 encryption_type = ESP
688 tun4_encrypt_node_name = "esp4-encrypt-tun"
689 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
692 super(TestIpsec4TunIfEspAll, self).setUp()
694 self.tun_if = self.pg0
697 self.config_network(p)
698 self.config_sa_tra(p)
699 self.config_protect(p)
703 self.unconfig_protect(p)
704 self.unconfig_network(p)
707 super(TestIpsec4TunIfEspAll, self).tearDown()
711 # change the key and the SPI
714 p.crypt_key = b'X' + p.crypt_key[1:]
716 p.scapy_tun_sa_id += 1
719 p.tun_if.local_spi = p.vpp_tun_spi
720 p.tun_if.remote_spi = p.scapy_tun_spi
722 config_tun_params(p, self.encryption_type, p.tun_if)
724 p.tun_sa_out = VppIpsecSA(self,
731 self.vpp_esp_protocol,
734 p.tun_sa_in = VppIpsecSA(self,
741 self.vpp_esp_protocol,
744 p.tun_sa_in.add_vpp_config()
745 p.tun_sa_out.add_vpp_config()
747 self.config_protect(p)
748 np.tun_sa_out.remove_vpp_config()
749 np.tun_sa_in.remove_vpp_config()
750 self.logger.info(self.vapi.cli("sh ipsec sa"))
752 def test_tun_44(self):
753 """IPSEC tunnel all algos """
755 # foreach VPP crypto engine
756 engines = ["ia32", "ipsecmb", "openssl"]
758 # foreach crypto algorithm
759 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
760 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
761 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
762 IPSEC_API_INTEG_ALG_NONE),
763 'scapy-crypto': "AES-GCM",
764 'scapy-integ': "NULL",
765 'key': b"JPjyOWBeVEQiMe7h",
767 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
768 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
769 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
770 IPSEC_API_INTEG_ALG_NONE),
771 'scapy-crypto': "AES-GCM",
772 'scapy-integ': "NULL",
773 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
775 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
776 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
777 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
778 IPSEC_API_INTEG_ALG_NONE),
779 'scapy-crypto': "AES-GCM",
780 'scapy-integ': "NULL",
781 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
783 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
784 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
785 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
786 IPSEC_API_INTEG_ALG_SHA1_96),
787 'scapy-crypto': "AES-CBC",
788 'scapy-integ': "HMAC-SHA1-96",
790 'key': b"JPjyOWBeVEQiMe7h"},
791 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
792 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
793 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
794 IPSEC_API_INTEG_ALG_SHA_512_256),
795 'scapy-crypto': "AES-CBC",
796 'scapy-integ': "SHA2-512-256",
798 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
799 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
800 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
801 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
802 IPSEC_API_INTEG_ALG_SHA_256_128),
803 'scapy-crypto': "AES-CBC",
804 'scapy-integ': "SHA2-256-128",
806 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
807 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
808 IPSEC_API_CRYPTO_ALG_NONE),
809 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
810 IPSEC_API_INTEG_ALG_SHA1_96),
811 'scapy-crypto': "NULL",
812 'scapy-integ': "HMAC-SHA1-96",
814 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
816 for engine in engines:
817 self.vapi.cli("set crypto handler all %s" % engine)
820 # loop through each of the algorithms
823 # with self.subTest(algo=algo['scapy']):
826 p.auth_algo_vpp_id = algo['vpp-integ']
827 p.crypt_algo_vpp_id = algo['vpp-crypto']
828 p.crypt_algo = algo['scapy-crypto']
829 p.auth_algo = algo['scapy-integ']
830 p.crypt_key = algo['key']
831 p.salt = algo['salt']
837 self.verify_tun_44(p, count=127)
840 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
843 """ IPsec IPv4 Tunnel interface no Algos """
845 encryption_type = ESP
846 tun4_encrypt_node_name = "esp4-encrypt-tun"
847 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
850 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
852 self.tun_if = self.pg0
854 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
855 IPSEC_API_INTEG_ALG_NONE)
859 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
860 IPSEC_API_CRYPTO_ALG_NONE)
861 p.crypt_algo = 'NULL'
865 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
867 def test_tun_44(self):
868 """ IPSec SA with NULL algos """
871 self.config_network(p)
872 self.config_sa_tra(p)
873 self.config_protect(p)
875 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
876 dst=p.remote_tun_if_host)
877 self.send_and_assert_no_replies(self.pg1, tx)
879 self.unconfig_protect(p)
881 self.unconfig_network(p)
884 @tag_fixme_vpp_workers
885 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
888 """ IPsec IPv6 Multi Tunnel interface """
890 encryption_type = ESP
891 tun6_encrypt_node_name = "esp6-encrypt-tun"
892 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
895 super(TestIpsec6MultiTunIfEsp, self).setUp()
897 self.tun_if = self.pg0
899 self.multi_params = []
900 self.pg0.generate_remote_hosts(10)
901 self.pg0.configure_ipv6_neighbors()
904 p = copy.copy(self.ipv6_params)
906 p.remote_tun_if_host = "1111::%d" % (ii + 1)
907 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
908 p.scapy_tun_spi = p.scapy_tun_spi + ii
909 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
910 p.vpp_tun_spi = p.vpp_tun_spi + ii
912 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
913 p.scapy_tra_spi = p.scapy_tra_spi + ii
914 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
915 p.vpp_tra_spi = p.vpp_tra_spi + ii
916 p.tun_dst = self.pg0.remote_hosts[ii].ip6
918 self.multi_params.append(p)
919 self.config_network(p)
920 self.config_sa_tra(p)
921 self.config_protect(p)
924 super(TestIpsec6MultiTunIfEsp, self).tearDown()
926 def test_tun_66(self):
927 """Multiple IPSEC tunnel interfaces """
928 for p in self.multi_params:
929 self.verify_tun_66(p, count=127)
930 self.assertEqual(p.tun_if.get_rx_stats(), 127)
931 self.assertEqual(p.tun_if.get_tx_stats(), 127)
934 class TestIpsecGreTebIfEsp(TemplateIpsec,
936 """ Ipsec GRE TEB ESP - TUN tests """
937 tun4_encrypt_node_name = "esp4-encrypt-tun"
938 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
939 encryption_type = ESP
940 omac = "00:11:22:33:44:55"
942 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
944 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
945 sa.encrypt(IP(src=self.pg0.remote_ip4,
946 dst=self.pg0.local_ip4) /
948 Ether(dst=self.omac) /
949 IP(src="1.1.1.1", dst="1.1.1.2") /
950 UDP(sport=1144, dport=2233) /
951 Raw(b'X' * payload_size))
952 for i in range(count)]
954 def gen_pkts(self, sw_intf, src, dst, count=1,
956 return [Ether(dst=self.omac) /
957 IP(src="1.1.1.1", dst="1.1.1.2") /
958 UDP(sport=1144, dport=2233) /
959 Raw(b'X' * payload_size)
960 for i in range(count)]
962 def verify_decrypted(self, p, rxs):
964 self.assert_equal(rx[Ether].dst, self.omac)
965 self.assert_equal(rx[IP].dst, "1.1.1.2")
967 def verify_encrypted(self, p, sa, rxs):
970 pkt = sa.decrypt(rx[IP])
971 if not pkt.haslayer(IP):
972 pkt = IP(pkt[Raw].load)
973 self.assert_packet_checksums_valid(pkt)
974 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
975 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
976 self.assertTrue(pkt.haslayer(GRE))
978 self.assertEqual(e[Ether].dst, self.omac)
979 self.assertEqual(e[IP].dst, "1.1.1.2")
980 except (IndexError, AssertionError):
981 self.logger.debug(ppp("Unexpected packet:", rx))
983 self.logger.debug(ppp("Decrypted packet:", pkt))
989 super(TestIpsecGreTebIfEsp, self).setUp()
991 self.tun_if = self.pg0
995 bd1 = VppBridgeDomain(self, 1)
998 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
999 p.auth_algo_vpp_id, p.auth_key,
1000 p.crypt_algo_vpp_id, p.crypt_key,
1001 self.vpp_esp_protocol,
1003 self.pg0.remote_ip4)
1004 p.tun_sa_out.add_vpp_config()
1006 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1007 p.auth_algo_vpp_id, p.auth_key,
1008 p.crypt_algo_vpp_id, p.crypt_key,
1009 self.vpp_esp_protocol,
1010 self.pg0.remote_ip4,
1012 p.tun_sa_in.add_vpp_config()
1014 p.tun_if = VppGreInterface(self,
1016 self.pg0.remote_ip4,
1017 type=(VppEnum.vl_api_gre_tunnel_type_t.
1018 GRE_API_TUNNEL_TYPE_TEB))
1019 p.tun_if.add_vpp_config()
1021 p.tun_protect = VppIpsecTunProtect(self,
1026 p.tun_protect.add_vpp_config()
1029 p.tun_if.config_ip4()
1030 config_tun_params(p, self.encryption_type, p.tun_if)
1032 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1033 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1035 self.vapi.cli("clear ipsec sa")
1036 self.vapi.cli("sh adj")
1037 self.vapi.cli("sh ipsec tun")
1040 p = self.ipv4_params
1041 p.tun_if.unconfig_ip4()
1042 super(TestIpsecGreTebIfEsp, self).tearDown()
1045 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1047 """ Ipsec GRE TEB ESP - TUN tests """
1048 tun4_encrypt_node_name = "esp4-encrypt-tun"
1049 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1050 encryption_type = ESP
1051 omac = "00:11:22:33:44:55"
1053 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1055 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1056 sa.encrypt(IP(src=self.pg0.remote_ip4,
1057 dst=self.pg0.local_ip4) /
1059 Ether(dst=self.omac) /
1060 IP(src="1.1.1.1", dst="1.1.1.2") /
1061 UDP(sport=1144, dport=2233) /
1062 Raw(b'X' * payload_size))
1063 for i in range(count)]
1065 def gen_pkts(self, sw_intf, src, dst, count=1,
1067 return [Ether(dst=self.omac) /
1069 IP(src="1.1.1.1", dst="1.1.1.2") /
1070 UDP(sport=1144, dport=2233) /
1071 Raw(b'X' * payload_size)
1072 for i in range(count)]
1074 def verify_decrypted(self, p, rxs):
1076 self.assert_equal(rx[Ether].dst, self.omac)
1077 self.assert_equal(rx[Dot1Q].vlan, 11)
1078 self.assert_equal(rx[IP].dst, "1.1.1.2")
1080 def verify_encrypted(self, p, sa, rxs):
1083 pkt = sa.decrypt(rx[IP])
1084 if not pkt.haslayer(IP):
1085 pkt = IP(pkt[Raw].load)
1086 self.assert_packet_checksums_valid(pkt)
1087 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1088 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1089 self.assertTrue(pkt.haslayer(GRE))
1091 self.assertEqual(e[Ether].dst, self.omac)
1092 self.assertFalse(e.haslayer(Dot1Q))
1093 self.assertEqual(e[IP].dst, "1.1.1.2")
1094 except (IndexError, AssertionError):
1095 self.logger.debug(ppp("Unexpected packet:", rx))
1097 self.logger.debug(ppp("Decrypted packet:", pkt))
1103 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1105 self.tun_if = self.pg0
1107 p = self.ipv4_params
1109 bd1 = VppBridgeDomain(self, 1)
1110 bd1.add_vpp_config()
1112 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1113 self.vapi.l2_interface_vlan_tag_rewrite(
1114 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1116 self.pg1_11.admin_up()
1118 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1119 p.auth_algo_vpp_id, p.auth_key,
1120 p.crypt_algo_vpp_id, p.crypt_key,
1121 self.vpp_esp_protocol,
1123 self.pg0.remote_ip4)
1124 p.tun_sa_out.add_vpp_config()
1126 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1127 p.auth_algo_vpp_id, p.auth_key,
1128 p.crypt_algo_vpp_id, p.crypt_key,
1129 self.vpp_esp_protocol,
1130 self.pg0.remote_ip4,
1132 p.tun_sa_in.add_vpp_config()
1134 p.tun_if = VppGreInterface(self,
1136 self.pg0.remote_ip4,
1137 type=(VppEnum.vl_api_gre_tunnel_type_t.
1138 GRE_API_TUNNEL_TYPE_TEB))
1139 p.tun_if.add_vpp_config()
1141 p.tun_protect = VppIpsecTunProtect(self,
1146 p.tun_protect.add_vpp_config()
1149 p.tun_if.config_ip4()
1150 config_tun_params(p, self.encryption_type, p.tun_if)
1152 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1153 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1155 self.vapi.cli("clear ipsec sa")
1158 p = self.ipv4_params
1159 p.tun_if.unconfig_ip4()
1160 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1161 self.pg1_11.admin_down()
1162 self.pg1_11.remove_vpp_config()
1165 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1167 """ Ipsec GRE TEB ESP - Tra tests """
1168 tun4_encrypt_node_name = "esp4-encrypt-tun"
1169 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1170 encryption_type = ESP
1171 omac = "00:11:22:33:44:55"
1173 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1175 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1176 sa.encrypt(IP(src=self.pg0.remote_ip4,
1177 dst=self.pg0.local_ip4) /
1179 Ether(dst=self.omac) /
1180 IP(src="1.1.1.1", dst="1.1.1.2") /
1181 UDP(sport=1144, dport=2233) /
1182 Raw(b'X' * payload_size))
1183 for i in range(count)]
1185 def gen_pkts(self, sw_intf, src, dst, count=1,
1187 return [Ether(dst=self.omac) /
1188 IP(src="1.1.1.1", dst="1.1.1.2") /
1189 UDP(sport=1144, dport=2233) /
1190 Raw(b'X' * payload_size)
1191 for i in range(count)]
1193 def verify_decrypted(self, p, rxs):
1195 self.assert_equal(rx[Ether].dst, self.omac)
1196 self.assert_equal(rx[IP].dst, "1.1.1.2")
1198 def verify_encrypted(self, p, sa, rxs):
1201 pkt = sa.decrypt(rx[IP])
1202 if not pkt.haslayer(IP):
1203 pkt = IP(pkt[Raw].load)
1204 self.assert_packet_checksums_valid(pkt)
1205 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1206 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1207 self.assertTrue(pkt.haslayer(GRE))
1209 self.assertEqual(e[Ether].dst, self.omac)
1210 self.assertEqual(e[IP].dst, "1.1.1.2")
1211 except (IndexError, AssertionError):
1212 self.logger.debug(ppp("Unexpected packet:", rx))
1214 self.logger.debug(ppp("Decrypted packet:", pkt))
1220 super(TestIpsecGreTebIfEspTra, self).setUp()
1222 self.tun_if = self.pg0
1224 p = self.ipv4_params
1226 bd1 = VppBridgeDomain(self, 1)
1227 bd1.add_vpp_config()
1229 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1230 p.auth_algo_vpp_id, p.auth_key,
1231 p.crypt_algo_vpp_id, p.crypt_key,
1232 self.vpp_esp_protocol)
1233 p.tun_sa_out.add_vpp_config()
1235 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1236 p.auth_algo_vpp_id, p.auth_key,
1237 p.crypt_algo_vpp_id, p.crypt_key,
1238 self.vpp_esp_protocol)
1239 p.tun_sa_in.add_vpp_config()
1241 p.tun_if = VppGreInterface(self,
1243 self.pg0.remote_ip4,
1244 type=(VppEnum.vl_api_gre_tunnel_type_t.
1245 GRE_API_TUNNEL_TYPE_TEB))
1246 p.tun_if.add_vpp_config()
1248 p.tun_protect = VppIpsecTunProtect(self,
1253 p.tun_protect.add_vpp_config()
1256 p.tun_if.config_ip4()
1257 config_tra_params(p, self.encryption_type, p.tun_if)
1259 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1260 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1262 self.vapi.cli("clear ipsec sa")
1265 p = self.ipv4_params
1266 p.tun_if.unconfig_ip4()
1267 super(TestIpsecGreTebIfEspTra, self).tearDown()
1270 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1272 """ Ipsec GRE TEB UDP ESP - Tra tests """
1273 tun4_encrypt_node_name = "esp4-encrypt-tun"
1274 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1275 encryption_type = ESP
1276 omac = "00:11:22:33:44:55"
1278 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1280 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1281 sa.encrypt(IP(src=self.pg0.remote_ip4,
1282 dst=self.pg0.local_ip4) /
1284 Ether(dst=self.omac) /
1285 IP(src="1.1.1.1", dst="1.1.1.2") /
1286 UDP(sport=1144, dport=2233) /
1287 Raw(b'X' * payload_size))
1288 for i in range(count)]
1290 def gen_pkts(self, sw_intf, src, dst, count=1,
1292 return [Ether(dst=self.omac) /
1293 IP(src="1.1.1.1", dst="1.1.1.2") /
1294 UDP(sport=1144, dport=2233) /
1295 Raw(b'X' * payload_size)
1296 for i in range(count)]
1298 def verify_decrypted(self, p, rxs):
1300 self.assert_equal(rx[Ether].dst, self.omac)
1301 self.assert_equal(rx[IP].dst, "1.1.1.2")
1303 def verify_encrypted(self, p, sa, rxs):
1305 self.assertTrue(rx.haslayer(UDP))
1306 self.assertEqual(rx[UDP].dport, 4545)
1307 self.assertEqual(rx[UDP].sport, 5454)
1309 pkt = sa.decrypt(rx[IP])
1310 if not pkt.haslayer(IP):
1311 pkt = IP(pkt[Raw].load)
1312 self.assert_packet_checksums_valid(pkt)
1313 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1314 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1315 self.assertTrue(pkt.haslayer(GRE))
1317 self.assertEqual(e[Ether].dst, self.omac)
1318 self.assertEqual(e[IP].dst, "1.1.1.2")
1319 except (IndexError, AssertionError):
1320 self.logger.debug(ppp("Unexpected packet:", rx))
1322 self.logger.debug(ppp("Decrypted packet:", pkt))
1328 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1330 self.tun_if = self.pg0
1332 p = self.ipv4_params
1333 p = self.ipv4_params
1334 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1335 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1336 p.nat_header = UDP(sport=5454, dport=4545)
1338 bd1 = VppBridgeDomain(self, 1)
1339 bd1.add_vpp_config()
1341 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1342 p.auth_algo_vpp_id, p.auth_key,
1343 p.crypt_algo_vpp_id, p.crypt_key,
1344 self.vpp_esp_protocol,
1348 p.tun_sa_out.add_vpp_config()
1350 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1351 p.auth_algo_vpp_id, p.auth_key,
1352 p.crypt_algo_vpp_id, p.crypt_key,
1353 self.vpp_esp_protocol,
1355 VppEnum.vl_api_ipsec_sad_flags_t.
1356 IPSEC_API_SAD_FLAG_IS_INBOUND),
1359 p.tun_sa_in.add_vpp_config()
1361 p.tun_if = VppGreInterface(self,
1363 self.pg0.remote_ip4,
1364 type=(VppEnum.vl_api_gre_tunnel_type_t.
1365 GRE_API_TUNNEL_TYPE_TEB))
1366 p.tun_if.add_vpp_config()
1368 p.tun_protect = VppIpsecTunProtect(self,
1373 p.tun_protect.add_vpp_config()
1376 p.tun_if.config_ip4()
1377 config_tra_params(p, self.encryption_type, p.tun_if)
1379 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1380 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1382 self.vapi.cli("clear ipsec sa")
1383 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1386 p = self.ipv4_params
1387 p.tun_if.unconfig_ip4()
1388 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1391 class TestIpsecGreIfEsp(TemplateIpsec,
1393 """ Ipsec GRE ESP - TUN tests """
1394 tun4_encrypt_node_name = "esp4-encrypt-tun"
1395 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1396 encryption_type = ESP
1398 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1400 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1401 sa.encrypt(IP(src=self.pg0.remote_ip4,
1402 dst=self.pg0.local_ip4) /
1404 IP(src=self.pg1.local_ip4,
1405 dst=self.pg1.remote_ip4) /
1406 UDP(sport=1144, dport=2233) /
1407 Raw(b'X' * payload_size))
1408 for i in range(count)]
1410 def gen_pkts(self, sw_intf, src, dst, count=1,
1412 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1413 IP(src="1.1.1.1", dst="1.1.1.2") /
1414 UDP(sport=1144, dport=2233) /
1415 Raw(b'X' * payload_size)
1416 for i in range(count)]
1418 def verify_decrypted(self, p, rxs):
1420 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1421 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1423 def verify_encrypted(self, p, sa, rxs):
1426 pkt = sa.decrypt(rx[IP])
1427 if not pkt.haslayer(IP):
1428 pkt = IP(pkt[Raw].load)
1429 self.assert_packet_checksums_valid(pkt)
1430 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1431 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1432 self.assertTrue(pkt.haslayer(GRE))
1434 self.assertEqual(e[IP].dst, "1.1.1.2")
1435 except (IndexError, AssertionError):
1436 self.logger.debug(ppp("Unexpected packet:", rx))
1438 self.logger.debug(ppp("Decrypted packet:", pkt))
1444 super(TestIpsecGreIfEsp, self).setUp()
1446 self.tun_if = self.pg0
1448 p = self.ipv4_params
1450 bd1 = VppBridgeDomain(self, 1)
1451 bd1.add_vpp_config()
1453 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1454 p.auth_algo_vpp_id, p.auth_key,
1455 p.crypt_algo_vpp_id, p.crypt_key,
1456 self.vpp_esp_protocol,
1458 self.pg0.remote_ip4)
1459 p.tun_sa_out.add_vpp_config()
1461 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1462 p.auth_algo_vpp_id, p.auth_key,
1463 p.crypt_algo_vpp_id, p.crypt_key,
1464 self.vpp_esp_protocol,
1465 self.pg0.remote_ip4,
1467 p.tun_sa_in.add_vpp_config()
1469 p.tun_if = VppGreInterface(self,
1471 self.pg0.remote_ip4)
1472 p.tun_if.add_vpp_config()
1474 p.tun_protect = VppIpsecTunProtect(self,
1478 p.tun_protect.add_vpp_config()
1481 p.tun_if.config_ip4()
1482 config_tun_params(p, self.encryption_type, p.tun_if)
1484 VppIpRoute(self, "1.1.1.2", 32,
1485 [VppRoutePath(p.tun_if.remote_ip4,
1486 0xffffffff)]).add_vpp_config()
1489 p = self.ipv4_params
1490 p.tun_if.unconfig_ip4()
1491 super(TestIpsecGreIfEsp, self).tearDown()
1494 class TestIpsecGreIfEspTra(TemplateIpsec,
1496 """ Ipsec GRE ESP - TRA tests """
1497 tun4_encrypt_node_name = "esp4-encrypt-tun"
1498 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1499 encryption_type = ESP
1501 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1503 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1504 sa.encrypt(IP(src=self.pg0.remote_ip4,
1505 dst=self.pg0.local_ip4) /
1507 IP(src=self.pg1.local_ip4,
1508 dst=self.pg1.remote_ip4) /
1509 UDP(sport=1144, dport=2233) /
1510 Raw(b'X' * payload_size))
1511 for i in range(count)]
1513 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1515 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1516 sa.encrypt(IP(src=self.pg0.remote_ip4,
1517 dst=self.pg0.local_ip4) /
1519 UDP(sport=1144, dport=2233) /
1520 Raw(b'X' * payload_size))
1521 for i in range(count)]
1523 def gen_pkts(self, sw_intf, src, dst, count=1,
1525 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1526 IP(src="1.1.1.1", dst="1.1.1.2") /
1527 UDP(sport=1144, dport=2233) /
1528 Raw(b'X' * payload_size)
1529 for i in range(count)]
1531 def verify_decrypted(self, p, rxs):
1533 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1534 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1536 def verify_encrypted(self, p, sa, rxs):
1539 pkt = sa.decrypt(rx[IP])
1540 if not pkt.haslayer(IP):
1541 pkt = IP(pkt[Raw].load)
1542 self.assert_packet_checksums_valid(pkt)
1543 self.assertTrue(pkt.haslayer(GRE))
1545 self.assertEqual(e[IP].dst, "1.1.1.2")
1546 except (IndexError, AssertionError):
1547 self.logger.debug(ppp("Unexpected packet:", rx))
1549 self.logger.debug(ppp("Decrypted packet:", pkt))
1555 super(TestIpsecGreIfEspTra, self).setUp()
1557 self.tun_if = self.pg0
1559 p = self.ipv4_params
1561 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1562 p.auth_algo_vpp_id, p.auth_key,
1563 p.crypt_algo_vpp_id, p.crypt_key,
1564 self.vpp_esp_protocol)
1565 p.tun_sa_out.add_vpp_config()
1567 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1568 p.auth_algo_vpp_id, p.auth_key,
1569 p.crypt_algo_vpp_id, p.crypt_key,
1570 self.vpp_esp_protocol)
1571 p.tun_sa_in.add_vpp_config()
1573 p.tun_if = VppGreInterface(self,
1575 self.pg0.remote_ip4)
1576 p.tun_if.add_vpp_config()
1578 p.tun_protect = VppIpsecTunProtect(self,
1582 p.tun_protect.add_vpp_config()
1585 p.tun_if.config_ip4()
1586 config_tra_params(p, self.encryption_type, p.tun_if)
1588 VppIpRoute(self, "1.1.1.2", 32,
1589 [VppRoutePath(p.tun_if.remote_ip4,
1590 0xffffffff)]).add_vpp_config()
1593 p = self.ipv4_params
1594 p.tun_if.unconfig_ip4()
1595 super(TestIpsecGreIfEspTra, self).tearDown()
1597 def test_gre_non_ip(self):
1598 p = self.ipv4_params
1599 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1600 src=p.remote_tun_if_host,
1601 dst=self.pg1.remote_ip6)
1602 self.send_and_assert_no_replies(self.tun_if, tx)
1603 node_name = ('/err/%s/unsupported payload' %
1604 self.tun4_decrypt_node_name[0])
1605 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1608 class TestIpsecGre6IfEspTra(TemplateIpsec,
1610 """ Ipsec GRE ESP - TRA tests """
1611 tun6_encrypt_node_name = "esp6-encrypt-tun"
1612 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1613 encryption_type = ESP
1615 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1617 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1618 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1619 dst=self.pg0.local_ip6) /
1621 IPv6(src=self.pg1.local_ip6,
1622 dst=self.pg1.remote_ip6) /
1623 UDP(sport=1144, dport=2233) /
1624 Raw(b'X' * payload_size))
1625 for i in range(count)]
1627 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1629 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1630 IPv6(src="1::1", dst="1::2") /
1631 UDP(sport=1144, dport=2233) /
1632 Raw(b'X' * payload_size)
1633 for i in range(count)]
1635 def verify_decrypted6(self, p, rxs):
1637 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1638 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1640 def verify_encrypted6(self, p, sa, rxs):
1643 pkt = sa.decrypt(rx[IPv6])
1644 if not pkt.haslayer(IPv6):
1645 pkt = IPv6(pkt[Raw].load)
1646 self.assert_packet_checksums_valid(pkt)
1647 self.assertTrue(pkt.haslayer(GRE))
1649 self.assertEqual(e[IPv6].dst, "1::2")
1650 except (IndexError, AssertionError):
1651 self.logger.debug(ppp("Unexpected packet:", rx))
1653 self.logger.debug(ppp("Decrypted packet:", pkt))
1659 super(TestIpsecGre6IfEspTra, self).setUp()
1661 self.tun_if = self.pg0
1663 p = self.ipv6_params
1665 bd1 = VppBridgeDomain(self, 1)
1666 bd1.add_vpp_config()
1668 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1669 p.auth_algo_vpp_id, p.auth_key,
1670 p.crypt_algo_vpp_id, p.crypt_key,
1671 self.vpp_esp_protocol)
1672 p.tun_sa_out.add_vpp_config()
1674 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1675 p.auth_algo_vpp_id, p.auth_key,
1676 p.crypt_algo_vpp_id, p.crypt_key,
1677 self.vpp_esp_protocol)
1678 p.tun_sa_in.add_vpp_config()
1680 p.tun_if = VppGreInterface(self,
1682 self.pg0.remote_ip6)
1683 p.tun_if.add_vpp_config()
1685 p.tun_protect = VppIpsecTunProtect(self,
1689 p.tun_protect.add_vpp_config()
1692 p.tun_if.config_ip6()
1693 config_tra_params(p, self.encryption_type, p.tun_if)
1695 r = VppIpRoute(self, "1::2", 128,
1696 [VppRoutePath(p.tun_if.remote_ip6,
1698 proto=DpoProto.DPO_PROTO_IP6)])
1702 p = self.ipv6_params
1703 p.tun_if.unconfig_ip6()
1704 super(TestIpsecGre6IfEspTra, self).tearDown()
1707 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1708 """ Ipsec mGRE ESP v4 TRA tests """
1709 tun4_encrypt_node_name = "esp4-encrypt-tun"
1710 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1711 encryption_type = ESP
1713 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1715 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1716 sa.encrypt(IP(src=p.tun_dst,
1717 dst=self.pg0.local_ip4) /
1719 IP(src=self.pg1.local_ip4,
1720 dst=self.pg1.remote_ip4) /
1721 UDP(sport=1144, dport=2233) /
1722 Raw(b'X' * payload_size))
1723 for i in range(count)]
1725 def gen_pkts(self, sw_intf, src, dst, count=1,
1727 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1728 IP(src="1.1.1.1", dst=dst) /
1729 UDP(sport=1144, dport=2233) /
1730 Raw(b'X' * payload_size)
1731 for i in range(count)]
1733 def verify_decrypted(self, p, rxs):
1735 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1736 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1738 def verify_encrypted(self, p, sa, rxs):
1741 pkt = sa.decrypt(rx[IP])
1742 if not pkt.haslayer(IP):
1743 pkt = IP(pkt[Raw].load)
1744 self.assert_packet_checksums_valid(pkt)
1745 self.assertTrue(pkt.haslayer(GRE))
1747 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1748 except (IndexError, AssertionError):
1749 self.logger.debug(ppp("Unexpected packet:", rx))
1751 self.logger.debug(ppp("Decrypted packet:", pkt))
1757 super(TestIpsecMGreIfEspTra4, self).setUp()
1760 self.tun_if = self.pg0
1761 p = self.ipv4_params
1762 p.tun_if = VppGreInterface(self,
1765 mode=(VppEnum.vl_api_tunnel_mode_t.
1766 TUNNEL_API_MODE_MP))
1767 p.tun_if.add_vpp_config()
1769 p.tun_if.config_ip4()
1770 p.tun_if.generate_remote_hosts(N_NHS)
1771 self.pg0.generate_remote_hosts(N_NHS)
1772 self.pg0.configure_ipv4_neighbors()
1774 # setup some SAs for several next-hops on the interface
1775 self.multi_params = []
1777 for ii in range(N_NHS):
1778 p = copy.copy(self.ipv4_params)
1780 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1781 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1782 p.scapy_tun_spi = p.scapy_tun_spi + ii
1783 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1784 p.vpp_tun_spi = p.vpp_tun_spi + ii
1786 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1787 p.scapy_tra_spi = p.scapy_tra_spi + ii
1788 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1789 p.vpp_tra_spi = p.vpp_tra_spi + ii
1790 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1791 p.auth_algo_vpp_id, p.auth_key,
1792 p.crypt_algo_vpp_id, p.crypt_key,
1793 self.vpp_esp_protocol)
1794 p.tun_sa_out.add_vpp_config()
1796 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1797 p.auth_algo_vpp_id, p.auth_key,
1798 p.crypt_algo_vpp_id, p.crypt_key,
1799 self.vpp_esp_protocol)
1800 p.tun_sa_in.add_vpp_config()
1802 p.tun_protect = VppIpsecTunProtect(
1807 nh=p.tun_if.remote_hosts[ii].ip4)
1808 p.tun_protect.add_vpp_config()
1809 config_tra_params(p, self.encryption_type, p.tun_if)
1810 self.multi_params.append(p)
1812 VppIpRoute(self, p.remote_tun_if_host, 32,
1813 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1814 p.tun_if.sw_if_index)]).add_vpp_config()
1816 # in this v4 variant add the teibs after the protect
1817 p.teib = VppTeib(self, p.tun_if,
1818 p.tun_if.remote_hosts[ii].ip4,
1819 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1820 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1821 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1824 p = self.ipv4_params
1825 p.tun_if.unconfig_ip4()
1826 super(TestIpsecMGreIfEspTra4, self).tearDown()
1828 def test_tun_44(self):
1831 for p in self.multi_params:
1832 self.verify_tun_44(p, count=N_PKTS)
1833 p.teib.remove_vpp_config()
1834 self.verify_tun_dropped_44(p, count=N_PKTS)
1835 p.teib.add_vpp_config()
1836 self.verify_tun_44(p, count=N_PKTS)
1839 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1840 """ Ipsec mGRE ESP v6 TRA tests """
1841 tun6_encrypt_node_name = "esp6-encrypt-tun"
1842 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1843 encryption_type = ESP
1845 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1847 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1848 sa.encrypt(IPv6(src=p.tun_dst,
1849 dst=self.pg0.local_ip6) /
1851 IPv6(src=self.pg1.local_ip6,
1852 dst=self.pg1.remote_ip6) /
1853 UDP(sport=1144, dport=2233) /
1854 Raw(b'X' * payload_size))
1855 for i in range(count)]
1857 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1859 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1860 IPv6(src="1::1", dst=dst) /
1861 UDP(sport=1144, dport=2233) /
1862 Raw(b'X' * payload_size)
1863 for i in range(count)]
1865 def verify_decrypted6(self, p, rxs):
1867 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1868 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1870 def verify_encrypted6(self, p, sa, rxs):
1873 pkt = sa.decrypt(rx[IPv6])
1874 if not pkt.haslayer(IPv6):
1875 pkt = IPv6(pkt[Raw].load)
1876 self.assert_packet_checksums_valid(pkt)
1877 self.assertTrue(pkt.haslayer(GRE))
1879 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1880 except (IndexError, AssertionError):
1881 self.logger.debug(ppp("Unexpected packet:", rx))
1883 self.logger.debug(ppp("Decrypted packet:", pkt))
1889 super(TestIpsecMGreIfEspTra6, self).setUp()
1891 self.vapi.cli("set logging class ipsec level debug")
1894 self.tun_if = self.pg0
1895 p = self.ipv6_params
1896 p.tun_if = VppGreInterface(self,
1899 mode=(VppEnum.vl_api_tunnel_mode_t.
1900 TUNNEL_API_MODE_MP))
1901 p.tun_if.add_vpp_config()
1903 p.tun_if.config_ip6()
1904 p.tun_if.generate_remote_hosts(N_NHS)
1905 self.pg0.generate_remote_hosts(N_NHS)
1906 self.pg0.configure_ipv6_neighbors()
1908 # setup some SAs for several next-hops on the interface
1909 self.multi_params = []
1911 for ii in range(N_NHS):
1912 p = copy.copy(self.ipv6_params)
1914 p.remote_tun_if_host = "1::%d" % (ii + 1)
1915 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1916 p.scapy_tun_spi = p.scapy_tun_spi + ii
1917 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1918 p.vpp_tun_spi = p.vpp_tun_spi + ii
1920 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1921 p.scapy_tra_spi = p.scapy_tra_spi + ii
1922 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1923 p.vpp_tra_spi = p.vpp_tra_spi + ii
1924 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1925 p.auth_algo_vpp_id, p.auth_key,
1926 p.crypt_algo_vpp_id, p.crypt_key,
1927 self.vpp_esp_protocol)
1928 p.tun_sa_out.add_vpp_config()
1930 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1931 p.auth_algo_vpp_id, p.auth_key,
1932 p.crypt_algo_vpp_id, p.crypt_key,
1933 self.vpp_esp_protocol)
1934 p.tun_sa_in.add_vpp_config()
1936 # in this v6 variant add the teibs first then the protection
1937 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1938 VppTeib(self, p.tun_if,
1939 p.tun_if.remote_hosts[ii].ip6,
1940 p.tun_dst).add_vpp_config()
1942 p.tun_protect = VppIpsecTunProtect(
1947 nh=p.tun_if.remote_hosts[ii].ip6)
1948 p.tun_protect.add_vpp_config()
1949 config_tra_params(p, self.encryption_type, p.tun_if)
1950 self.multi_params.append(p)
1952 VppIpRoute(self, p.remote_tun_if_host, 128,
1953 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1954 p.tun_if.sw_if_index)]).add_vpp_config()
1955 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1957 self.logger.info(self.vapi.cli("sh log"))
1958 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1959 self.logger.info(self.vapi.cli("sh adj 41"))
1962 p = self.ipv6_params
1963 p.tun_if.unconfig_ip6()
1964 super(TestIpsecMGreIfEspTra6, self).tearDown()
1966 def test_tun_66(self):
1968 for p in self.multi_params:
1969 self.verify_tun_66(p, count=63)
1972 @tag_fixme_vpp_workers
1973 class TestIpsec4TunProtect(TemplateIpsec,
1974 TemplateIpsec4TunProtect,
1976 """ IPsec IPv4 Tunnel protect - transport mode"""
1979 super(TestIpsec4TunProtect, self).setUp()
1981 self.tun_if = self.pg0
1984 super(TestIpsec4TunProtect, self).tearDown()
1986 def test_tun_44(self):
1987 """IPSEC tunnel protect"""
1989 p = self.ipv4_params
1991 self.config_network(p)
1992 self.config_sa_tra(p)
1993 self.config_protect(p)
1995 self.verify_tun_44(p, count=127)
1996 self.assertEqual(p.tun_if.get_rx_stats(), 127)
1997 self.assertEqual(p.tun_if.get_tx_stats(), 127)
1999 self.vapi.cli("clear ipsec sa")
2000 self.verify_tun_64(p, count=127)
2001 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2002 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2004 # rekey - create new SAs and update the tunnel protection
2006 np.crypt_key = b'X' + p.crypt_key[1:]
2007 np.scapy_tun_spi += 100
2008 np.scapy_tun_sa_id += 1
2009 np.vpp_tun_spi += 100
2010 np.vpp_tun_sa_id += 1
2011 np.tun_if.local_spi = p.vpp_tun_spi
2012 np.tun_if.remote_spi = p.scapy_tun_spi
2014 self.config_sa_tra(np)
2015 self.config_protect(np)
2018 self.verify_tun_44(np, count=127)
2019 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2020 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2023 self.unconfig_protect(np)
2024 self.unconfig_sa(np)
2025 self.unconfig_network(p)
2028 @tag_fixme_vpp_workers
2029 class TestIpsec4TunProtectUdp(TemplateIpsec,
2030 TemplateIpsec4TunProtect,
2032 """ IPsec IPv4 Tunnel protect - transport mode"""
2035 super(TestIpsec4TunProtectUdp, self).setUp()
2037 self.tun_if = self.pg0
2039 p = self.ipv4_params
2040 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2041 IPSEC_API_SAD_FLAG_UDP_ENCAP)
2042 p.nat_header = UDP(sport=4500, dport=4500)
2043 self.config_network(p)
2044 self.config_sa_tra(p)
2045 self.config_protect(p)
2048 p = self.ipv4_params
2049 self.unconfig_protect(p)
2051 self.unconfig_network(p)
2052 super(TestIpsec4TunProtectUdp, self).tearDown()
2054 def verify_encrypted(self, p, sa, rxs):
2055 # ensure encrypted packets are recieved with the default UDP ports
2057 self.assertEqual(rx[UDP].sport, 4500)
2058 self.assertEqual(rx[UDP].dport, 4500)
2059 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2061 def test_tun_44(self):
2062 """IPSEC UDP tunnel protect"""
2064 p = self.ipv4_params
2066 self.verify_tun_44(p, count=127)
2067 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2068 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2070 def test_keepalive(self):
2071 """ IPSEC NAT Keepalive """
2072 self.verify_keepalive(self.ipv4_params)
2075 @tag_fixme_vpp_workers
2076 class TestIpsec4TunProtectTun(TemplateIpsec,
2077 TemplateIpsec4TunProtect,
2079 """ IPsec IPv4 Tunnel protect - tunnel mode"""
2081 encryption_type = ESP
2082 tun4_encrypt_node_name = "esp4-encrypt-tun"
2083 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2086 super(TestIpsec4TunProtectTun, self).setUp()
2088 self.tun_if = self.pg0
2091 super(TestIpsec4TunProtectTun, self).tearDown()
2093 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2095 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2096 sa.encrypt(IP(src=sw_intf.remote_ip4,
2097 dst=sw_intf.local_ip4) /
2098 IP(src=src, dst=dst) /
2099 UDP(sport=1144, dport=2233) /
2100 Raw(b'X' * payload_size))
2101 for i in range(count)]
2103 def gen_pkts(self, sw_intf, src, dst, count=1,
2105 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2106 IP(src=src, dst=dst) /
2107 UDP(sport=1144, dport=2233) /
2108 Raw(b'X' * payload_size)
2109 for i in range(count)]
2111 def verify_decrypted(self, p, rxs):
2113 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2114 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2115 self.assert_packet_checksums_valid(rx)
2117 def verify_encrypted(self, p, sa, rxs):
2120 pkt = sa.decrypt(rx[IP])
2121 if not pkt.haslayer(IP):
2122 pkt = IP(pkt[Raw].load)
2123 self.assert_packet_checksums_valid(pkt)
2124 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2125 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2126 inner = pkt[IP].payload
2127 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2129 except (IndexError, AssertionError):
2130 self.logger.debug(ppp("Unexpected packet:", rx))
2132 self.logger.debug(ppp("Decrypted packet:", pkt))
2137 def test_tun_44(self):
2138 """IPSEC tunnel protect """
2140 p = self.ipv4_params
2142 self.config_network(p)
2143 self.config_sa_tun(p)
2144 self.config_protect(p)
2146 # also add an output features on the tunnel and physical interface
2147 # so we test they still work
2148 r_all = AclRule(True,
2149 src_prefix="0.0.0.0/0",
2150 dst_prefix="0.0.0.0/0",
2152 a = VppAcl(self, [r_all]).add_vpp_config()
2154 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2155 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2157 self.verify_tun_44(p, count=127)
2159 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2160 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2162 # rekey - create new SAs and update the tunnel protection
2164 np.crypt_key = b'X' + p.crypt_key[1:]
2165 np.scapy_tun_spi += 100
2166 np.scapy_tun_sa_id += 1
2167 np.vpp_tun_spi += 100
2168 np.vpp_tun_sa_id += 1
2169 np.tun_if.local_spi = p.vpp_tun_spi
2170 np.tun_if.remote_spi = p.scapy_tun_spi
2172 self.config_sa_tun(np)
2173 self.config_protect(np)
2176 self.verify_tun_44(np, count=127)
2177 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2178 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2181 self.unconfig_protect(np)
2182 self.unconfig_sa(np)
2183 self.unconfig_network(p)
2186 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2187 TemplateIpsec4TunProtect,
2189 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2191 encryption_type = ESP
2192 tun4_encrypt_node_name = "esp4-encrypt-tun"
2193 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2196 super(TestIpsec4TunProtectTunDrop, self).setUp()
2198 self.tun_if = self.pg0
2201 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2203 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2205 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2206 sa.encrypt(IP(src=sw_intf.remote_ip4,
2208 IP(src=src, dst=dst) /
2209 UDP(sport=1144, dport=2233) /
2210 Raw(b'X' * payload_size))
2211 for i in range(count)]
2213 def test_tun_drop_44(self):
2214 """IPSEC tunnel protect bogus tunnel header """
2216 p = self.ipv4_params
2218 self.config_network(p)
2219 self.config_sa_tun(p)
2220 self.config_protect(p)
2222 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2223 src=p.remote_tun_if_host,
2224 dst=self.pg1.remote_ip4,
2226 self.send_and_assert_no_replies(self.tun_if, tx)
2229 self.unconfig_protect(p)
2231 self.unconfig_network(p)
2234 @tag_fixme_vpp_workers
2235 class TestIpsec6TunProtect(TemplateIpsec,
2236 TemplateIpsec6TunProtect,
2238 """ IPsec IPv6 Tunnel protect - transport mode"""
2240 encryption_type = ESP
2241 tun6_encrypt_node_name = "esp6-encrypt-tun"
2242 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2245 super(TestIpsec6TunProtect, self).setUp()
2247 self.tun_if = self.pg0
2250 super(TestIpsec6TunProtect, self).tearDown()
2252 def test_tun_66(self):
2253 """IPSEC tunnel protect 6o6"""
2255 p = self.ipv6_params
2257 self.config_network(p)
2258 self.config_sa_tra(p)
2259 self.config_protect(p)
2261 self.verify_tun_66(p, count=127)
2262 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2263 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2265 # rekey - create new SAs and update the tunnel protection
2267 np.crypt_key = b'X' + p.crypt_key[1:]
2268 np.scapy_tun_spi += 100
2269 np.scapy_tun_sa_id += 1
2270 np.vpp_tun_spi += 100
2271 np.vpp_tun_sa_id += 1
2272 np.tun_if.local_spi = p.vpp_tun_spi
2273 np.tun_if.remote_spi = p.scapy_tun_spi
2275 self.config_sa_tra(np)
2276 self.config_protect(np)
2279 self.verify_tun_66(np, count=127)
2280 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2281 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2283 # bounce the interface state
2284 p.tun_if.admin_down()
2285 self.verify_drop_tun_66(np, count=127)
2286 node = ('/err/ipsec6-tun-input/%s' %
2287 'ipsec packets received on disabled interface')
2288 self.assertEqual(127, self.statistics.get_err_counter(node))
2290 self.verify_tun_66(np, count=127)
2293 # 1) add two input SAs [old, new]
2294 # 2) swap output SA to [new]
2295 # 3) use only [new] input SA
2297 np3.crypt_key = b'Z' + p.crypt_key[1:]
2298 np3.scapy_tun_spi += 100
2299 np3.scapy_tun_sa_id += 1
2300 np3.vpp_tun_spi += 100
2301 np3.vpp_tun_sa_id += 1
2302 np3.tun_if.local_spi = p.vpp_tun_spi
2303 np3.tun_if.remote_spi = p.scapy_tun_spi
2305 self.config_sa_tra(np3)
2308 p.tun_protect.update_vpp_config(np.tun_sa_out,
2309 [np.tun_sa_in, np3.tun_sa_in])
2310 self.verify_tun_66(np, np, count=127)
2311 self.verify_tun_66(np3, np, count=127)
2314 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2315 [np.tun_sa_in, np3.tun_sa_in])
2316 self.verify_tun_66(np, np3, count=127)
2317 self.verify_tun_66(np3, np3, count=127)
2320 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2322 self.verify_tun_66(np3, np3, count=127)
2323 self.verify_drop_tun_66(np, count=127)
2325 self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
2326 self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
2327 self.unconfig_sa(np)
2330 self.unconfig_protect(np3)
2331 self.unconfig_sa(np3)
2332 self.unconfig_network(p)
2334 def test_tun_46(self):
2335 """IPSEC tunnel protect 4o6"""
2337 p = self.ipv6_params
2339 self.config_network(p)
2340 self.config_sa_tra(p)
2341 self.config_protect(p)
2343 self.verify_tun_46(p, count=127)
2344 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2345 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2348 self.unconfig_protect(p)
2350 self.unconfig_network(p)
2353 @tag_fixme_vpp_workers
2354 class TestIpsec6TunProtectTun(TemplateIpsec,
2355 TemplateIpsec6TunProtect,
2357 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2359 encryption_type = ESP
2360 tun6_encrypt_node_name = "esp6-encrypt-tun"
2361 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2364 super(TestIpsec6TunProtectTun, self).setUp()
2366 self.tun_if = self.pg0
2369 super(TestIpsec6TunProtectTun, self).tearDown()
2371 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2373 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2374 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2375 dst=sw_intf.local_ip6) /
2376 IPv6(src=src, dst=dst) /
2377 UDP(sport=1166, dport=2233) /
2378 Raw(b'X' * payload_size))
2379 for i in range(count)]
2381 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2383 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2384 IPv6(src=src, dst=dst) /
2385 UDP(sport=1166, dport=2233) /
2386 Raw(b'X' * payload_size)
2387 for i in range(count)]
2389 def verify_decrypted6(self, p, rxs):
2391 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2392 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2393 self.assert_packet_checksums_valid(rx)
2395 def verify_encrypted6(self, p, sa, rxs):
2398 pkt = sa.decrypt(rx[IPv6])
2399 if not pkt.haslayer(IPv6):
2400 pkt = IPv6(pkt[Raw].load)
2401 self.assert_packet_checksums_valid(pkt)
2402 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2403 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2404 inner = pkt[IPv6].payload
2405 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2407 except (IndexError, AssertionError):
2408 self.logger.debug(ppp("Unexpected packet:", rx))
2410 self.logger.debug(ppp("Decrypted packet:", pkt))
2415 def test_tun_66(self):
2416 """IPSEC tunnel protect """
2418 p = self.ipv6_params
2420 self.config_network(p)
2421 self.config_sa_tun(p)
2422 self.config_protect(p)
2424 self.verify_tun_66(p, count=127)
2426 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2427 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2429 # rekey - create new SAs and update the tunnel protection
2431 np.crypt_key = b'X' + p.crypt_key[1:]
2432 np.scapy_tun_spi += 100
2433 np.scapy_tun_sa_id += 1
2434 np.vpp_tun_spi += 100
2435 np.vpp_tun_sa_id += 1
2436 np.tun_if.local_spi = p.vpp_tun_spi
2437 np.tun_if.remote_spi = p.scapy_tun_spi
2439 self.config_sa_tun(np)
2440 self.config_protect(np)
2443 self.verify_tun_66(np, count=127)
2444 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2445 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2448 self.unconfig_protect(np)
2449 self.unconfig_sa(np)
2450 self.unconfig_network(p)
2453 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2454 TemplateIpsec6TunProtect,
2456 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2458 encryption_type = ESP
2459 tun6_encrypt_node_name = "esp6-encrypt-tun"
2460 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2463 super(TestIpsec6TunProtectTunDrop, self).setUp()
2465 self.tun_if = self.pg0
2468 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2470 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2472 # the IP destination of the revelaed packet does not match
2473 # that assigned to the tunnel
2474 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2475 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2477 IPv6(src=src, dst=dst) /
2478 UDP(sport=1144, dport=2233) /
2479 Raw(b'X' * payload_size))
2480 for i in range(count)]
2482 def test_tun_drop_66(self):
2483 """IPSEC 6 tunnel protect bogus tunnel header """
2485 p = self.ipv6_params
2487 self.config_network(p)
2488 self.config_sa_tun(p)
2489 self.config_protect(p)
2491 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2492 src=p.remote_tun_if_host,
2493 dst=self.pg1.remote_ip6,
2495 self.send_and_assert_no_replies(self.tun_if, tx)
2497 self.unconfig_protect(p)
2499 self.unconfig_network(p)
2502 class TemplateIpsecItf4(object):
2503 """ IPsec Interface IPv4 """
2505 encryption_type = ESP
2506 tun4_encrypt_node_name = "esp4-encrypt-tun"
2507 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2508 tun4_input_node = "ipsec4-tun-input"
2510 def config_sa_tun(self, p, src, dst):
2511 config_tun_params(p, self.encryption_type, None, src, dst)
2513 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2514 p.auth_algo_vpp_id, p.auth_key,
2515 p.crypt_algo_vpp_id, p.crypt_key,
2516 self.vpp_esp_protocol,
2519 p.tun_sa_out.add_vpp_config()
2521 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2522 p.auth_algo_vpp_id, p.auth_key,
2523 p.crypt_algo_vpp_id, p.crypt_key,
2524 self.vpp_esp_protocol,
2527 p.tun_sa_in.add_vpp_config()
2529 def config_protect(self, p):
2530 p.tun_protect = VppIpsecTunProtect(self,
2534 p.tun_protect.add_vpp_config()
2536 def config_network(self, p, instance=0xffffffff):
2537 p.tun_if = VppIpsecInterface(self, instance=instance)
2539 p.tun_if.add_vpp_config()
2541 p.tun_if.config_ip4()
2542 p.tun_if.config_ip6()
2544 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2545 [VppRoutePath(p.tun_if.remote_ip4,
2547 p.route.add_vpp_config()
2548 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2549 [VppRoutePath(p.tun_if.remote_ip6,
2551 proto=DpoProto.DPO_PROTO_IP6)])
2554 def unconfig_network(self, p):
2555 p.route.remove_vpp_config()
2556 p.tun_if.remove_vpp_config()
2558 def unconfig_protect(self, p):
2559 p.tun_protect.remove_vpp_config()
2561 def unconfig_sa(self, p):
2562 p.tun_sa_out.remove_vpp_config()
2563 p.tun_sa_in.remove_vpp_config()
2566 @tag_fixme_vpp_workers
2567 class TestIpsecItf4(TemplateIpsec,
2570 """ IPsec Interface IPv4 """
2573 super(TestIpsecItf4, self).setUp()
2575 self.tun_if = self.pg0
2578 super(TestIpsecItf4, self).tearDown()
2580 def test_tun_instance_44(self):
2581 p = self.ipv4_params
2582 self.config_network(p, instance=3)
2584 with self.assertRaises(CliFailedCommandError):
2585 self.vapi.cli("show interface ipsec0")
2587 output = self.vapi.cli("show interface ipsec3")
2588 self.assertTrue("unknown" not in output)
2590 self.unconfig_network(p)
2592 def test_tun_44(self):
2593 """IPSEC interface IPv4"""
2596 p = self.ipv4_params
2598 self.config_network(p)
2599 self.config_sa_tun(p,
2601 self.pg0.remote_ip4)
2602 self.config_protect(p)
2604 self.verify_tun_44(p, count=n_pkts)
2605 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2606 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2608 p.tun_if.admin_down()
2609 self.verify_tun_dropped_44(p, count=n_pkts)
2611 self.verify_tun_44(p, count=n_pkts)
2613 self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2614 self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2616 # it's a v6 packet when its encrypted
2617 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2619 self.verify_tun_64(p, count=n_pkts)
2620 self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2621 self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2623 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2625 self.vapi.cli("clear interfaces")
2627 # rekey - create new SAs and update the tunnel protection
2629 np.crypt_key = b'X' + p.crypt_key[1:]
2630 np.scapy_tun_spi += 100
2631 np.scapy_tun_sa_id += 1
2632 np.vpp_tun_spi += 100
2633 np.vpp_tun_sa_id += 1
2634 np.tun_if.local_spi = p.vpp_tun_spi
2635 np.tun_if.remote_spi = p.scapy_tun_spi
2637 self.config_sa_tun(np,
2639 self.pg0.remote_ip4)
2640 self.config_protect(np)
2643 self.verify_tun_44(np, count=n_pkts)
2644 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2645 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2648 self.unconfig_protect(np)
2649 self.unconfig_sa(np)
2650 self.unconfig_network(p)
2652 def test_tun_44_null(self):
2653 """IPSEC interface IPv4 NULL auth/crypto"""
2656 p = copy.copy(self.ipv4_params)
2658 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2659 IPSEC_API_INTEG_ALG_NONE)
2660 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2661 IPSEC_API_CRYPTO_ALG_NONE)
2662 p.crypt_algo = "NULL"
2663 p.auth_algo = "NULL"
2665 self.config_network(p)
2666 self.config_sa_tun(p,
2668 self.pg0.remote_ip4)
2669 self.config_protect(p)
2671 self.verify_tun_44(p, count=n_pkts)
2674 self.unconfig_protect(p)
2676 self.unconfig_network(p)
2678 def test_tun_44_police(self):
2679 """IPSEC interface IPv4 with input policer"""
2681 p = self.ipv4_params
2683 self.config_network(p)
2684 self.config_sa_tun(p,
2686 self.pg0.remote_ip4)
2687 self.config_protect(p)
2689 action_tx = PolicerAction(
2690 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2692 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2693 conform_action=action_tx,
2694 exceed_action=action_tx,
2695 violate_action=action_tx)
2696 policer.add_vpp_config()
2698 # Start policing on tun
2699 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2701 self.verify_tun_44(p, count=n_pkts)
2702 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2703 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2705 stats = policer.get_stats()
2707 # Single rate, 2 colour policer - expect conform, violate but no exceed
2708 self.assertGreater(stats['conform_packets'], 0)
2709 self.assertEqual(stats['exceed_packets'], 0)
2710 self.assertGreater(stats['violate_packets'], 0)
2712 # Stop policing on tun
2713 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
2714 self.verify_tun_44(p, count=n_pkts)
2716 # No new policer stats
2717 statsnew = policer.get_stats()
2718 self.assertEqual(stats, statsnew)
2721 policer.remove_vpp_config()
2722 self.unconfig_protect(p)
2724 self.unconfig_network(p)
2727 class TestIpsecItf4MPLS(TemplateIpsec,
2730 """ IPsec Interface MPLSoIPv4 """
2732 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2735 super(TestIpsecItf4MPLS, self).setUp()
2737 self.tun_if = self.pg0
2740 super(TestIpsecItf4MPLS, self).tearDown()
2742 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2744 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2745 sa.encrypt(MPLS(label=44, ttl=3) /
2746 IP(src=src, dst=dst) /
2747 UDP(sport=1166, dport=2233) /
2748 Raw(b'X' * payload_size))
2749 for i in range(count)]
2751 def verify_encrypted(self, p, sa, rxs):
2754 pkt = sa.decrypt(rx[IP])
2755 if not pkt.haslayer(IP):
2756 pkt = IP(pkt[Raw].load)
2757 self.assert_packet_checksums_valid(pkt)
2758 self.assert_equal(pkt[MPLS].label, 44)
2759 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2760 except (IndexError, AssertionError):
2761 self.logger.debug(ppp("Unexpected packet:", rx))
2763 self.logger.debug(ppp("Decrypted packet:", pkt))
2768 def test_tun_mpls_o_ip4(self):
2769 """IPSEC interface MPLS over IPv4"""
2772 p = self.ipv4_params
2775 tbl = VppMplsTable(self, 0)
2776 tbl.add_vpp_config()
2778 self.config_network(p)
2779 # deag MPLS routes from the tunnel
2780 r4 = VppMplsRoute(self, 44, 1,
2782 self.pg1.remote_ip4,
2783 self.pg1.sw_if_index)]).add_vpp_config()
2784 p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2785 p.tun_if.sw_if_index,
2786 labels=[VppMplsLabel(44)])])
2787 p.tun_if.enable_mpls()
2789 self.config_sa_tun(p,
2791 self.pg0.remote_ip4)
2792 self.config_protect(p)
2794 self.verify_tun_44(p, count=n_pkts)
2797 p.tun_if.disable_mpls()
2798 self.unconfig_protect(p)
2800 self.unconfig_network(p)
2803 class TemplateIpsecItf6(object):
2804 """ IPsec Interface IPv6 """
2806 encryption_type = ESP
2807 tun6_encrypt_node_name = "esp6-encrypt-tun"
2808 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2809 tun6_input_node = "ipsec6-tun-input"
2811 def config_sa_tun(self, p, src, dst):
2812 config_tun_params(p, self.encryption_type, None, src, dst)
2814 if not hasattr(p, 'tun_flags'):
2816 if not hasattr(p, 'hop_limit'):
2819 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2820 p.auth_algo_vpp_id, p.auth_key,
2821 p.crypt_algo_vpp_id, p.crypt_key,
2822 self.vpp_esp_protocol,
2825 tun_flags=p.tun_flags,
2826 hop_limit=p.hop_limit)
2827 p.tun_sa_out.add_vpp_config()
2829 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2830 p.auth_algo_vpp_id, p.auth_key,
2831 p.crypt_algo_vpp_id, p.crypt_key,
2832 self.vpp_esp_protocol,
2835 p.tun_sa_in.add_vpp_config()
2837 def config_protect(self, p):
2838 p.tun_protect = VppIpsecTunProtect(self,
2842 p.tun_protect.add_vpp_config()
2844 def config_network(self, p):
2845 p.tun_if = VppIpsecInterface(self)
2847 p.tun_if.add_vpp_config()
2849 p.tun_if.config_ip4()
2850 p.tun_if.config_ip6()
2852 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2853 [VppRoutePath(p.tun_if.remote_ip4,
2857 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2858 [VppRoutePath(p.tun_if.remote_ip6,
2860 proto=DpoProto.DPO_PROTO_IP6)])
2861 p.route.add_vpp_config()
2863 def unconfig_network(self, p):
2864 p.route.remove_vpp_config()
2865 p.tun_if.remove_vpp_config()
2867 def unconfig_protect(self, p):
2868 p.tun_protect.remove_vpp_config()
2870 def unconfig_sa(self, p):
2871 p.tun_sa_out.remove_vpp_config()
2872 p.tun_sa_in.remove_vpp_config()
2875 @tag_fixme_vpp_workers
2876 class TestIpsecItf6(TemplateIpsec,
2879 """ IPsec Interface IPv6 """
2882 super(TestIpsecItf6, self).setUp()
2884 self.tun_if = self.pg0
2887 super(TestIpsecItf6, self).tearDown()
2889 def test_tun_44(self):
2890 """IPSEC interface IPv6"""
2892 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2894 p = self.ipv6_params
2895 p.inner_hop_limit = 24
2896 p.outer_hop_limit = 23
2897 p.outer_flow_label = 243224
2898 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2900 self.config_network(p)
2901 self.config_sa_tun(p,
2903 self.pg0.remote_ip6)
2904 self.config_protect(p)
2906 self.verify_tun_66(p, count=n_pkts)
2907 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2908 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2910 p.tun_if.admin_down()
2911 self.verify_drop_tun_66(p, count=n_pkts)
2913 self.verify_tun_66(p, count=n_pkts)
2915 self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2916 self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2918 # it's a v4 packet when its encrypted
2919 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2921 self.verify_tun_46(p, count=n_pkts)
2922 self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2923 self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2925 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2927 self.vapi.cli("clear interfaces")
2929 # rekey - create new SAs and update the tunnel protection
2931 np.crypt_key = b'X' + p.crypt_key[1:]
2932 np.scapy_tun_spi += 100
2933 np.scapy_tun_sa_id += 1
2934 np.vpp_tun_spi += 100
2935 np.vpp_tun_sa_id += 1
2936 np.tun_if.local_spi = p.vpp_tun_spi
2937 np.tun_if.remote_spi = p.scapy_tun_spi
2938 np.inner_hop_limit = 24
2939 np.outer_hop_limit = 128
2940 np.inner_flow_label = 0xabcde
2941 np.outer_flow_label = 0xabcde
2943 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2945 self.config_sa_tun(np,
2947 self.pg0.remote_ip6)
2948 self.config_protect(np)
2951 self.verify_tun_66(np, count=n_pkts)
2952 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2953 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2956 self.unconfig_protect(np)
2957 self.unconfig_sa(np)
2958 self.unconfig_network(p)
2960 def test_tun_66_police(self):
2961 """IPSEC interface IPv6 with input policer"""
2962 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2964 p = self.ipv6_params
2965 p.inner_hop_limit = 24
2966 p.outer_hop_limit = 23
2967 p.outer_flow_label = 243224
2968 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2970 self.config_network(p)
2971 self.config_sa_tun(p,
2973 self.pg0.remote_ip6)
2974 self.config_protect(p)
2976 action_tx = PolicerAction(
2977 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2979 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2980 conform_action=action_tx,
2981 exceed_action=action_tx,
2982 violate_action=action_tx)
2983 policer.add_vpp_config()
2985 # Start policing on tun
2986 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2988 self.verify_tun_66(p, count=n_pkts)
2989 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2990 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2992 stats = policer.get_stats()
2994 # Single rate, 2 colour policer - expect conform, violate but no exceed
2995 self.assertGreater(stats['conform_packets'], 0)
2996 self.assertEqual(stats['exceed_packets'], 0)
2997 self.assertGreater(stats['violate_packets'], 0)
2999 # Stop policing on tun
3000 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
3001 self.verify_tun_66(p, count=n_pkts)
3003 # No new policer stats
3004 statsnew = policer.get_stats()
3005 self.assertEqual(stats, statsnew)
3008 policer.remove_vpp_config()
3009 self.unconfig_protect(p)
3011 self.unconfig_network(p)
3014 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3015 """ Ipsec P2MP ESP v4 tests """
3016 tun4_encrypt_node_name = "esp4-encrypt-tun"
3017 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3018 encryption_type = ESP
3020 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
3022 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3023 sa.encrypt(IP(src=self.pg1.local_ip4,
3024 dst=self.pg1.remote_ip4) /
3025 UDP(sport=1144, dport=2233) /
3026 Raw(b'X' * payload_size))
3027 for i in range(count)]
3029 def gen_pkts(self, sw_intf, src, dst, count=1,
3031 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3032 IP(src="1.1.1.1", dst=dst) /
3033 UDP(sport=1144, dport=2233) /
3034 Raw(b'X' * payload_size)
3035 for i in range(count)]
3037 def verify_decrypted(self, p, rxs):
3039 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3040 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3042 def verify_encrypted(self, p, sa, rxs):
3045 self.assertEqual(rx[IP].tos,
3046 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
3047 self.assertEqual(rx[IP].ttl, p.hop_limit)
3048 pkt = sa.decrypt(rx[IP])
3049 if not pkt.haslayer(IP):
3050 pkt = IP(pkt[Raw].load)
3051 self.assert_packet_checksums_valid(pkt)
3053 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3054 except (IndexError, AssertionError):
3055 self.logger.debug(ppp("Unexpected packet:", rx))
3057 self.logger.debug(ppp("Decrypted packet:", pkt))
3063 super(TestIpsecMIfEsp4, self).setUp()
3066 self.tun_if = self.pg0
3067 p = self.ipv4_params
3068 p.tun_if = VppIpsecInterface(self,
3069 mode=(VppEnum.vl_api_tunnel_mode_t.
3070 TUNNEL_API_MODE_MP))
3071 p.tun_if.add_vpp_config()
3073 p.tun_if.config_ip4()
3074 p.tun_if.unconfig_ip4()
3075 p.tun_if.config_ip4()
3076 p.tun_if.generate_remote_hosts(N_NHS)
3077 self.pg0.generate_remote_hosts(N_NHS)
3078 self.pg0.configure_ipv4_neighbors()
3080 # setup some SAs for several next-hops on the interface
3081 self.multi_params = []
3083 for ii in range(N_NHS):
3084 p = copy.copy(self.ipv4_params)
3086 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3087 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3088 p.scapy_tun_spi = p.scapy_tun_spi + ii
3089 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3090 p.vpp_tun_spi = p.vpp_tun_spi + ii
3092 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3093 p.scapy_tra_spi = p.scapy_tra_spi + ii
3094 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3095 p.vpp_tra_spi = p.vpp_tra_spi + ii
3097 p.tun_sa_out = VppIpsecSA(
3098 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
3099 p.auth_algo_vpp_id, p.auth_key,
3100 p.crypt_algo_vpp_id, p.crypt_key,
3101 self.vpp_esp_protocol,
3103 self.pg0.remote_hosts[ii].ip4,
3104 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3105 hop_limit=p.hop_limit)
3106 p.tun_sa_out.add_vpp_config()
3108 p.tun_sa_in = VppIpsecSA(
3109 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
3110 p.auth_algo_vpp_id, p.auth_key,
3111 p.crypt_algo_vpp_id, p.crypt_key,
3112 self.vpp_esp_protocol,
3113 self.pg0.remote_hosts[ii].ip4,
3115 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3116 hop_limit=p.hop_limit)
3117 p.tun_sa_in.add_vpp_config()
3119 p.tun_protect = VppIpsecTunProtect(
3124 nh=p.tun_if.remote_hosts[ii].ip4)
3125 p.tun_protect.add_vpp_config()
3126 config_tun_params(p, self.encryption_type, None,
3128 self.pg0.remote_hosts[ii].ip4)
3129 self.multi_params.append(p)
3131 VppIpRoute(self, p.remote_tun_if_host, 32,
3132 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
3133 p.tun_if.sw_if_index)]).add_vpp_config()
3135 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3138 p = self.ipv4_params
3139 p.tun_if.unconfig_ip4()
3140 super(TestIpsecMIfEsp4, self).tearDown()
3142 def test_tun_44(self):
3145 for p in self.multi_params:
3146 self.verify_tun_44(p, count=N_PKTS)
3149 class TestIpsecItf6MPLS(TemplateIpsec,
3152 """ IPsec Interface MPLSoIPv6 """
3154 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3157 super(TestIpsecItf6MPLS, self).setUp()
3159 self.tun_if = self.pg0
3162 super(TestIpsecItf6MPLS, self).tearDown()
3164 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
3166 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3167 sa.encrypt(MPLS(label=66, ttl=3) /
3168 IPv6(src=src, dst=dst) /
3169 UDP(sport=1166, dport=2233) /
3170 Raw(b'X' * payload_size))
3171 for i in range(count)]
3173 def verify_encrypted6(self, p, sa, rxs):
3176 pkt = sa.decrypt(rx[IPv6])
3177 if not pkt.haslayer(IPv6):
3178 pkt = IP(pkt[Raw].load)
3179 self.assert_packet_checksums_valid(pkt)
3180 self.assert_equal(pkt[MPLS].label, 66)
3181 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3182 except (IndexError, AssertionError):
3183 self.logger.debug(ppp("Unexpected packet:", rx))
3185 self.logger.debug(ppp("Decrypted packet:", pkt))
3190 def test_tun_mpls_o_ip6(self):
3191 """IPSEC interface MPLS over IPv6"""
3194 p = self.ipv6_params
3197 tbl = VppMplsTable(self, 0)
3198 tbl.add_vpp_config()
3200 self.config_network(p)
3201 # deag MPLS routes from the tunnel
3202 r6 = VppMplsRoute(self, 66, 1,
3204 self.pg1.remote_ip6,
3205 self.pg1.sw_if_index)],
3206 eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3207 p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3208 p.tun_if.sw_if_index,
3209 labels=[VppMplsLabel(66)])])
3210 p.tun_if.enable_mpls()
3212 self.config_sa_tun(p,
3214 self.pg0.remote_ip6)
3215 self.config_protect(p)
3217 self.verify_tun_66(p, count=n_pkts)
3220 p.tun_if.disable_mpls()
3221 self.unconfig_protect(p)
3223 self.unconfig_network(p)
3226 if __name__ == '__main__':
3227 unittest.main(testRunner=VppTestRunner)