5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
15 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19 VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
# Build the scapy-side SecurityAssociation objects used for tunnel mode and
# store them on p as p.scapy_tun_sa and p.vpp_tun_sa.
# NOTE(review): this definition rebinds the config_tun_params name that is
# also imported from template_ipsec at the top of the file - confirm that the
# shadowing is intended.
# NOTE(review): several original lines of this function are absent from this
# listing (e.g. whatever consumes src/dst and the tunnel_header arguments);
# consult the full file before changing it.
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
# Lookup from p.addr_type to the scapy header class used for tunnel_header.
32 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
# True when the USE_ESN bit is set in p.flags.
33 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34 IPSEC_API_SAD_FLAG_USE_ESN))
35 crypt_key = mk_scapy_crypt_key(p)
# Tunnel endpoints are read from the tunnel interface object.
37 p.tun_dst = tun_if.remote_ip
38 p.tun_src = tun_if.local_ip
# The SPIs are crossed on purpose in the visible code: p.scapy_tun_sa is
# created with p.vpp_tun_spi and p.vpp_tun_sa with p.scapy_tun_spi.
43 p.scapy_tun_sa = SecurityAssociation(
44 encryption_type, spi=p.vpp_tun_spi,
45 crypt_algo=p.crypt_algo,
47 auth_algo=p.auth_algo, auth_key=p.auth_key,
48 tunnel_header=ip_class_by_addr_type[p.addr_type](
51 nat_t_header=p.nat_header,
53 p.vpp_tun_sa = SecurityAssociation(
54 encryption_type, spi=p.scapy_tun_spi,
55 crypt_algo=p.crypt_algo,
57 auth_algo=p.auth_algo, auth_key=p.auth_key,
58 tunnel_header=ip_class_by_addr_type[p.addr_type](
61 nat_t_header=p.nat_header,
# Build the scapy-side SecurityAssociation objects and store them on p as
# p.scapy_tun_sa and p.vpp_tun_sa. In the lines visible here neither
# SecurityAssociation call receives a tunnel_header argument.
# NOTE(review): some original lines inside both calls are absent from this
# listing; consult the full file before changing it.
65 def config_tra_params(p, encryption_type, tun_if):
66 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
# True when the USE_ESN bit is set in p.flags.
67 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
68 IPSEC_API_SAD_FLAG_USE_ESN))
69 crypt_key = mk_scapy_crypt_key(p)
# Tunnel endpoints are read from the tunnel interface object.
70 p.tun_dst = tun_if.remote_ip
71 p.tun_src = tun_if.local_ip
# The SPIs are crossed: p.scapy_tun_sa is created with p.vpp_tun_spi and
# p.vpp_tun_sa with p.scapy_tun_spi.
72 p.scapy_tun_sa = SecurityAssociation(
73 encryption_type, spi=p.vpp_tun_spi,
74 crypt_algo=p.crypt_algo,
76 auth_algo=p.auth_algo, auth_key=p.auth_key,
78 nat_t_header=p.nat_header)
79 p.vpp_tun_sa = SecurityAssociation(
80 encryption_type, spi=p.scapy_tun_spi,
81 crypt_algo=p.crypt_algo,
83 auth_algo=p.auth_algo, auth_key=p.auth_key,
85 nat_t_header=p.nat_header)
88 class TemplateIpsec4TunProtect(object):
89 """ IPsec IPv4 Tunnel protect """
92 tun4_encrypt_node_name = "esp4-encrypt-tun"
93 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
94 tun4_input_node = "ipsec4-tun-input"
96 def config_sa_tra(self, p):
97 config_tun_params(p, self.encryption_type, p.tun_if)
99 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
100 p.auth_algo_vpp_id, p.auth_key,
101 p.crypt_algo_vpp_id, p.crypt_key,
102 self.vpp_esp_protocol,
104 p.tun_sa_out.add_vpp_config()
106 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
107 p.auth_algo_vpp_id, p.auth_key,
108 p.crypt_algo_vpp_id, p.crypt_key,
109 self.vpp_esp_protocol,
111 p.tun_sa_in.add_vpp_config()
113 def config_sa_tun(self, p):
114 config_tun_params(p, self.encryption_type, p.tun_if)
116 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
117 p.auth_algo_vpp_id, p.auth_key,
118 p.crypt_algo_vpp_id, p.crypt_key,
119 self.vpp_esp_protocol,
120 self.tun_if.local_addr[p.addr_type],
121 self.tun_if.remote_addr[p.addr_type],
123 p.tun_sa_out.add_vpp_config()
125 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
126 p.auth_algo_vpp_id, p.auth_key,
127 p.crypt_algo_vpp_id, p.crypt_key,
128 self.vpp_esp_protocol,
129 self.tun_if.remote_addr[p.addr_type],
130 self.tun_if.local_addr[p.addr_type],
132 p.tun_sa_in.add_vpp_config()
134 def config_protect(self, p):
135 p.tun_protect = VppIpsecTunProtect(self,
139 p.tun_protect.add_vpp_config()
141 def config_network(self, p):
142 if hasattr(p, 'tun_dst'):
145 tun_dst = self.pg0.remote_ip4
146 p.tun_if = VppIpIpTunInterface(self, self.pg0,
149 p.tun_if.add_vpp_config()
151 p.tun_if.config_ip4()
152 p.tun_if.config_ip6()
154 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
155 [VppRoutePath(p.tun_if.remote_ip4,
157 p.route.add_vpp_config()
158 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
159 [VppRoutePath(p.tun_if.remote_ip6,
161 proto=DpoProto.DPO_PROTO_IP6)])
def unconfig_network(self, p):
    """Remove the network objects recorded on ``p``.

    The object in ``p.route`` is removed first, then the one in
    ``p.tun_if``.
    """
    # Attributes are looked up one at a time so that each removal happens
    # before the next attribute is read.
    for attr_name in ("route", "tun_if"):
        getattr(p, attr_name).remove_vpp_config()
def unconfig_protect(self, p):
    """Remove the tunnel-protect object stored in ``p.tun_protect``."""
    protection = p.tun_protect
    protection.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove both SAs stored on ``p``: ``tun_sa_out`` then ``tun_sa_in``."""
    for sa_attr in ("tun_sa_out", "tun_sa_in"):
        getattr(p, sa_attr).remove_vpp_config()
176 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
178 """ IPsec tunnel interface tests """
180 encryption_type = ESP
184 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
187 def tearDownClass(cls):
188 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
191 super(TemplateIpsec4TunIfEsp, self).setUp()
193 self.tun_if = self.pg0
197 self.config_network(p)
198 self.config_sa_tra(p)
199 self.config_protect(p)
202 super(TemplateIpsec4TunIfEsp, self).tearDown()
205 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
207 """ IPsec UDP tunnel interface tests """
209 tun4_encrypt_node_name = "esp4-encrypt-tun"
210 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
211 encryption_type = ESP
215 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
218 def tearDownClass(cls):
219 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
221 def verify_encrypted(self, p, sa, rxs):
224 # ensure the UDP ports are correct before we decrypt
226 self.assertTrue(rx.haslayer(UDP))
227 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
228 self.assert_equal(rx[UDP].dport, 4500)
230 pkt = sa.decrypt(rx[IP])
231 if not pkt.haslayer(IP):
232 pkt = IP(pkt[Raw].load)
234 self.assert_packet_checksums_valid(pkt)
235 self.assert_equal(pkt[IP].dst, "1.1.1.1")
236 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
237 except (IndexError, AssertionError):
238 self.logger.debug(ppp("Unexpected packet:", rx))
240 self.logger.debug(ppp("Decrypted packet:", pkt))
245 def config_sa_tra(self, p):
246 config_tun_params(p, self.encryption_type, p.tun_if)
248 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
249 p.auth_algo_vpp_id, p.auth_key,
250 p.crypt_algo_vpp_id, p.crypt_key,
251 self.vpp_esp_protocol,
253 udp_src=p.nat_header.sport,
254 udp_dst=p.nat_header.dport)
255 p.tun_sa_out.add_vpp_config()
257 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
258 p.auth_algo_vpp_id, p.auth_key,
259 p.crypt_algo_vpp_id, p.crypt_key,
260 self.vpp_esp_protocol,
262 udp_src=p.nat_header.sport,
263 udp_dst=p.nat_header.dport)
264 p.tun_sa_in.add_vpp_config()
267 super(TemplateIpsec4TunIfEspUdp, self).setUp()
270 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
271 IPSEC_API_SAD_FLAG_UDP_ENCAP)
272 p.nat_header = UDP(sport=5454, dport=4500)
274 self.tun_if = self.pg0
276 self.config_network(p)
277 self.config_sa_tra(p)
278 self.config_protect(p)
281 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
284 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
285 """ Ipsec ESP - TUN tests """
286 tun4_encrypt_node_name = "esp4-encrypt-tun"
287 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
289 def test_tun_basic64(self):
290 """ ipsec 6o4 tunnel basic test """
291 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
293 self.verify_tun_64(self.params[socket.AF_INET], count=1)
295 def test_tun_burst64(self):
296 """ ipsec 6o4 tunnel burst test """
297 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
299 self.verify_tun_64(self.params[socket.AF_INET], count=257)
301 def test_tun_basic_frag44(self):
302 """ ipsec 4o4 tunnel frag basic test """
303 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
307 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
309 self.verify_tun_44(self.params[socket.AF_INET],
310 count=1, payload_size=1800, n_rx=2)
311 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
315 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
316 """ Ipsec ESP UDP tests """
318 tun4_input_node = "ipsec4-tun-input"
321 super(TestIpsec4TunIfEspUdp, self).setUp()
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # Run the keepalive verification against the IPv4 parameter set.
    v4_params = self.ipv4_params
    self.verify_keepalive(v4_params)
328 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
329 """ Ipsec ESP UDP GCM tests """
331 tun4_input_node = "ipsec4-tun-input"
334 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
336 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
337 IPSEC_API_INTEG_ALG_NONE)
338 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
339 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
340 p.crypt_algo = "AES-GCM"
342 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
346 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
347 """ Ipsec ESP - TCP tests """
351 class TemplateIpsec6TunProtect(object):
352 """ IPsec IPv6 Tunnel protect """
def config_sa_tra(self, p):
    """Create and install the outbound and inbound SAs for ``p``.

    config_tun_params() is called first; afterwards two VppIpsecSA
    objects are built without any tunnel endpoint arguments, stored on
    ``p`` as ``tun_sa_out`` and ``tun_sa_in``, and added to VPP in that
    order.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    # (attribute set on p, SA-id attribute, SPI attribute); outbound first.
    sa_layout = (
        ("tun_sa_out", "scapy_tun_sa_id", "scapy_tun_spi"),
        ("tun_sa_in", "vpp_tun_sa_id", "vpp_tun_spi"),
    )
    for target_attr, id_attr, spi_attr in sa_layout:
        sa = VppIpsecSA(self, getattr(p, id_attr), getattr(p, spi_attr),
                        p.auth_algo_vpp_id, p.auth_key,
                        p.crypt_algo_vpp_id, p.crypt_key,
                        self.vpp_esp_protocol)
        setattr(p, target_attr, sa)
        sa.add_vpp_config()
def config_sa_tun(self, p):
    """Create and install the SAs for ``p`` with tunnel endpoint addresses.

    config_tun_params() is called first. The outbound SA is given the
    (local, remote) addresses of ``self.tun_if`` for ``p.addr_type``; the
    inbound SA receives the same pair swapped. Each SA is stored on ``p``
    and then added to VPP, outbound before inbound.
    """
    config_tun_params(p, self.encryption_type, p.tun_if)

    outbound = VppIpsecSA(
        self, p.scapy_tun_sa_id, p.scapy_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.local_addr[p.addr_type],
        self.tun_if.remote_addr[p.addr_type])
    p.tun_sa_out = outbound
    outbound.add_vpp_config()

    inbound = VppIpsecSA(
        self, p.vpp_tun_sa_id, p.vpp_tun_spi,
        p.auth_algo_vpp_id, p.auth_key,
        p.crypt_algo_vpp_id, p.crypt_key,
        self.vpp_esp_protocol,
        self.tun_if.remote_addr[p.addr_type],
        self.tun_if.local_addr[p.addr_type])
    p.tun_sa_in = inbound
    inbound.add_vpp_config()
388 def config_protect(self, p):
389 p.tun_protect = VppIpsecTunProtect(self,
393 p.tun_protect.add_vpp_config()
395 def config_network(self, p):
396 if hasattr(p, 'tun_dst'):
399 tun_dst = self.pg0.remote_ip6
400 p.tun_if = VppIpIpTunInterface(self, self.pg0,
403 p.tun_if.add_vpp_config()
405 p.tun_if.config_ip6()
406 p.tun_if.config_ip4()
408 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
409 [VppRoutePath(p.tun_if.remote_ip6,
411 proto=DpoProto.DPO_PROTO_IP6)])
412 p.route.add_vpp_config()
413 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
414 [VppRoutePath(p.tun_if.remote_ip4,
def unconfig_network(self, p):
    """Undo the network setup kept on ``p``: route first, then tunnel."""
    route = p.route
    route.remove_vpp_config()
    tunnel = p.tun_if
    tunnel.remove_vpp_config()
def unconfig_protect(self, p):
    """Withdraw the tunnel-protect object held in ``p.tun_protect``."""
    tun_protect = p.tun_protect
    tun_protect.remove_vpp_config()
def unconfig_sa(self, p):
    """Remove the outbound SA and then the inbound SA stored on ``p``."""
    outbound_sa = p.tun_sa_out
    outbound_sa.remove_vpp_config()
    inbound_sa = p.tun_sa_in
    inbound_sa.remove_vpp_config()
430 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
432 """ IPsec tunnel interface tests """
434 encryption_type = ESP
437 super(TemplateIpsec6TunIfEsp, self).setUp()
439 self.tun_if = self.pg0
442 self.config_network(p)
443 self.config_sa_tra(p)
444 self.config_protect(p)
447 super(TemplateIpsec6TunIfEsp, self).tearDown()
450 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
452 """ Ipsec ESP - TUN tests """
453 tun6_encrypt_node_name = "esp6-encrypt-tun"
454 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
def test_tun_basic46(self):
    """ ipsec 4o6 tunnel basic test """
    # Set the encrypt node name on the instance before verifying.
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    # Basic variant: verify with a single packet.
    self.verify_tun_46(v6_params, count=1)
def test_tun_burst46(self):
    """ ipsec 4o6 tunnel burst test """
    # Set the encrypt node name on the instance before verifying.
    self.tun6_encrypt_node_name = "esp6-encrypt-tun"
    v6_params = self.params[socket.AF_INET6]
    # Burst variant: verify with 257 packets.
    self.verify_tun_46(v6_params, count=257)
467 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
468 IpsecTun6HandoffTests):
469 """ Ipsec ESP 6 Handoff tests """
470 tun6_encrypt_node_name = "esp6-encrypt-tun"
471 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
473 def test_tun_handoff_66_police(self):
474 """ ESP 6o6 tunnel with policer worker hand-off test """
475 self.vapi.cli("clear errors")
476 self.vapi.cli("clear ipsec sa")
479 p = self.params[socket.AF_INET6]
481 action_tx = PolicerAction(
482 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
484 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
485 conform_action=action_tx,
486 exceed_action=action_tx,
487 violate_action=action_tx)
488 policer.add_vpp_config()
490 # Start policing on tun
491 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
493 for pol_bind in [1, 0]:
494 policer.bind_vpp_config(pol_bind, True)
496 # inject alternately on worker 0 and 1.
497 for worker in [0, 1, 0, 1]:
498 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
500 src=p.remote_tun_if_host,
501 dst=self.pg1.remote_ip6,
503 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
504 self.pg1, worker=worker)
505 self.verify_decrypted6(p, recv_pkts)
506 self.logger.debug(self.vapi.cli("show trace max 100"))
508 stats = policer.get_stats()
509 stats0 = policer.get_stats(worker=0)
510 stats1 = policer.get_stats(worker=1)
513 # First pass: Worker 1, should have done all the policing
514 self.assertEqual(stats, stats1)
516 # Worker 0, should have handed everything off
517 self.assertEqual(stats0['conform_packets'], 0)
518 self.assertEqual(stats0['exceed_packets'], 0)
519 self.assertEqual(stats0['violate_packets'], 0)
521 # Second pass: both workers should have policed equal amounts
522 self.assertGreater(stats1['conform_packets'], 0)
523 self.assertEqual(stats1['exceed_packets'], 0)
524 self.assertGreater(stats1['violate_packets'], 0)
526 self.assertGreater(stats0['conform_packets'], 0)
527 self.assertEqual(stats0['exceed_packets'], 0)
528 self.assertGreater(stats0['violate_packets'], 0)
530 self.assertEqual(stats0['conform_packets'] +
531 stats0['violate_packets'],
532 stats1['conform_packets'] +
533 stats1['violate_packets'])
535 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
536 policer.remove_vpp_config()
539 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
540 IpsecTun4HandoffTests):
541 """ Ipsec ESP 4 Handoff tests """
542 tun4_encrypt_node_name = "esp4-encrypt-tun"
543 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
545 def test_tun_handoff_44_police(self):
546 """ ESP 4o4 tunnel with policer worker hand-off test """
547 self.vapi.cli("clear errors")
548 self.vapi.cli("clear ipsec sa")
551 p = self.params[socket.AF_INET]
553 action_tx = PolicerAction(
554 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
556 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
557 conform_action=action_tx,
558 exceed_action=action_tx,
559 violate_action=action_tx)
560 policer.add_vpp_config()
562 # Start policing on tun
563 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
565 for pol_bind in [1, 0]:
566 policer.bind_vpp_config(pol_bind, True)
568 # inject alternately on worker 0 and 1.
569 for worker in [0, 1, 0, 1]:
570 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
572 src=p.remote_tun_if_host,
573 dst=self.pg1.remote_ip4,
575 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
576 self.pg1, worker=worker)
577 self.verify_decrypted(p, recv_pkts)
578 self.logger.debug(self.vapi.cli("show trace max 100"))
580 stats = policer.get_stats()
581 stats0 = policer.get_stats(worker=0)
582 stats1 = policer.get_stats(worker=1)
585 # First pass: Worker 1, should have done all the policing
586 self.assertEqual(stats, stats1)
588 # Worker 0, should have handed everything off
589 self.assertEqual(stats0['conform_packets'], 0)
590 self.assertEqual(stats0['exceed_packets'], 0)
591 self.assertEqual(stats0['violate_packets'], 0)
593 # Second pass: both workers should have policed equal amounts
594 self.assertGreater(stats1['conform_packets'], 0)
595 self.assertEqual(stats1['exceed_packets'], 0)
596 self.assertGreater(stats1['violate_packets'], 0)
598 self.assertGreater(stats0['conform_packets'], 0)
599 self.assertEqual(stats0['exceed_packets'], 0)
600 self.assertGreater(stats0['violate_packets'], 0)
602 self.assertEqual(stats0['conform_packets'] +
603 stats0['violate_packets'],
604 stats1['conform_packets'] +
605 stats1['violate_packets'])
607 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
608 policer.remove_vpp_config()
611 @tag_fixme_vpp_workers
612 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
615 """ IPsec IPv4 Multi Tunnel interface """
617 encryption_type = ESP
618 tun4_encrypt_node_name = "esp4-encrypt-tun"
619 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
622 super(TestIpsec4MultiTunIfEsp, self).setUp()
624 self.tun_if = self.pg0
626 self.multi_params = []
627 self.pg0.generate_remote_hosts(10)
628 self.pg0.configure_ipv4_neighbors()
631 p = copy.copy(self.ipv4_params)
633 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
634 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
635 p.scapy_tun_spi = p.scapy_tun_spi + ii
636 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
637 p.vpp_tun_spi = p.vpp_tun_spi + ii
639 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
640 p.scapy_tra_spi = p.scapy_tra_spi + ii
641 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
642 p.vpp_tra_spi = p.vpp_tra_spi + ii
643 p.tun_dst = self.pg0.remote_hosts[ii].ip4
645 self.multi_params.append(p)
646 self.config_network(p)
647 self.config_sa_tra(p)
648 self.config_protect(p)
651 super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
    """Multiple IPSEC tunnel interfaces """
    n_pkts = 127
    for params in self.multi_params:
        self.verify_tun_44(params, count=n_pkts)
        # Both the rx and the tx counter of this tunnel interface must
        # equal the count that was passed to verify_tun_44.
        rx_stats = params.tun_if.get_rx_stats()
        self.assertEqual(rx_stats['packets'], n_pkts)
        tx_stats = params.tun_if.get_tx_stats()
        self.assertEqual(tx_stats['packets'], n_pkts)
662 def test_tun_rr_44(self):
663 """ Round-robin packets across multiple interfaces """
665 for p in self.multi_params:
666 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
667 src=p.remote_tun_if_host,
668 dst=self.pg1.remote_ip4)
669 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
671 for rx, p in zip(rxs, self.multi_params):
672 self.verify_decrypted(p, [rx])
675 for p in self.multi_params:
676 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
677 dst=p.remote_tun_if_host)
678 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
680 for rx, p in zip(rxs, self.multi_params):
681 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
684 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
687 """ IPsec IPv4 Tunnel interface all Algos """
689 encryption_type = ESP
690 tun4_encrypt_node_name = "esp4-encrypt-tun"
691 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
694 super(TestIpsec4TunIfEspAll, self).setUp()
696 self.tun_if = self.pg0
699 self.config_network(p)
700 self.config_sa_tra(p)
701 self.config_protect(p)
705 self.unconfig_protect(p)
706 self.unconfig_network(p)
709 super(TestIpsec4TunIfEspAll, self).tearDown()
713 # change the key and the SPI
716 p.crypt_key = b'X' + p.crypt_key[1:]
718 p.scapy_tun_sa_id += 1
721 p.tun_if.local_spi = p.vpp_tun_spi
722 p.tun_if.remote_spi = p.scapy_tun_spi
724 config_tun_params(p, self.encryption_type, p.tun_if)
726 p.tun_sa_out = VppIpsecSA(self,
733 self.vpp_esp_protocol,
736 p.tun_sa_in = VppIpsecSA(self,
743 self.vpp_esp_protocol,
746 p.tun_sa_in.add_vpp_config()
747 p.tun_sa_out.add_vpp_config()
749 self.config_protect(p)
750 np.tun_sa_out.remove_vpp_config()
751 np.tun_sa_in.remove_vpp_config()
752 self.logger.info(self.vapi.cli("sh ipsec sa"))
754 def test_tun_44(self):
755 """IPSEC tunnel all algos """
757 # foreach VPP crypto engine
758 engines = ["ia32", "ipsecmb", "openssl"]
760 # foreach crypto algorithm
761 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
762 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
763 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
764 IPSEC_API_INTEG_ALG_NONE),
765 'scapy-crypto': "AES-GCM",
766 'scapy-integ': "NULL",
767 'key': b"JPjyOWBeVEQiMe7h",
769 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
770 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
771 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
772 IPSEC_API_INTEG_ALG_NONE),
773 'scapy-crypto': "AES-GCM",
774 'scapy-integ': "NULL",
775 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
777 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
778 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
779 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
780 IPSEC_API_INTEG_ALG_NONE),
781 'scapy-crypto': "AES-GCM",
782 'scapy-integ': "NULL",
783 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
785 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
786 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
787 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
788 IPSEC_API_INTEG_ALG_SHA1_96),
789 'scapy-crypto': "AES-CBC",
790 'scapy-integ': "HMAC-SHA1-96",
792 'key': b"JPjyOWBeVEQiMe7h"},
793 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
794 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
795 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
796 IPSEC_API_INTEG_ALG_SHA_512_256),
797 'scapy-crypto': "AES-CBC",
798 'scapy-integ': "SHA2-512-256",
800 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
801 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
802 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
803 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
804 IPSEC_API_INTEG_ALG_SHA_256_128),
805 'scapy-crypto': "AES-CBC",
806 'scapy-integ': "SHA2-256-128",
808 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
809 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
810 IPSEC_API_CRYPTO_ALG_NONE),
811 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
812 IPSEC_API_INTEG_ALG_SHA1_96),
813 'scapy-crypto': "NULL",
814 'scapy-integ': "HMAC-SHA1-96",
816 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
818 for engine in engines:
819 self.vapi.cli("set crypto handler all %s" % engine)
822 # loop through each of the algorithms
825 # with self.subTest(algo=algo['scapy']):
828 p.auth_algo_vpp_id = algo['vpp-integ']
829 p.crypt_algo_vpp_id = algo['vpp-crypto']
830 p.crypt_algo = algo['scapy-crypto']
831 p.auth_algo = algo['scapy-integ']
832 p.crypt_key = algo['key']
833 p.salt = algo['salt']
839 self.verify_tun_44(p, count=127)
842 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
845 """ IPsec IPv4 Tunnel interface no Algos """
847 encryption_type = ESP
848 tun4_encrypt_node_name = "esp4-encrypt-tun"
849 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
852 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
854 self.tun_if = self.pg0
856 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
857 IPSEC_API_INTEG_ALG_NONE)
861 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
862 IPSEC_API_CRYPTO_ALG_NONE)
863 p.crypt_algo = 'NULL'
867 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
869 def test_tun_44(self):
870 """ IPSec SA with NULL algos """
873 self.config_network(p)
874 self.config_sa_tra(p)
875 self.config_protect(p)
877 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
878 dst=p.remote_tun_if_host)
879 self.send_and_assert_no_replies(self.pg1, tx)
881 self.unconfig_protect(p)
883 self.unconfig_network(p)
886 @tag_fixme_vpp_workers
887 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
890 """ IPsec IPv6 Multi Tunnel interface """
892 encryption_type = ESP
893 tun6_encrypt_node_name = "esp6-encrypt-tun"
894 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
897 super(TestIpsec6MultiTunIfEsp, self).setUp()
899 self.tun_if = self.pg0
901 self.multi_params = []
902 self.pg0.generate_remote_hosts(10)
903 self.pg0.configure_ipv6_neighbors()
906 p = copy.copy(self.ipv6_params)
908 p.remote_tun_if_host = "1111::%d" % (ii + 1)
909 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
910 p.scapy_tun_spi = p.scapy_tun_spi + ii
911 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
912 p.vpp_tun_spi = p.vpp_tun_spi + ii
914 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
915 p.scapy_tra_spi = p.scapy_tra_spi + ii
916 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
917 p.vpp_tra_spi = p.vpp_tra_spi + ii
918 p.tun_dst = self.pg0.remote_hosts[ii].ip6
920 self.multi_params.append(p)
921 self.config_network(p)
922 self.config_sa_tra(p)
923 self.config_protect(p)
926 super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
    """Multiple IPSEC tunnel interfaces """
    expected_pkts = 127
    for tun_params in self.multi_params:
        self.verify_tun_66(tun_params, count=expected_pkts)
        # Check the interface's rx counter, then its tx counter, against
        # the count that was passed to verify_tun_66.
        rx_counters = tun_params.tun_if.get_rx_stats()
        self.assertEqual(rx_counters['packets'], expected_pkts)
        tx_counters = tun_params.tun_if.get_tx_stats()
        self.assertEqual(tx_counters['packets'], expected_pkts)
938 class TestIpsecGreTebIfEsp(TemplateIpsec,
940 """ Ipsec GRE TEB ESP - TUN tests """
941 tun4_encrypt_node_name = "esp4-encrypt-tun"
942 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
943 encryption_type = ESP
944 omac = "00:11:22:33:44:55"
946 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
948 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
949 sa.encrypt(IP(src=self.pg0.remote_ip4,
950 dst=self.pg0.local_ip4) /
952 Ether(dst=self.omac) /
953 IP(src="1.1.1.1", dst="1.1.1.2") /
954 UDP(sport=1144, dport=2233) /
955 Raw(b'X' * payload_size))
956 for i in range(count)]
958 def gen_pkts(self, sw_intf, src, dst, count=1,
960 return [Ether(dst=self.omac) /
961 IP(src="1.1.1.1", dst="1.1.1.2") /
962 UDP(sport=1144, dport=2233) /
963 Raw(b'X' * payload_size)
964 for i in range(count)]
966 def verify_decrypted(self, p, rxs):
968 self.assert_equal(rx[Ether].dst, self.omac)
969 self.assert_equal(rx[IP].dst, "1.1.1.2")
971 def verify_encrypted(self, p, sa, rxs):
974 pkt = sa.decrypt(rx[IP])
975 if not pkt.haslayer(IP):
976 pkt = IP(pkt[Raw].load)
977 self.assert_packet_checksums_valid(pkt)
978 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
979 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
980 self.assertTrue(pkt.haslayer(GRE))
982 self.assertEqual(e[Ether].dst, self.omac)
983 self.assertEqual(e[IP].dst, "1.1.1.2")
984 except (IndexError, AssertionError):
985 self.logger.debug(ppp("Unexpected packet:", rx))
987 self.logger.debug(ppp("Decrypted packet:", pkt))
993 super(TestIpsecGreTebIfEsp, self).setUp()
995 self.tun_if = self.pg0
999 bd1 = VppBridgeDomain(self, 1)
1000 bd1.add_vpp_config()
1002 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1003 p.auth_algo_vpp_id, p.auth_key,
1004 p.crypt_algo_vpp_id, p.crypt_key,
1005 self.vpp_esp_protocol,
1007 self.pg0.remote_ip4)
1008 p.tun_sa_out.add_vpp_config()
1010 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1011 p.auth_algo_vpp_id, p.auth_key,
1012 p.crypt_algo_vpp_id, p.crypt_key,
1013 self.vpp_esp_protocol,
1014 self.pg0.remote_ip4,
1016 p.tun_sa_in.add_vpp_config()
1018 p.tun_if = VppGreInterface(self,
1020 self.pg0.remote_ip4,
1021 type=(VppEnum.vl_api_gre_tunnel_type_t.
1022 GRE_API_TUNNEL_TYPE_TEB))
1023 p.tun_if.add_vpp_config()
1025 p.tun_protect = VppIpsecTunProtect(self,
1030 p.tun_protect.add_vpp_config()
1033 p.tun_if.config_ip4()
1034 config_tun_params(p, self.encryption_type, p.tun_if)
1036 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1037 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1039 self.vapi.cli("clear ipsec sa")
1040 self.vapi.cli("sh adj")
1041 self.vapi.cli("sh ipsec tun")
1044 p = self.ipv4_params
1045 p.tun_if.unconfig_ip4()
1046 super(TestIpsecGreTebIfEsp, self).tearDown()
1049 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1051 """ Ipsec GRE TEB VLAN ESP - TUN tests """
1052 tun4_encrypt_node_name = "esp4-encrypt-tun"
1053 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1054 encryption_type = ESP
1055 omac = "00:11:22:33:44:55"
1057 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1059 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1060 sa.encrypt(IP(src=self.pg0.remote_ip4,
1061 dst=self.pg0.local_ip4) /
1063 Ether(dst=self.omac) /
1064 IP(src="1.1.1.1", dst="1.1.1.2") /
1065 UDP(sport=1144, dport=2233) /
1066 Raw(b'X' * payload_size))
1067 for i in range(count)]
1069 def gen_pkts(self, sw_intf, src, dst, count=1,
1071 return [Ether(dst=self.omac) /
1073 IP(src="1.1.1.1", dst="1.1.1.2") /
1074 UDP(sport=1144, dport=2233) /
1075 Raw(b'X' * payload_size)
1076 for i in range(count)]
1078 def verify_decrypted(self, p, rxs):
1080 self.assert_equal(rx[Ether].dst, self.omac)
1081 self.assert_equal(rx[Dot1Q].vlan, 11)
1082 self.assert_equal(rx[IP].dst, "1.1.1.2")
1084 def verify_encrypted(self, p, sa, rxs):
1087 pkt = sa.decrypt(rx[IP])
1088 if not pkt.haslayer(IP):
1089 pkt = IP(pkt[Raw].load)
1090 self.assert_packet_checksums_valid(pkt)
1091 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1092 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1093 self.assertTrue(pkt.haslayer(GRE))
1095 self.assertEqual(e[Ether].dst, self.omac)
1096 self.assertFalse(e.haslayer(Dot1Q))
1097 self.assertEqual(e[IP].dst, "1.1.1.2")
1098 except (IndexError, AssertionError):
1099 self.logger.debug(ppp("Unexpected packet:", rx))
1101 self.logger.debug(ppp("Decrypted packet:", pkt))
1107 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1109 self.tun_if = self.pg0
1111 p = self.ipv4_params
1113 bd1 = VppBridgeDomain(self, 1)
1114 bd1.add_vpp_config()
1116 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1117 self.vapi.l2_interface_vlan_tag_rewrite(
1118 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1120 self.pg1_11.admin_up()
1122 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1123 p.auth_algo_vpp_id, p.auth_key,
1124 p.crypt_algo_vpp_id, p.crypt_key,
1125 self.vpp_esp_protocol,
1127 self.pg0.remote_ip4)
1128 p.tun_sa_out.add_vpp_config()
1130 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1131 p.auth_algo_vpp_id, p.auth_key,
1132 p.crypt_algo_vpp_id, p.crypt_key,
1133 self.vpp_esp_protocol,
1134 self.pg0.remote_ip4,
1136 p.tun_sa_in.add_vpp_config()
1138 p.tun_if = VppGreInterface(self,
1140 self.pg0.remote_ip4,
1141 type=(VppEnum.vl_api_gre_tunnel_type_t.
1142 GRE_API_TUNNEL_TYPE_TEB))
1143 p.tun_if.add_vpp_config()
1145 p.tun_protect = VppIpsecTunProtect(self,
1150 p.tun_protect.add_vpp_config()
1153 p.tun_if.config_ip4()
1154 config_tun_params(p, self.encryption_type, p.tun_if)
1156 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1157 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1159 self.vapi.cli("clear ipsec sa")
1162 p = self.ipv4_params
1163 p.tun_if.unconfig_ip4()
1164 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1165 self.pg1_11.admin_down()
1166 self.pg1_11.remove_vpp_config()
1169 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1171 """ Ipsec GRE TEB ESP - Tra tests """
1172 tun4_encrypt_node_name = "esp4-encrypt-tun"
1173 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1174 encryption_type = ESP
1175 omac = "00:11:22:33:44:55"
1177 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1179 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1180 sa.encrypt(IP(src=self.pg0.remote_ip4,
1181 dst=self.pg0.local_ip4) /
1183 Ether(dst=self.omac) /
1184 IP(src="1.1.1.1", dst="1.1.1.2") /
1185 UDP(sport=1144, dport=2233) /
1186 Raw(b'X' * payload_size))
1187 for i in range(count)]
1189 def gen_pkts(self, sw_intf, src, dst, count=1,
1191 return [Ether(dst=self.omac) /
1192 IP(src="1.1.1.1", dst="1.1.1.2") /
1193 UDP(sport=1144, dport=2233) /
1194 Raw(b'X' * payload_size)
1195 for i in range(count)]
1197 def verify_decrypted(self, p, rxs):
1199 self.assert_equal(rx[Ether].dst, self.omac)
1200 self.assert_equal(rx[IP].dst, "1.1.1.2")
1202 def verify_encrypted(self, p, sa, rxs):
1205 pkt = sa.decrypt(rx[IP])
1206 if not pkt.haslayer(IP):
1207 pkt = IP(pkt[Raw].load)
1208 self.assert_packet_checksums_valid(pkt)
1209 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1210 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1211 self.assertTrue(pkt.haslayer(GRE))
1213 self.assertEqual(e[Ether].dst, self.omac)
1214 self.assertEqual(e[IP].dst, "1.1.1.2")
1215 except (IndexError, AssertionError):
1216 self.logger.debug(ppp("Unexpected packet:", rx))
1218 self.logger.debug(ppp("Decrypted packet:", pkt))
1224 super(TestIpsecGreTebIfEspTra, self).setUp()
1226 self.tun_if = self.pg0
1228 p = self.ipv4_params
1230 bd1 = VppBridgeDomain(self, 1)
1231 bd1.add_vpp_config()
1233 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1234 p.auth_algo_vpp_id, p.auth_key,
1235 p.crypt_algo_vpp_id, p.crypt_key,
1236 self.vpp_esp_protocol)
1237 p.tun_sa_out.add_vpp_config()
1239 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1240 p.auth_algo_vpp_id, p.auth_key,
1241 p.crypt_algo_vpp_id, p.crypt_key,
1242 self.vpp_esp_protocol)
1243 p.tun_sa_in.add_vpp_config()
1245 p.tun_if = VppGreInterface(self,
1247 self.pg0.remote_ip4,
1248 type=(VppEnum.vl_api_gre_tunnel_type_t.
1249 GRE_API_TUNNEL_TYPE_TEB))
1250 p.tun_if.add_vpp_config()
1252 p.tun_protect = VppIpsecTunProtect(self,
1257 p.tun_protect.add_vpp_config()
1260 p.tun_if.config_ip4()
1261 config_tra_params(p, self.encryption_type, p.tun_if)
1263 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1264 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1266 self.vapi.cli("clear ipsec sa")
1269 p = self.ipv4_params
1270 p.tun_if.unconfig_ip4()
1271 super(TestIpsecGreTebIfEspTra, self).tearDown()
1274 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1276 """ Ipsec GRE TEB UDP ESP - Tra tests """
1277 tun4_encrypt_node_name = "esp4-encrypt-tun"
1278 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1279 encryption_type = ESP
1280 omac = "00:11:22:33:44:55"
1282 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1284 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1285 sa.encrypt(IP(src=self.pg0.remote_ip4,
1286 dst=self.pg0.local_ip4) /
1288 Ether(dst=self.omac) /
1289 IP(src="1.1.1.1", dst="1.1.1.2") /
1290 UDP(sport=1144, dport=2233) /
1291 Raw(b'X' * payload_size))
1292 for i in range(count)]
1294 def gen_pkts(self, sw_intf, src, dst, count=1,
1296 return [Ether(dst=self.omac) /
1297 IP(src="1.1.1.1", dst="1.1.1.2") /
1298 UDP(sport=1144, dport=2233) /
1299 Raw(b'X' * payload_size)
1300 for i in range(count)]
1302 def verify_decrypted(self, p, rxs):
1304 self.assert_equal(rx[Ether].dst, self.omac)
1305 self.assert_equal(rx[IP].dst, "1.1.1.2")
1307 def verify_encrypted(self, p, sa, rxs):
1309 self.assertTrue(rx.haslayer(UDP))
1310 self.assertEqual(rx[UDP].dport, 4545)
1311 self.assertEqual(rx[UDP].sport, 5454)
1313 pkt = sa.decrypt(rx[IP])
1314 if not pkt.haslayer(IP):
1315 pkt = IP(pkt[Raw].load)
1316 self.assert_packet_checksums_valid(pkt)
1317 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1318 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1319 self.assertTrue(pkt.haslayer(GRE))
1321 self.assertEqual(e[Ether].dst, self.omac)
1322 self.assertEqual(e[IP].dst, "1.1.1.2")
1323 except (IndexError, AssertionError):
1324 self.logger.debug(ppp("Unexpected packet:", rx))
1326 self.logger.debug(ppp("Decrypted packet:", pkt))
1332 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1334 self.tun_if = self.pg0
1336 p = self.ipv4_params
1337 p = self.ipv4_params
1338 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1339 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1340 p.nat_header = UDP(sport=5454, dport=4545)
1342 bd1 = VppBridgeDomain(self, 1)
1343 bd1.add_vpp_config()
1345 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1346 p.auth_algo_vpp_id, p.auth_key,
1347 p.crypt_algo_vpp_id, p.crypt_key,
1348 self.vpp_esp_protocol,
1352 p.tun_sa_out.add_vpp_config()
1354 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1355 p.auth_algo_vpp_id, p.auth_key,
1356 p.crypt_algo_vpp_id, p.crypt_key,
1357 self.vpp_esp_protocol,
1359 VppEnum.vl_api_ipsec_sad_flags_t.
1360 IPSEC_API_SAD_FLAG_IS_INBOUND),
1363 p.tun_sa_in.add_vpp_config()
1365 p.tun_if = VppGreInterface(self,
1367 self.pg0.remote_ip4,
1368 type=(VppEnum.vl_api_gre_tunnel_type_t.
1369 GRE_API_TUNNEL_TYPE_TEB))
1370 p.tun_if.add_vpp_config()
1372 p.tun_protect = VppIpsecTunProtect(self,
1377 p.tun_protect.add_vpp_config()
1380 p.tun_if.config_ip4()
1381 config_tra_params(p, self.encryption_type, p.tun_if)
1383 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1384 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1386 self.vapi.cli("clear ipsec sa")
1387 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1390 p = self.ipv4_params
1391 p.tun_if.unconfig_ip4()
1392 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1395 class TestIpsecGreIfEsp(TemplateIpsec,
1397 """ Ipsec GRE ESP - TUN tests """
1398 tun4_encrypt_node_name = "esp4-encrypt-tun"
1399 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1400 encryption_type = ESP
1402 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1404 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1405 sa.encrypt(IP(src=self.pg0.remote_ip4,
1406 dst=self.pg0.local_ip4) /
1408 IP(src=self.pg1.local_ip4,
1409 dst=self.pg1.remote_ip4) /
1410 UDP(sport=1144, dport=2233) /
1411 Raw(b'X' * payload_size))
1412 for i in range(count)]
1414 def gen_pkts(self, sw_intf, src, dst, count=1,
1416 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1417 IP(src="1.1.1.1", dst="1.1.1.2") /
1418 UDP(sport=1144, dport=2233) /
1419 Raw(b'X' * payload_size)
1420 for i in range(count)]
1422 def verify_decrypted(self, p, rxs):
1424 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1425 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1427 def verify_encrypted(self, p, sa, rxs):
1430 pkt = sa.decrypt(rx[IP])
1431 if not pkt.haslayer(IP):
1432 pkt = IP(pkt[Raw].load)
1433 self.assert_packet_checksums_valid(pkt)
1434 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1435 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1436 self.assertTrue(pkt.haslayer(GRE))
1438 self.assertEqual(e[IP].dst, "1.1.1.2")
1439 except (IndexError, AssertionError):
1440 self.logger.debug(ppp("Unexpected packet:", rx))
1442 self.logger.debug(ppp("Decrypted packet:", pkt))
1448 super(TestIpsecGreIfEsp, self).setUp()
1450 self.tun_if = self.pg0
1452 p = self.ipv4_params
1454 bd1 = VppBridgeDomain(self, 1)
1455 bd1.add_vpp_config()
1457 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1458 p.auth_algo_vpp_id, p.auth_key,
1459 p.crypt_algo_vpp_id, p.crypt_key,
1460 self.vpp_esp_protocol,
1462 self.pg0.remote_ip4)
1463 p.tun_sa_out.add_vpp_config()
1465 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1466 p.auth_algo_vpp_id, p.auth_key,
1467 p.crypt_algo_vpp_id, p.crypt_key,
1468 self.vpp_esp_protocol,
1469 self.pg0.remote_ip4,
1471 p.tun_sa_in.add_vpp_config()
1473 p.tun_if = VppGreInterface(self,
1475 self.pg0.remote_ip4)
1476 p.tun_if.add_vpp_config()
1478 p.tun_protect = VppIpsecTunProtect(self,
1482 p.tun_protect.add_vpp_config()
1485 p.tun_if.config_ip4()
1486 config_tun_params(p, self.encryption_type, p.tun_if)
1488 VppIpRoute(self, "1.1.1.2", 32,
1489 [VppRoutePath(p.tun_if.remote_ip4,
1490 0xffffffff)]).add_vpp_config()
1493 p = self.ipv4_params
1494 p.tun_if.unconfig_ip4()
1495 super(TestIpsecGreIfEsp, self).tearDown()
1498 class TestIpsecGreIfEspTra(TemplateIpsec,
1500 """ Ipsec GRE ESP - TRA tests """
1501 tun4_encrypt_node_name = "esp4-encrypt-tun"
1502 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1503 encryption_type = ESP
1505 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1507 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1508 sa.encrypt(IP(src=self.pg0.remote_ip4,
1509 dst=self.pg0.local_ip4) /
1511 IP(src=self.pg1.local_ip4,
1512 dst=self.pg1.remote_ip4) /
1513 UDP(sport=1144, dport=2233) /
1514 Raw(b'X' * payload_size))
1515 for i in range(count)]
1517 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1519 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1520 sa.encrypt(IP(src=self.pg0.remote_ip4,
1521 dst=self.pg0.local_ip4) /
1523 UDP(sport=1144, dport=2233) /
1524 Raw(b'X' * payload_size))
1525 for i in range(count)]
1527 def gen_pkts(self, sw_intf, src, dst, count=1,
1529 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1530 IP(src="1.1.1.1", dst="1.1.1.2") /
1531 UDP(sport=1144, dport=2233) /
1532 Raw(b'X' * payload_size)
1533 for i in range(count)]
1535 def verify_decrypted(self, p, rxs):
1537 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1538 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1540 def verify_encrypted(self, p, sa, rxs):
1543 pkt = sa.decrypt(rx[IP])
1544 if not pkt.haslayer(IP):
1545 pkt = IP(pkt[Raw].load)
1546 self.assert_packet_checksums_valid(pkt)
1547 self.assertTrue(pkt.haslayer(GRE))
1549 self.assertEqual(e[IP].dst, "1.1.1.2")
1550 except (IndexError, AssertionError):
1551 self.logger.debug(ppp("Unexpected packet:", rx))
1553 self.logger.debug(ppp("Decrypted packet:", pkt))
1559 super(TestIpsecGreIfEspTra, self).setUp()
1561 self.tun_if = self.pg0
1563 p = self.ipv4_params
1565 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1566 p.auth_algo_vpp_id, p.auth_key,
1567 p.crypt_algo_vpp_id, p.crypt_key,
1568 self.vpp_esp_protocol)
1569 p.tun_sa_out.add_vpp_config()
1571 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1572 p.auth_algo_vpp_id, p.auth_key,
1573 p.crypt_algo_vpp_id, p.crypt_key,
1574 self.vpp_esp_protocol)
1575 p.tun_sa_in.add_vpp_config()
1577 p.tun_if = VppGreInterface(self,
1579 self.pg0.remote_ip4)
1580 p.tun_if.add_vpp_config()
1582 p.tun_protect = VppIpsecTunProtect(self,
1586 p.tun_protect.add_vpp_config()
1589 p.tun_if.config_ip4()
1590 config_tra_params(p, self.encryption_type, p.tun_if)
1592 VppIpRoute(self, "1.1.1.2", 32,
1593 [VppRoutePath(p.tun_if.remote_ip4,
1594 0xffffffff)]).add_vpp_config()
1597 p = self.ipv4_params
1598 p.tun_if.unconfig_ip4()
1599 super(TestIpsecGreIfEspTra, self).tearDown()
def test_gre_non_ip(self):
    """IPSEC GRE ESP - non-IP payload is dropped

    Injects the packets built by gen_encrypt_non_ip_pkts() on the tunnel
    underlay interface, expects nothing to be forwarded, and checks that
    the first decrypt node counted them as 'unsupported payload'.
    """
    p = self.ipv4_params
    n_pkts = 1

    # NOTE(review): the generator does not appear to read src/dst; they are
    # passed to satisfy its signature. Keep dst in the IPv4 family so the
    # call is consistent with the rest of this v4-only test class.
    tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
                                      src=p.remote_tun_if_host,
                                      dst=self.pg1.remote_ip4,
                                      count=n_pkts)
    self.send_and_assert_no_replies(self.tun_if, tx)

    # the error counter lives under the first node listed in
    # tun4_decrypt_node_name; one increment is expected per injected packet
    node_name = ('/err/%s/unsupported payload' %
                 self.tun4_decrypt_node_name[0])
    self.assertEqual(n_pkts, self.statistics.get_err_counter(node_name))
1612 class TestIpsecGre6IfEspTra(TemplateIpsec,
1614 """ Ipsec GRE ESP - TRA tests """
1615 tun6_encrypt_node_name = "esp6-encrypt-tun"
1616 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1617 encryption_type = ESP
1619 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1621 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1622 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1623 dst=self.pg0.local_ip6) /
1625 IPv6(src=self.pg1.local_ip6,
1626 dst=self.pg1.remote_ip6) /
1627 UDP(sport=1144, dport=2233) /
1628 Raw(b'X' * payload_size))
1629 for i in range(count)]
1631 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1633 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1634 IPv6(src="1::1", dst="1::2") /
1635 UDP(sport=1144, dport=2233) /
1636 Raw(b'X' * payload_size)
1637 for i in range(count)]
1639 def verify_decrypted6(self, p, rxs):
1641 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1642 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1644 def verify_encrypted6(self, p, sa, rxs):
1647 pkt = sa.decrypt(rx[IPv6])
1648 if not pkt.haslayer(IPv6):
1649 pkt = IPv6(pkt[Raw].load)
1650 self.assert_packet_checksums_valid(pkt)
1651 self.assertTrue(pkt.haslayer(GRE))
1653 self.assertEqual(e[IPv6].dst, "1::2")
1654 except (IndexError, AssertionError):
1655 self.logger.debug(ppp("Unexpected packet:", rx))
1657 self.logger.debug(ppp("Decrypted packet:", pkt))
1663 super(TestIpsecGre6IfEspTra, self).setUp()
1665 self.tun_if = self.pg0
1667 p = self.ipv6_params
1669 bd1 = VppBridgeDomain(self, 1)
1670 bd1.add_vpp_config()
1672 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1673 p.auth_algo_vpp_id, p.auth_key,
1674 p.crypt_algo_vpp_id, p.crypt_key,
1675 self.vpp_esp_protocol)
1676 p.tun_sa_out.add_vpp_config()
1678 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1679 p.auth_algo_vpp_id, p.auth_key,
1680 p.crypt_algo_vpp_id, p.crypt_key,
1681 self.vpp_esp_protocol)
1682 p.tun_sa_in.add_vpp_config()
1684 p.tun_if = VppGreInterface(self,
1686 self.pg0.remote_ip6)
1687 p.tun_if.add_vpp_config()
1689 p.tun_protect = VppIpsecTunProtect(self,
1693 p.tun_protect.add_vpp_config()
1696 p.tun_if.config_ip6()
1697 config_tra_params(p, self.encryption_type, p.tun_if)
1699 r = VppIpRoute(self, "1::2", 128,
1700 [VppRoutePath(p.tun_if.remote_ip6,
1702 proto=DpoProto.DPO_PROTO_IP6)])
1706 p = self.ipv6_params
1707 p.tun_if.unconfig_ip6()
1708 super(TestIpsecGre6IfEspTra, self).tearDown()
1711 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1712 """ Ipsec mGRE ESP v4 TRA tests """
1713 tun4_encrypt_node_name = "esp4-encrypt-tun"
1714 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1715 encryption_type = ESP
1717 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1719 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1720 sa.encrypt(IP(src=p.tun_dst,
1721 dst=self.pg0.local_ip4) /
1723 IP(src=self.pg1.local_ip4,
1724 dst=self.pg1.remote_ip4) /
1725 UDP(sport=1144, dport=2233) /
1726 Raw(b'X' * payload_size))
1727 for i in range(count)]
1729 def gen_pkts(self, sw_intf, src, dst, count=1,
1731 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1732 IP(src="1.1.1.1", dst=dst) /
1733 UDP(sport=1144, dport=2233) /
1734 Raw(b'X' * payload_size)
1735 for i in range(count)]
1737 def verify_decrypted(self, p, rxs):
1739 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1740 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1742 def verify_encrypted(self, p, sa, rxs):
1745 pkt = sa.decrypt(rx[IP])
1746 if not pkt.haslayer(IP):
1747 pkt = IP(pkt[Raw].load)
1748 self.assert_packet_checksums_valid(pkt)
1749 self.assertTrue(pkt.haslayer(GRE))
1751 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1752 except (IndexError, AssertionError):
1753 self.logger.debug(ppp("Unexpected packet:", rx))
1755 self.logger.debug(ppp("Decrypted packet:", pkt))
1761 super(TestIpsecMGreIfEspTra4, self).setUp()
1764 self.tun_if = self.pg0
1765 p = self.ipv4_params
1766 p.tun_if = VppGreInterface(self,
1769 mode=(VppEnum.vl_api_tunnel_mode_t.
1770 TUNNEL_API_MODE_MP))
1771 p.tun_if.add_vpp_config()
1773 p.tun_if.config_ip4()
1774 p.tun_if.generate_remote_hosts(N_NHS)
1775 self.pg0.generate_remote_hosts(N_NHS)
1776 self.pg0.configure_ipv4_neighbors()
1778 # setup some SAs for several next-hops on the interface
1779 self.multi_params = []
1781 for ii in range(N_NHS):
1782 p = copy.copy(self.ipv4_params)
1784 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1785 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1786 p.scapy_tun_spi = p.scapy_tun_spi + ii
1787 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1788 p.vpp_tun_spi = p.vpp_tun_spi + ii
1790 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1791 p.scapy_tra_spi = p.scapy_tra_spi + ii
1792 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1793 p.vpp_tra_spi = p.vpp_tra_spi + ii
1794 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1795 p.auth_algo_vpp_id, p.auth_key,
1796 p.crypt_algo_vpp_id, p.crypt_key,
1797 self.vpp_esp_protocol)
1798 p.tun_sa_out.add_vpp_config()
1800 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1801 p.auth_algo_vpp_id, p.auth_key,
1802 p.crypt_algo_vpp_id, p.crypt_key,
1803 self.vpp_esp_protocol)
1804 p.tun_sa_in.add_vpp_config()
1806 p.tun_protect = VppIpsecTunProtect(
1811 nh=p.tun_if.remote_hosts[ii].ip4)
1812 p.tun_protect.add_vpp_config()
1813 config_tra_params(p, self.encryption_type, p.tun_if)
1814 self.multi_params.append(p)
1816 VppIpRoute(self, p.remote_tun_if_host, 32,
1817 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1818 p.tun_if.sw_if_index)]).add_vpp_config()
1820 # in this v4 variant add the teibs after the protect
1821 p.teib = VppTeib(self, p.tun_if,
1822 p.tun_if.remote_hosts[ii].ip4,
1823 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1824 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1825 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1828 p = self.ipv4_params
1829 p.tun_if.unconfig_ip4()
1830 super(TestIpsecMGreIfEspTra4, self).tearDown()
1832 def test_tun_44(self):
1835 for p in self.multi_params:
1836 self.verify_tun_44(p, count=N_PKTS)
1837 p.teib.remove_vpp_config()
1838 self.verify_tun_dropped_44(p, count=N_PKTS)
1839 p.teib.add_vpp_config()
1840 self.verify_tun_44(p, count=N_PKTS)
1843 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1844 """ Ipsec mGRE ESP v6 TRA tests """
1845 tun6_encrypt_node_name = "esp6-encrypt-tun"
1846 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1847 encryption_type = ESP
1849 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1851 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1852 sa.encrypt(IPv6(src=p.tun_dst,
1853 dst=self.pg0.local_ip6) /
1855 IPv6(src=self.pg1.local_ip6,
1856 dst=self.pg1.remote_ip6) /
1857 UDP(sport=1144, dport=2233) /
1858 Raw(b'X' * payload_size))
1859 for i in range(count)]
1861 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1863 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1864 IPv6(src="1::1", dst=dst) /
1865 UDP(sport=1144, dport=2233) /
1866 Raw(b'X' * payload_size)
1867 for i in range(count)]
1869 def verify_decrypted6(self, p, rxs):
1871 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1872 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1874 def verify_encrypted6(self, p, sa, rxs):
1877 pkt = sa.decrypt(rx[IPv6])
1878 if not pkt.haslayer(IPv6):
1879 pkt = IPv6(pkt[Raw].load)
1880 self.assert_packet_checksums_valid(pkt)
1881 self.assertTrue(pkt.haslayer(GRE))
1883 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1884 except (IndexError, AssertionError):
1885 self.logger.debug(ppp("Unexpected packet:", rx))
1887 self.logger.debug(ppp("Decrypted packet:", pkt))
1893 super(TestIpsecMGreIfEspTra6, self).setUp()
1895 self.vapi.cli("set logging class ipsec level debug")
1898 self.tun_if = self.pg0
1899 p = self.ipv6_params
1900 p.tun_if = VppGreInterface(self,
1903 mode=(VppEnum.vl_api_tunnel_mode_t.
1904 TUNNEL_API_MODE_MP))
1905 p.tun_if.add_vpp_config()
1907 p.tun_if.config_ip6()
1908 p.tun_if.generate_remote_hosts(N_NHS)
1909 self.pg0.generate_remote_hosts(N_NHS)
1910 self.pg0.configure_ipv6_neighbors()
1912 # setup some SAs for several next-hops on the interface
1913 self.multi_params = []
1915 for ii in range(N_NHS):
1916 p = copy.copy(self.ipv6_params)
1918 p.remote_tun_if_host = "1::%d" % (ii + 1)
1919 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1920 p.scapy_tun_spi = p.scapy_tun_spi + ii
1921 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1922 p.vpp_tun_spi = p.vpp_tun_spi + ii
1924 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1925 p.scapy_tra_spi = p.scapy_tra_spi + ii
1926 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1927 p.vpp_tra_spi = p.vpp_tra_spi + ii
1928 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1929 p.auth_algo_vpp_id, p.auth_key,
1930 p.crypt_algo_vpp_id, p.crypt_key,
1931 self.vpp_esp_protocol)
1932 p.tun_sa_out.add_vpp_config()
1934 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1935 p.auth_algo_vpp_id, p.auth_key,
1936 p.crypt_algo_vpp_id, p.crypt_key,
1937 self.vpp_esp_protocol)
1938 p.tun_sa_in.add_vpp_config()
1940 # in this v6 variant add the teibs first then the protection
1941 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1942 VppTeib(self, p.tun_if,
1943 p.tun_if.remote_hosts[ii].ip6,
1944 p.tun_dst).add_vpp_config()
1946 p.tun_protect = VppIpsecTunProtect(
1951 nh=p.tun_if.remote_hosts[ii].ip6)
1952 p.tun_protect.add_vpp_config()
1953 config_tra_params(p, self.encryption_type, p.tun_if)
1954 self.multi_params.append(p)
1956 VppIpRoute(self, p.remote_tun_if_host, 128,
1957 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1958 p.tun_if.sw_if_index)]).add_vpp_config()
1959 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1961 self.logger.info(self.vapi.cli("sh log"))
1962 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1963 self.logger.info(self.vapi.cli("sh adj 41"))
1966 p = self.ipv6_params
1967 p.tun_if.unconfig_ip6()
1968 super(TestIpsecMGreIfEspTra6, self).tearDown()
def test_tun_66(self):
    """mGRE IPsec 6o6 for every configured next-hop

    self.multi_params holds one parameter set per next-hop (built in
    setUp); each one is handed to verify_tun_66() with 63 packets.
    """
    n_pkts = 63
    for nh_params in self.multi_params:
        self.verify_tun_66(nh_params, count=n_pkts)
1976 @tag_fixme_vpp_workers
1977 class TestIpsec4TunProtect(TemplateIpsec,
1978 TemplateIpsec4TunProtect,
1980 """ IPsec IPv4 Tunnel protect - transport mode"""
1983 super(TestIpsec4TunProtect, self).setUp()
1985 self.tun_if = self.pg0
1988 super(TestIpsec4TunProtect, self).tearDown()
1990 def test_tun_44(self):
1991 """IPSEC tunnel protect"""
1993 p = self.ipv4_params
1995 self.config_network(p)
1996 self.config_sa_tra(p)
1997 self.config_protect(p)
1999 self.verify_tun_44(p, count=127)
2000 c = p.tun_if.get_rx_stats()
2001 self.assertEqual(c['packets'], 127)
2002 c = p.tun_if.get_tx_stats()
2003 self.assertEqual(c['packets'], 127)
2005 self.vapi.cli("clear ipsec sa")
2006 self.verify_tun_64(p, count=127)
2007 c = p.tun_if.get_rx_stats()
2008 self.assertEqual(c['packets'], 254)
2009 c = p.tun_if.get_tx_stats()
2010 self.assertEqual(c['packets'], 254)
2012 # rekey - create new SAs and update the tunnel protection
2014 np.crypt_key = b'X' + p.crypt_key[1:]
2015 np.scapy_tun_spi += 100
2016 np.scapy_tun_sa_id += 1
2017 np.vpp_tun_spi += 100
2018 np.vpp_tun_sa_id += 1
2019 np.tun_if.local_spi = p.vpp_tun_spi
2020 np.tun_if.remote_spi = p.scapy_tun_spi
2022 self.config_sa_tra(np)
2023 self.config_protect(np)
2026 self.verify_tun_44(np, count=127)
2027 c = p.tun_if.get_rx_stats()
2028 self.assertEqual(c['packets'], 381)
2029 c = p.tun_if.get_tx_stats()
2030 self.assertEqual(c['packets'], 381)
2033 self.unconfig_protect(np)
2034 self.unconfig_sa(np)
2035 self.unconfig_network(p)
2038 @tag_fixme_vpp_workers
2039 class TestIpsec4TunProtectUdp(TemplateIpsec,
2040 TemplateIpsec4TunProtect,
2042 """ IPsec IPv4 Tunnel protect - transport mode"""
2045 super(TestIpsec4TunProtectUdp, self).setUp()
2047 self.tun_if = self.pg0
2049 p = self.ipv4_params
2050 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2051 IPSEC_API_SAD_FLAG_UDP_ENCAP)
2052 p.nat_header = UDP(sport=4500, dport=4500)
2053 self.config_network(p)
2054 self.config_sa_tra(p)
2055 self.config_protect(p)
2058 p = self.ipv4_params
2059 self.unconfig_protect(p)
2061 self.unconfig_network(p)
2062 super(TestIpsec4TunProtectUdp, self).tearDown()
2064 def verify_encrypted(self, p, sa, rxs):
2065 # ensure encrypted packets are received with the default UDP ports
2067 self.assertEqual(rx[UDP].sport, 4500)
2068 self.assertEqual(rx[UDP].dport, 4500)
2069 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
def test_tun_44(self):
    """IPSEC UDP tunnel protect

    Runs verify_tun_44() with 127 packets on the IPv4 parameter set, then
    checks that the tunnel interface reports 127 packets in both its rx
    and its tx statistics.
    """
    n_pkts = 127
    params = self.ipv4_params

    self.verify_tun_44(params, count=n_pkts)

    # rx is read and checked before tx, as in the straight-line form
    tunnel = params.tun_if
    for read_stats in (tunnel.get_rx_stats, tunnel.get_tx_stats):
        self.assertEqual(read_stats()['packets'], n_pkts)
def test_keepalive(self):
    """ IPSEC NAT Keepalive """
    # hand this class's IPv4 parameter set to verify_keepalive()
    params = self.ipv4_params
    self.verify_keepalive(params)
2087 @tag_fixme_vpp_workers
2088 class TestIpsec4TunProtectTun(TemplateIpsec,
2089 TemplateIpsec4TunProtect,
2091 """ IPsec IPv4 Tunnel protect - tunnel mode"""
2093 encryption_type = ESP
2094 tun4_encrypt_node_name = "esp4-encrypt-tun"
2095 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2098 super(TestIpsec4TunProtectTun, self).setUp()
2100 self.tun_if = self.pg0
2103 super(TestIpsec4TunProtectTun, self).tearDown()
2105 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2107 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2108 sa.encrypt(IP(src=sw_intf.remote_ip4,
2109 dst=sw_intf.local_ip4) /
2110 IP(src=src, dst=dst) /
2111 UDP(sport=1144, dport=2233) /
2112 Raw(b'X' * payload_size))
2113 for i in range(count)]
2115 def gen_pkts(self, sw_intf, src, dst, count=1,
2117 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2118 IP(src=src, dst=dst) /
2119 UDP(sport=1144, dport=2233) /
2120 Raw(b'X' * payload_size)
2121 for i in range(count)]
2123 def verify_decrypted(self, p, rxs):
2125 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2126 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2127 self.assert_packet_checksums_valid(rx)
2129 def verify_encrypted(self, p, sa, rxs):
2132 pkt = sa.decrypt(rx[IP])
2133 if not pkt.haslayer(IP):
2134 pkt = IP(pkt[Raw].load)
2135 self.assert_packet_checksums_valid(pkt)
2136 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2137 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2138 inner = pkt[IP].payload
2139 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2141 except (IndexError, AssertionError):
2142 self.logger.debug(ppp("Unexpected packet:", rx))
2144 self.logger.debug(ppp("Decrypted packet:", pkt))
2149 def test_tun_44(self):
2150 """IPSEC tunnel protect """
2152 p = self.ipv4_params
2154 self.config_network(p)
2155 self.config_sa_tun(p)
2156 self.config_protect(p)
2158 # also add output features on the tunnel and physical interface
2159 # so we test they still work
2160 r_all = AclRule(True,
2161 src_prefix="0.0.0.0/0",
2162 dst_prefix="0.0.0.0/0",
2164 a = VppAcl(self, [r_all]).add_vpp_config()
2166 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2167 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2169 self.verify_tun_44(p, count=127)
2171 c = p.tun_if.get_rx_stats()
2172 self.assertEqual(c['packets'], 127)
2173 c = p.tun_if.get_tx_stats()
2174 self.assertEqual(c['packets'], 127)
2176 # rekey - create new SAs and update the tunnel protection
2178 np.crypt_key = b'X' + p.crypt_key[1:]
2179 np.scapy_tun_spi += 100
2180 np.scapy_tun_sa_id += 1
2181 np.vpp_tun_spi += 100
2182 np.vpp_tun_sa_id += 1
2183 np.tun_if.local_spi = p.vpp_tun_spi
2184 np.tun_if.remote_spi = p.scapy_tun_spi
2186 self.config_sa_tun(np)
2187 self.config_protect(np)
2190 self.verify_tun_44(np, count=127)
2191 c = p.tun_if.get_rx_stats()
2192 self.assertEqual(c['packets'], 254)
2193 c = p.tun_if.get_tx_stats()
2194 self.assertEqual(c['packets'], 254)
2197 self.unconfig_protect(np)
2198 self.unconfig_sa(np)
2199 self.unconfig_network(p)
2202 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2203 TemplateIpsec4TunProtect,
2205 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2207 encryption_type = ESP
2208 tun4_encrypt_node_name = "esp4-encrypt-tun"
2209 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2212 super(TestIpsec4TunProtectTunDrop, self).setUp()
2214 self.tun_if = self.pg0
2217 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2219 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2221 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2222 sa.encrypt(IP(src=sw_intf.remote_ip4,
2224 IP(src=src, dst=dst) /
2225 UDP(sport=1144, dport=2233) /
2226 Raw(b'X' * payload_size))
2227 for i in range(count)]
2229 def test_tun_drop_44(self):
2230 """IPSEC tunnel protect bogus tunnel header """
2232 p = self.ipv4_params
2234 self.config_network(p)
2235 self.config_sa_tun(p)
2236 self.config_protect(p)
2238 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2239 src=p.remote_tun_if_host,
2240 dst=self.pg1.remote_ip4,
2242 self.send_and_assert_no_replies(self.tun_if, tx)
2245 self.unconfig_protect(p)
2247 self.unconfig_network(p)
2250 @tag_fixme_vpp_workers
2251 class TestIpsec6TunProtect(TemplateIpsec,
2252 TemplateIpsec6TunProtect,
2254 """ IPsec IPv6 Tunnel protect - transport mode"""
2256 encryption_type = ESP
2257 tun6_encrypt_node_name = "esp6-encrypt-tun"
2258 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2261 super(TestIpsec6TunProtect, self).setUp()
2263 self.tun_if = self.pg0
2266 super(TestIpsec6TunProtect, self).tearDown()
2268 def test_tun_66(self):
2269 """IPSEC tunnel protect 6o6"""
2271 p = self.ipv6_params
2273 self.config_network(p)
2274 self.config_sa_tra(p)
2275 self.config_protect(p)
2277 self.verify_tun_66(p, count=127)
2278 c = p.tun_if.get_rx_stats()
2279 self.assertEqual(c['packets'], 127)
2280 c = p.tun_if.get_tx_stats()
2281 self.assertEqual(c['packets'], 127)
2283 # rekey - create new SAs and update the tunnel protection
2285 np.crypt_key = b'X' + p.crypt_key[1:]
2286 np.scapy_tun_spi += 100
2287 np.scapy_tun_sa_id += 1
2288 np.vpp_tun_spi += 100
2289 np.vpp_tun_sa_id += 1
2290 np.tun_if.local_spi = p.vpp_tun_spi
2291 np.tun_if.remote_spi = p.scapy_tun_spi
2293 self.config_sa_tra(np)
2294 self.config_protect(np)
2297 self.verify_tun_66(np, count=127)
2298 c = p.tun_if.get_rx_stats()
2299 self.assertEqual(c['packets'], 254)
2300 c = p.tun_if.get_tx_stats()
2301 self.assertEqual(c['packets'], 254)
2303 # bounce the interface state
2304 p.tun_if.admin_down()
2305 self.verify_drop_tun_66(np, count=127)
2306 node = ('/err/ipsec6-tun-input/%s' %
2307 'ipsec packets received on disabled interface')
2308 self.assertEqual(127, self.statistics.get_err_counter(node))
2310 self.verify_tun_66(np, count=127)
2313 # 1) add two input SAs [old, new]
2314 # 2) swap output SA to [new]
2315 # 3) use only [new] input SA
2317 np3.crypt_key = b'Z' + p.crypt_key[1:]
2318 np3.scapy_tun_spi += 100
2319 np3.scapy_tun_sa_id += 1
2320 np3.vpp_tun_spi += 100
2321 np3.vpp_tun_sa_id += 1
2322 np3.tun_if.local_spi = p.vpp_tun_spi
2323 np3.tun_if.remote_spi = p.scapy_tun_spi
2325 self.config_sa_tra(np3)
2328 p.tun_protect.update_vpp_config(np.tun_sa_out,
2329 [np.tun_sa_in, np3.tun_sa_in])
2330 self.verify_tun_66(np, np, count=127)
2331 self.verify_tun_66(np3, np, count=127)
2334 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2335 [np.tun_sa_in, np3.tun_sa_in])
2336 self.verify_tun_66(np, np3, count=127)
2337 self.verify_tun_66(np3, np3, count=127)
2340 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2342 self.verify_tun_66(np3, np3, count=127)
2343 self.verify_drop_tun_66(np, count=127)
2345 c = p.tun_if.get_rx_stats()
2346 self.assertEqual(c['packets'], 127*9)
2347 c = p.tun_if.get_tx_stats()
2348 self.assertEqual(c['packets'], 127*8)
2349 self.unconfig_sa(np)
2352 self.unconfig_protect(np3)
2353 self.unconfig_sa(np3)
2354 self.unconfig_network(p)
def test_tun_46(self):
    """IPSEC tunnel protect 4o6

    Configures the network, transport-mode SAs and tunnel protection from
    the IPv6 parameter set, runs verify_tun_46() with 127 packets, checks
    the tunnel interface rx/tx packet counters, then removes everything
    this test configured.
    """
    p = self.ipv6_params

    self.config_network(p)
    self.config_sa_tra(p)
    self.config_protect(p)

    self.verify_tun_46(p, count=127)
    c = p.tun_if.get_rx_stats()
    self.assertEqual(c['packets'], 127)
    c = p.tun_if.get_tx_stats()
    self.assertEqual(c['packets'], 127)

    # teardown in the same order the sibling tests use: protection first,
    # then the SAs created by config_sa_tra(), then the network
    self.unconfig_protect(p)
    self.unconfig_sa(p)
    self.unconfig_network(p)
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the IpsecTun6 base
    # class, the payload_size default (100), the setUp/tearDown headers,
    # the loop/try scaffolding in the verify helpers, `np = copy.copy(p)`
    # and the unconfig_sa(p) call after the rekey.

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec6TunProtectTun, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted frames to inject on `sw_intf`.

        The plaintext handed to `sa.encrypt` is IPv6-in-IPv6: an outer
        header between sw_intf's remote and local addresses carrying an
        inner src->dst UDP packet with `payload_size` bytes of b'X'.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Build `count` clear-text IPv6/UDP frames from src to dst."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of each decrypted packet."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check both headers.

        On IndexError/AssertionError the offending packets are logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-dissect the raw bytes as IPv6
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                # outer header: between pg0's local and remote addresses
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    # best effort: pkt is unbound if decrypt itself failed
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    pass
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 127)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 127)

        # rekey - create new SAs and update the tunnel protection
        # copy.copy is shallow, so np.tun_if is the same object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        self.unconfig_sa(p)

        # counters are cumulative across both passes: 2 * 127
        self.verify_tun_66(np, count=127)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 254)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the IpsecTun6 base
    # class, the payload_size default (100), the bogus outer destination
    # ("5::5"), the packet count (63), the setUp/tearDown headers and the
    # unconfig_sa(p) teardown step.

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsec6TunProtectTunDrop, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted frames with a bogus tunnel header."""
        # the IP destination of the revealed packet does not match
        # that assigned to the tunnel
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst="5::5") /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                    src=p.remote_tun_if_host,
                                    dst=self.pg1.remote_ip6,
                                    count=63)
        # nothing may come back for the mismatching tunnel header
        self.send_and_assert_no_replies(self.tun_if, tx)

        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TemplateIpsecItf4(object):
    """ IPsec Interface IPv4 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the `src, dst` /
    # `dst, src` endpoint and `flags=p.flags` arguments of the SAs, the
    # positional arguments of VppIpsecTunProtect, the admin_up() call, the
    # 0xffffffff interface index on both route paths and the
    # r.add_vpp_config() call.

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the scapy SAs and the VPP out/in SAs for `p`.

        :param p: parameter object; receives tun_sa_out and tun_sa_in
        :param src: tunnel source address used for the outbound SA
        :param dst: tunnel destination address used for the outbound SA
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  src, dst,
                                  flags=p.flags)
        p.tun_sa_out.add_vpp_config()

        # the inbound SA mirrors the outbound one: endpoints swapped
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 dst, src,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind p's out/in SAs to p.tun_if and store it as p.tun_protect."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p, instance=0xffffffff):
        """Create the ipsec interface and the host routes through it.

        :param p: parameter object; receives tun_if and route
        :param instance: instance number passed to VppIpsecInterface
        """
        p.tun_if = VppIpsecInterface(self, instance=instance)

        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(p.tun_if.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()
        # the v6 host route is not stored on p, so unconfig_network()
        # does not remove it explicitly
        r = VppIpRoute(self, p.remote_tun_if_host6, 128,
                       [VppRoutePath(p.tun_if.remote_ip6,
                                     0xffffffff,
                                     proto=DpoProto.DPO_PROTO_IP6)])
        r.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route and the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun()."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec,
                    TemplateIpsecItf4,
                    IpsecTun4):
    """ IPsec Interface IPv4 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the TemplateIpsecItf4
    # and IpsecTun4 base classes, `n_pkts = 127`, the
    # `self.pg0.local_ip4` source argument of config_sa_tun(), the
    # admin_up() call after admin_down(), `np = copy.copy(p)`, the
    # PolicerAction mark value (0) and the unconfig_sa(p) calls.

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf4, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsecItf4, self).tearDown()

    def test_tun_instance_44(self):
        """IPSEC interface IPv4 with an explicit instance number"""
        p = self.ipv4_params
        self.config_network(p, instance=3)

        # only the requested instance may exist
        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        output = self.vapi.cli("show interface ipsec3")
        self.assertNotIn("unknown", output)

        self.unconfig_network(p)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # traffic is dropped while the interface is down and flows again
        # once it is back up
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        # rx advanced by one more pass than tx across the down period
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 2*n_pkts)

        # it's a v6 packet when it is encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 4*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        # the checks after the rekey expect the counters to restart
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # copy.copy is shallow, so np.tun_if is the same object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_44(np, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        n_pkts = 127
        # work on a copy so the shared ipv4_params keep their algorithms
        p = copy.copy(self.ipv4_params)

        p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
                              IPSEC_API_INTEG_ALG_NONE)
        p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
                               IPSEC_API_CRYPTO_ALG_NONE)
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        # every policer outcome transmits, so traffic is only classified
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
                             conform_action=action_tx,
                             exceed_action=action_tx,
                             violate_action=action_tx)
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, True)

        self.verify_tun_44(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats['conform_packets'], 0)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecItf4MPLS(TemplateIpsec,
                        TemplateIpsecItf4,
                        IpsecTun4):
    """ IPsec Interface MPLSoIPv4 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the base classes, the
    # payload_size default (100), `n_pkts = 127`, the `[VppRoutePath(`
    # opener of the MPLS route, the `self.pg0.local_ip4` source argument,
    # the loop/try scaffolding in verify_encrypted() and the
    # unconfig_sa(p) teardown step.

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted frames carrying MPLS(44)/IP/UDP."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=44, ttl=3) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet and check the MPLS label and inner dst.

        On IndexError/AssertionError the offending packets are logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-dissect the raw bytes as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    # best effort: pkt is unbound if decrypt itself failed
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    pass
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 44, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip4,
                         self.pg1.sw_if_index)]).add_vpp_config()
        # traffic routed into the tunnel is imposed with label 44
        p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(44)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TemplateIpsecItf6(object):
    """ IPsec Interface IPv6 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the fallback values
    # for tun_flags (None) and hop_limit (255), the `src, dst` /
    # `dst, src` endpoint and `flags=p.flags` arguments of the SAs, the
    # positional arguments of VppIpsecTunProtect, the admin_up() call, the
    # 0xffffffff interface index on both route paths and the
    # r.add_vpp_config() call.

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the scapy SAs and the VPP out/in SAs for `p`.

        :param p: parameter object; receives tun_sa_out and tun_sa_in, and
                  tun_flags / hop_limit when it does not carry them yet
        :param src: tunnel source address used for the outbound SA
        :param dst: tunnel destination address used for the outbound SA
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # tests may preset these; only fill them in when absent
        if not hasattr(p, 'tun_flags'):
            p.tun_flags = None
        if not hasattr(p, 'hop_limit'):
            p.hop_limit = 255

        p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                                  p.auth_algo_vpp_id, p.auth_key,
                                  p.crypt_algo_vpp_id, p.crypt_key,
                                  self.vpp_esp_protocol,
                                  src, dst,
                                  flags=p.flags,
                                  tun_flags=p.tun_flags,
                                  hop_limit=p.hop_limit)
        p.tun_sa_out.add_vpp_config()

        # the inbound SA mirrors the outbound one: endpoints swapped, and
        # no tunnel flags / hop limit are passed
        p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                                 p.auth_algo_vpp_id, p.auth_key,
                                 p.crypt_algo_vpp_id, p.crypt_key,
                                 self.vpp_esp_protocol,
                                 dst, src,
                                 flags=p.flags)
        p.tun_sa_in.add_vpp_config()

    def config_protect(self, p):
        """Bind p's out/in SAs to p.tun_if and store it as p.tun_protect."""
        p.tun_protect = VppIpsecTunProtect(self,
                                           p.tun_if,
                                           p.tun_sa_out,
                                           [p.tun_sa_in])
        p.tun_protect.add_vpp_config()

    def config_network(self, p):
        """Create the ipsec interface and the host routes through it.

        :param p: parameter object; receives tun_if and route
        """
        p.tun_if = VppIpsecInterface(self)

        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        p.tun_if.config_ip4()
        p.tun_if.config_ip6()

        # the v4 host route is not stored on p, so unconfig_network()
        # does not remove it explicitly
        r = VppIpRoute(self, p.remote_tun_if_host4, 32,
                       [VppRoutePath(p.tun_if.remote_ip4,
                                     0xffffffff)])
        r.add_vpp_config()

        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(p.tun_if.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove p.route and the tunnel interface."""
        p.route.remove_vpp_config()
        p.tun_if.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect()."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove both SAs created by config_sa_tun()."""
        p.tun_sa_out.remove_vpp_config()
        p.tun_sa_in.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec,
                    TemplateIpsecItf6,
                    IpsecTun6):
    """ IPsec Interface IPv6 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the TemplateIpsecItf6
    # and IpsecTun6 base classes, `n_pkts = 127`, the
    # `self.pg0.local_ip6` source argument of config_sa_tun(), the
    # admin_up() call after admin_down(), `np = copy.copy(p)`, the
    # PolicerAction mark value (0) and the unconfig_sa(p) calls.

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsecItf6, self).tearDown()

    # NOTE(review): the method name says 44 although the test exercises
    # the IPv6 interface; the name is kept so test selection by name is
    # unaffected.
    def test_tun_44(self):
        """IPSEC interface IPv6"""

        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # traffic is dropped while the interface is down and flows again
        # once it is back up
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        # rx advanced by one more pass than tx across the down period
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 2*n_pkts)

        # it's a v4 packet when it is encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], 4*n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], 3*n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        # the checks after the rekey expect the counters to restart
        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # copy.copy is shallow, so np.tun_if is the same object as p.tun_if
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi
        np.inner_hop_limit = 24
        np.outer_hop_limit = 128
        np.inner_flow_label = 0xabcde
        np.outer_flow_label = 0xabcde

        # the new SAs copy the flow label instead of the hop limit
        np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL

        self.config_sa_tun(np,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(np)
        self.unconfig_sa(p)

        self.verify_tun_66(np, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        # every policer outcome transmits, so traffic is only classified
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
                             conform_action=action_tx,
                             exceed_action=action_tx,
                             violate_action=action_tx)
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, True)

        self.verify_tun_66(p, count=n_pkts)
        c = p.tun_if.get_rx_stats()
        self.assertEqual(c['packets'], n_pkts)
        c = p.tun_if.get_tx_stats()
        self.assertEqual(c['packets'], n_pkts)

        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats['conform_packets'], 0)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, False)
        self.verify_tun_66(p, count=n_pkts)

        # No new policer stats
        statsnew = policer.get_stats()
        self.assertEqual(stats, statsnew)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """ Ipsec P2MP ESP v4 tests """
    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the payload_size
    # default (100), N_NHS (16), N_PKTS (63), the test_tun_44 docstring,
    # the admin_up() call, the `self.pg0.local_ip4` tunnel endpoint
    # arguments, the leading VppIpsecTunProtect arguments, the
    # setUp/tearDown headers and the loop/try scaffolding in the verify
    # helpers.
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Build `count` encrypted frames to inject on `sw_intf`.

        `src` and `dst` are accepted for signature compatibility only; the
        encrypted packet always goes from pg1's local to its remote IPv4
        address.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Build `count` clear-text frames to `dst`.

        `src` is ignored; the source address is fixed to 1.1.1.1.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check each decrypted packet is addressed to pg1's remote host."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check the outer header, then decrypt and check the inner dst.

        On IndexError/AssertionError the offending packets are logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            try:
                # outer header must carry the SA's DSCP and hop limit
                self.assertEqual(rx[IP].tos,
                                 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    # re-dissect the raw bytes as IPv4
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assertEqual(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    # best effort: pkt is unbound if decrypt itself failed
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    pass
                raise

    def setUp(self):
        """Create one multipoint ipsec interface with N_NHS protected peers.

        Each peer gets its own parameter copy (appended to
        self.multi_params) with distinct SA ids/SPIs, an out/in SA pair, a
        per-next-hop tunnel protection and a /32 route.
        """
        super(TestIpsecMIfEsp4, self).setUp()

        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(self,
                                     mode=(VppEnum.vl_api_tunnel_mode_t.
                                           TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # the address is configured, removed and configured again
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the same p.tun_if
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii

            p.tun_sa_out = VppIpsecSA(
                self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_in.add_vpp_config()

            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tun_params(p, self.encryption_type, None,
                              self.pg0.local_ip4,
                              self.pg0.remote_hosts[ii].ip4)
            self.multi_params.append(p)

            VppIpRoute(self, p.remote_tun_if_host, 32,
                       [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                                     p.tun_if.sw_if_index)]).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        """Unconfigure the tunnel's IPv4 address, then run base tearDown."""
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
class TestIpsecItf6MPLS(TemplateIpsec,
                        TemplateIpsecItf6,
                        IpsecTun6):
    """ IPsec Interface MPLSoIPv6 """

    # NOTE(review): reconstructed from a listing that had lost indentation
    # and short lines.  Inferred pieces to confirm: the base classes, the
    # payload_size default (100), `n_pkts = 127`, `f = FibPathProto`, the
    # `[VppRoutePath(` opener of the MPLS route, the `self.pg0.local_ip6`
    # source argument, the loop/try scaffolding in verify_encrypted6() and
    # the unconfig_sa(p) teardown step.

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        """Run the base setUp and use pg0 as the tunnel-facing interface."""
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        """Delegate to the base class tearDown."""
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Build `count` encrypted frames carrying MPLS(66)/IPv6/UDP."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=66, ttl=3) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check the MPLS label and inner dst.

        On IndexError/AssertionError the offending packets are logged at
        debug level and the exception is re-raised.
        """
        for rx in rxs:
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-dissect as IPv6 (was IP): the checks below index
                    # pkt[IPv6], which an IPv4 dissection cannot satisfy
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                try:
                    # best effort: pkt is unbound if decrypt itself failed
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                except Exception:
                    pass
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        f = FibPathProto

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 66, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip6,
                         self.pg1.sw_if_index)],
                     eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
        # traffic routed into the tunnel is imposed with label 66
        p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(66)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
# Allow running this test module directly with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)