5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
15 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19 VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
# Build the scapy-side ESP SecurityAssociation pair for a protected tunnel and
# store them on `p` as p.scapy_tun_sa and p.vpp_tun_sa.
# NOTE(review): the embedded numbering in this listing has gaps, so some
# statements (the conditions guarding the assignments below and the trailing
# arguments of each SecurityAssociation call) are not visible here; the
# comments describe only the lines that are shown.
# NOTE(review): config_tun_params is also imported from template_ipsec at the
# top of the file; this definition rebinds (shadows) that imported name.
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
# Maps p.addr_type to the scapy IP header class used for the tunnel header.
32 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
# True when the USE_ESN bit is set in the SA flags.
33 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34 IPSEC_API_SAD_FLAG_USE_ESN))
35 crypt_key = mk_scapy_crypt_key(p)
# Tunnel endpoints taken from the tunnel interface object.
# NOTE(review): `src`/`dst` are presumably the fallback when tun_if is not
# given; that branch is not visible in this listing - confirm.
37 p.tun_dst = tun_if.remote_ip
38 p.tun_src = tun_if.local_ip
# 4500 is the NAT header destination port treated as the default here.
44 is_default_port = (p.nat_header.dport == 4500)
46 is_default_port = True
# The outbound NAT-T header is either p.nat_header itself or a UDP header
# with p.nat_header's source and destination ports swapped.
49 outbound_nat_header = p.nat_header
51 outbound_nat_header = UDP(sport=p.nat_header.dport,
52 dport=p.nat_header.sport)
# Bind ESP as the payload of UDP packets with this destination port so scapy
# dissects them as ESP.
53 bind_layers(UDP, ESP, dport=p.nat_header.dport)
# SA keyed by p.vpp_tun_spi; its NAT-T header is the outbound header chosen
# above.
55 p.scapy_tun_sa = SecurityAssociation(
56 encryption_type, spi=p.vpp_tun_spi,
57 crypt_algo=p.crypt_algo,
59 auth_algo=p.auth_algo, auth_key=p.auth_key,
60 tunnel_header=ip_class_by_addr_type[p.addr_type](
63 nat_t_header=outbound_nat_header,
# SA keyed by p.scapy_tun_spi; it uses p.nat_header unmodified.
65 p.vpp_tun_sa = SecurityAssociation(
66 encryption_type, spi=p.scapy_tun_spi,
67 crypt_algo=p.crypt_algo,
69 auth_algo=p.auth_algo, auth_key=p.auth_key,
70 tunnel_header=ip_class_by_addr_type[p.addr_type](
73 nat_t_header=p.nat_header,
# Build the scapy-side ESP SecurityAssociation pair without a tunnel_header
# argument (unlike config_tun_params) and store them on `p` as p.scapy_tun_sa
# and p.vpp_tun_sa.
# NOTE(review): the embedded numbering in this listing has gaps; the
# conditions guarding the is_default_port / outbound_nat_header assignments
# and some SecurityAssociation arguments are not visible here.
77 def config_tra_params(p, encryption_type, tun_if):
78 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
# True when the USE_ESN bit is set in the SA flags.
79 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
80 IPSEC_API_SAD_FLAG_USE_ESN))
81 crypt_key = mk_scapy_crypt_key(p)
# Record the tunnel interface's endpoints on the params object.
82 p.tun_dst = tun_if.remote_ip
83 p.tun_src = tun_if.local_ip
# 4500 is the NAT header destination port treated as the default here.
86 is_default_port = (p.nat_header.dport == 4500)
88 is_default_port = True
# The outbound NAT-T header is either p.nat_header itself or a UDP header
# with p.nat_header's source and destination ports swapped.
91 outbound_nat_header = p.nat_header
93 outbound_nat_header = UDP(sport=p.nat_header.dport,
94 dport=p.nat_header.sport)
# Bind ESP as the payload of UDP packets with this destination port so scapy
# dissects them as ESP.
95 bind_layers(UDP, ESP, dport=p.nat_header.dport)
# SA keyed by p.vpp_tun_spi, using the outbound NAT-T header chosen above.
97 p.scapy_tun_sa = SecurityAssociation(
98 encryption_type, spi=p.vpp_tun_spi,
99 crypt_algo=p.crypt_algo,
101 auth_algo=p.auth_algo, auth_key=p.auth_key,
103 nat_t_header=outbound_nat_header)
# SA keyed by p.scapy_tun_spi, using p.nat_header unmodified.
104 p.vpp_tun_sa = SecurityAssociation(
105 encryption_type, spi=p.scapy_tun_spi,
106 crypt_algo=p.crypt_algo,
108 auth_algo=p.auth_algo, auth_key=p.auth_key,
110 nat_t_header=p.nat_header)
# Helper base class holding the configure/unconfigure steps shared by the
# IPv4 tunnel-protect tests: SA creation, tunnel protection, tunnel interface
# plus routes, and their removal.
# NOTE(review): the embedded numbering in this listing has gaps; some call
# arguments and short statements are not visible here.
113 class TemplateIpsec4TunProtect(object):
114 """ IPsec IPv4 Tunnel protect """
116 encryption_type = ESP
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
119 tun4_input_node = "ipsec4-tun-input"
# Create the outbound/inbound SA pair and add both to VPP; no tunnel endpoint
# addresses are passed in the visible arguments (compare config_sa_tun).
121 def config_sa_tra(self, p):
# Build the matching scapy SAs first.
122 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound SA uses the scapy-side SA id / SPI.
124 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
125 p.auth_algo_vpp_id, p.auth_key,
126 p.crypt_algo_vpp_id, p.crypt_key,
127 self.vpp_esp_protocol,
129 p.tun_sa_out.add_vpp_config()
# Inbound SA uses the VPP-side SA id / SPI.
131 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
132 p.auth_algo_vpp_id, p.auth_key,
133 p.crypt_algo_vpp_id, p.crypt_key,
134 self.vpp_esp_protocol,
136 p.tun_sa_in.add_vpp_config()
# Same as config_sa_tra, but each SA is also given the tunnel endpoint
# addresses taken from self.tun_if.
138 def config_sa_tun(self, p):
139 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound: local address first, then remote.
141 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
142 p.auth_algo_vpp_id, p.auth_key,
143 p.crypt_algo_vpp_id, p.crypt_key,
144 self.vpp_esp_protocol,
145 self.tun_if.local_addr[p.addr_type],
146 self.tun_if.remote_addr[p.addr_type],
148 p.tun_sa_out.add_vpp_config()
# Inbound: the address order is reversed (remote first, then local).
150 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
151 p.auth_algo_vpp_id, p.auth_key,
152 p.crypt_algo_vpp_id, p.crypt_key,
153 self.vpp_esp_protocol,
154 self.tun_if.remote_addr[p.addr_type],
155 self.tun_if.local_addr[p.addr_type],
157 p.tun_sa_in.add_vpp_config()
# Create the tunnel-protect object, keep it on p and add it to VPP.
# NOTE(review): the VppIpsecTunProtect arguments are not visible here.
159 def config_protect(self, p):
160 p.tun_protect = VppIpsecTunProtect(self,
164 p.tun_protect.add_vpp_config()
# Create the IP-in-IP tunnel interface on pg0, give it IPv4 and IPv6
# addresses, and route the remote hosts through it.
166 def config_network(self, p):
# A tun_dst already set on p is checked for; otherwise(?) pg0's remote IPv4
# address is used - the branch structure is not visible in this listing.
167 if hasattr(p, 'tun_dst'):
170 tun_dst = self.pg0.remote_ip4
171 p.tun_if = VppIpIpTunInterface(self, self.pg0,
174 p.tun_if.add_vpp_config()
176 p.tun_if.config_ip4()
177 p.tun_if.config_ip6()
# /32 route to the remote IPv4 host via the tunnel; kept on p so that
# unconfig_network can remove it.
179 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
180 [VppRoutePath(p.tun_if.remote_ip4,
182 p.route.add_vpp_config()
# /128 route to the remote IPv6 host; held only in the local `r`.
# NOTE(review): the call that adds `r` to VPP is not visible here.
183 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
184 [VppRoutePath(p.tun_if.remote_ip6,
186 proto=DpoProto.DPO_PROTO_IP6)])
# Remove the IPv4 route and the tunnel interface created by config_network.
189 def unconfig_network(self, p):
190 p.route.remove_vpp_config()
191 p.tun_if.remove_vpp_config()
# Remove the tunnel protection created by config_protect.
193 def unconfig_protect(self, p):
194 p.tun_protect.remove_vpp_config()
# Remove both SAs created by config_sa_tra / config_sa_tun.
196 def unconfig_sa(self, p):
197 p.tun_sa_out.remove_vpp_config()
198 p.tun_sa_in.remove_vpp_config()
# Test template that, during set-up, builds one protected IPv4 tunnel on pg0:
# network, then SAs, then tunnel protection.
# NOTE(review): the embedded numbering in this listing has gaps; the second
# base class, the decorators and the setUp/tearDown `def` lines are not
# visible here.
201 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
203 """ IPsec tunnel interface tests """
205 encryption_type = ESP
209 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
212 def tearDownClass(cls):
213 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
216 super(TemplateIpsec4TunIfEsp, self).setUp()
# Packets for the tunnel are exchanged on pg0.
218 self.tun_if = self.pg0
# Order matters: config_sa_tra reads p.tun_if, which config_network creates.
222 self.config_network(p)
223 self.config_sa_tra(p)
224 self.config_protect(p)
227 super(TemplateIpsec4TunIfEsp, self).tearDown()
# Test template for an IPv4 tunnel whose SAs use UDP encapsulation: set-up
# enables the UDP_ENCAP flag and a NAT header (sport 5454, dport 4500) before
# configuring network, SAs and protection.
# NOTE(review): the embedded numbering in this listing has gaps; the second
# base class, several `def` lines and the try/loop headers in
# verify_encrypted are not visible here.
230 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
232 """ IPsec UDP tunnel interface tests """
234 tun4_encrypt_node_name = "esp4-encrypt-tun"
235 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
236 encryption_type = ESP
240 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
243 def tearDownClass(cls):
244 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
# Check packets encrypted by VPP: UDP ports first, then decrypt with `sa` and
# check the inner IPv4 addresses and checksums.
246 def verify_encrypted(self, p, sa, rxs):
249 # ensure the UDP ports are correct before we decrypt
251 self.assertTrue(rx.haslayer(UDP))
252 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
253 self.assert_equal(rx[UDP].dport, 4500)
255 pkt = sa.decrypt(rx[IP])
# If decryption did not yield a parsed IP layer, re-parse the raw payload.
256 if not pkt.haslayer(IP):
257 pkt = IP(pkt[Raw].load)
259 self.assert_packet_checksums_valid(pkt)
260 self.assert_equal(pkt[IP].dst, "1.1.1.1")
261 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
# On failure the offending packets are logged.
# NOTE(review): `ppp` is not among the imports visible in this listing, and
# the re-raise after logging (if any) is not visible either.
262 except (IndexError, AssertionError):
263 self.logger.debug(ppp("Unexpected packet:", rx))
265 self.logger.debug(ppp("Decrypted packet:", pkt))
# Override: both SAs additionally carry the UDP source/destination ports
# from p.nat_header.
270 def config_sa_tra(self, p):
271 config_tun_params(p, self.encryption_type, p.tun_if)
273 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
274 p.auth_algo_vpp_id, p.auth_key,
275 p.crypt_algo_vpp_id, p.crypt_key,
276 self.vpp_esp_protocol,
278 udp_src=p.nat_header.sport,
279 udp_dst=p.nat_header.dport)
280 p.tun_sa_out.add_vpp_config()
282 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
283 p.auth_algo_vpp_id, p.auth_key,
284 p.crypt_algo_vpp_id, p.crypt_key,
285 self.vpp_esp_protocol,
287 udp_src=p.nat_header.sport,
288 udp_dst=p.nat_header.dport)
289 p.tun_sa_in.add_vpp_config()
292 super(TemplateIpsec4TunIfEspUdp, self).setUp()
# Turn on UDP encapsulation and set the NAT header used by the SAs.
295 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
296 IPSEC_API_SAD_FLAG_UDP_ENCAP)
297 p.nat_header = UDP(sport=5454, dport=4500)
299 self.tun_if = self.pg0
301 self.config_network(p)
302 self.config_sa_tra(p)
303 self.config_protect(p)
306 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
# IPv4 tunnel tests: IPv6-over-IPv4 single packet and burst, plus an
# IPv4-over-IPv4 fragmentation case.
# NOTE(review): the embedded numbering in this listing has gaps; some
# statements (e.g. the assignment of `p` and the MTU arguments in
# test_tun_basic_frag44) are not visible here.
309 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
310 """ Ipsec ESP - TUN tests """
311 tun4_encrypt_node_name = "esp4-encrypt-tun"
312 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
# Single packet (count=1) through verify_tun_64.
314 def test_tun_basic64(self):
315 """ ipsec 6o4 tunnel basic test """
316 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
318 self.verify_tun_64(self.params[socket.AF_INET], count=1)
# Burst variant: 257 packets through verify_tun_64.
# NOTE(review): the docstring below repeats the "basic test" wording of
# test_tun_basic64; this is the burst test.
320 def test_tun_burst64(self):
321 """ ipsec 6o4 tunnel basic test """
322 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
324 self.verify_tun_64(self.params[socket.AF_INET], count=257)
# Set the tunnel interface MTU, send one 1800-byte payload and expect two
# received packets (n_rx=2), then set the MTU again.
326 def test_tun_basic_frag44(self):
327 """ ipsec 4o4 tunnel frag basic test """
328 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
332 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
334 self.verify_tun_44(self.params[socket.AF_INET],
335 count=1, payload_size=1800, n_rx=2)
336 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
# Runs the IPv4 tunnel tests over the UDP-encapsulated template and adds a
# NAT keepalive test.
# NOTE(review): the setUp `def` line is not visible in this listing.
340 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
341 """ Ipsec ESP UDP tests """
343 tun4_input_node = "ipsec4-tun-input"
346 super(TestIpsec4TunIfEspUdp, self).setUp()
# Delegates to verify_keepalive with the IPv4 parameter set.
348 def test_keepalive(self):
349 """ IPSEC NAT Keepalive """
350 self.verify_keepalive(self.ipv4_params)
# UDP-encapsulated IPv4 tunnel tests using AES-GCM-256 with no separate
# integrity algorithm.
# NOTE(review): the setUp `def` line, the assignment of `p` and some
# parameter assignments are not visible in this listing. As listed, the
# algorithm fields are set after the base setUp() call.
353 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
354 """ Ipsec ESP UDP GCM tests """
356 tun4_input_node = "ipsec4-tun-input"
359 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
361 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
362 IPSEC_API_INTEG_ALG_NONE)
363 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
364 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
365 p.crypt_algo = "AES-GCM"
# 32-byte key, matching the 256-bit GCM variant selected above.
367 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
# Combines the IPv4 tunnel template with the TCP test suite; no members of
# its own are visible in this listing.
371 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
372 """ Ipsec ESP - TCP tests """
# Helper base class holding the configure/unconfigure steps shared by the
# IPv6 tunnel-protect tests: SA creation, tunnel protection, tunnel interface
# plus routes, and their removal.
# NOTE(review): the embedded numbering in this listing has gaps; some call
# arguments and short statements are not visible here.
376 class TemplateIpsec6TunProtect(object):
377 """ IPsec IPv6 Tunnel protect """
# Create the outbound/inbound SA pair without tunnel endpoint addresses and
# add both to VPP.
379 def config_sa_tra(self, p):
# Build the matching scapy SAs first.
380 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound SA uses the scapy-side SA id / SPI.
382 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
383 p.auth_algo_vpp_id, p.auth_key,
384 p.crypt_algo_vpp_id, p.crypt_key,
385 self.vpp_esp_protocol)
386 p.tun_sa_out.add_vpp_config()
# Inbound SA uses the VPP-side SA id / SPI.
388 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
389 p.auth_algo_vpp_id, p.auth_key,
390 p.crypt_algo_vpp_id, p.crypt_key,
391 self.vpp_esp_protocol)
392 p.tun_sa_in.add_vpp_config()
# Same as config_sa_tra, but each SA is also given the tunnel endpoint
# addresses taken from self.tun_if.
394 def config_sa_tun(self, p):
395 config_tun_params(p, self.encryption_type, p.tun_if)
# Outbound: local address first, then remote.
397 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
398 p.auth_algo_vpp_id, p.auth_key,
399 p.crypt_algo_vpp_id, p.crypt_key,
400 self.vpp_esp_protocol,
401 self.tun_if.local_addr[p.addr_type],
402 self.tun_if.remote_addr[p.addr_type])
403 p.tun_sa_out.add_vpp_config()
# Inbound: the address order is reversed (remote first, then local).
405 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
406 p.auth_algo_vpp_id, p.auth_key,
407 p.crypt_algo_vpp_id, p.crypt_key,
408 self.vpp_esp_protocol,
409 self.tun_if.remote_addr[p.addr_type],
410 self.tun_if.local_addr[p.addr_type])
411 p.tun_sa_in.add_vpp_config()
# Create the tunnel-protect object, keep it on p and add it to VPP.
# NOTE(review): the VppIpsecTunProtect arguments are not visible here.
413 def config_protect(self, p):
414 p.tun_protect = VppIpsecTunProtect(self,
418 p.tun_protect.add_vpp_config()
# Create the IP-in-IP tunnel interface on pg0, give it IPv6 and IPv4
# addresses, and route the remote hosts through it.
420 def config_network(self, p):
# A tun_dst already set on p is checked for; otherwise(?) pg0's remote IPv6
# address is used - the branch structure is not visible in this listing.
421 if hasattr(p, 'tun_dst'):
424 tun_dst = self.pg0.remote_ip6
425 p.tun_if = VppIpIpTunInterface(self, self.pg0,
428 p.tun_if.add_vpp_config()
430 p.tun_if.config_ip6()
431 p.tun_if.config_ip4()
# /128 route to the remote IPv6 host via the tunnel; kept on p so that
# unconfig_network can remove it.
433 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
434 [VppRoutePath(p.tun_if.remote_ip6,
436 proto=DpoProto.DPO_PROTO_IP6)])
437 p.route.add_vpp_config()
# /32 route to the remote IPv4 host; held only in the local `r`.
# NOTE(review): the call that adds `r` to VPP is not visible here.
438 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
439 [VppRoutePath(p.tun_if.remote_ip4,
# Remove the IPv6 route and the tunnel interface created by config_network.
443 def unconfig_network(self, p):
444 p.route.remove_vpp_config()
445 p.tun_if.remove_vpp_config()
# Remove the tunnel protection created by config_protect.
447 def unconfig_protect(self, p):
448 p.tun_protect.remove_vpp_config()
# Remove both SAs created by config_sa_tra / config_sa_tun.
450 def unconfig_sa(self, p):
451 p.tun_sa_out.remove_vpp_config()
452 p.tun_sa_in.remove_vpp_config()
# Test template that, during set-up, builds one protected IPv6 tunnel on pg0:
# network, then SAs, then tunnel protection.
# NOTE(review): the second base class, the setUp/tearDown `def` lines and the
# assignment of `p` are not visible in this listing.
455 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
457 """ IPsec tunnel interface tests """
459 encryption_type = ESP
462 super(TemplateIpsec6TunIfEsp, self).setUp()
# Packets for the tunnel are exchanged on pg0.
464 self.tun_if = self.pg0
# Order matters: config_sa_tra reads p.tun_if, which config_network creates.
467 self.config_network(p)
468 self.config_sa_tra(p)
469 self.config_protect(p)
472 super(TemplateIpsec6TunIfEsp, self).tearDown()
# IPv6 tunnel tests adding IPv4-over-IPv6 single-packet and burst cases.
# NOTE(review): the second base class is not visible in this listing.
475 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
477 """ Ipsec ESP - TUN tests """
478 tun6_encrypt_node_name = "esp6-encrypt-tun"
479 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
# Single packet (count=1) through verify_tun_46.
481 def test_tun_basic46(self):
482 """ ipsec 4o6 tunnel basic test """
483 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
484 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
# Burst variant: 257 packets through verify_tun_46.
486 def test_tun_burst46(self):
487 """ ipsec 4o6 tunnel burst test """
488 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
489 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
# IPv6 tunnel hand-off tests plus a policer case: a policer is applied to the
# tunnel interface, bound in turn with values 1 and 0, and encrypted traffic
# is injected alternately on workers 0 and 1 while per-worker policer
# statistics are checked.
# NOTE(review): the embedded numbering in this listing has gaps; some call
# arguments and branch headers are not visible here.
492 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
493 IpsecTun6HandoffTests):
494 """ Ipsec ESP 6 Handoff tests """
495 tun6_encrypt_node_name = "esp6-encrypt-tun"
496 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
498 def test_tun_handoff_66_police(self):
499 """ ESP 6o6 tunnel with policer worker hand-off test """
# Start from clean error and SA counters.
500 self.vapi.cli("clear errors")
501 self.vapi.cli("clear ipsec sa")
504 p = self.params[socket.AF_INET6]
# The same transmit action is used for conform, exceed and violate.
506 action_tx = PolicerAction(
507 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
509 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
510 conform_action=action_tx,
511 exceed_action=action_tx,
512 violate_action=action_tx)
513 policer.add_vpp_config()
515 # Start policing on tun
516 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
# presumably pol_bind is the worker index the policer is bound to - confirm
# against VppPolicer.bind_vpp_config.
518 for pol_bind in [1, 0]:
519 policer.bind_vpp_config(pol_bind, True)
521 # inject alternately on worker 0 and 1.
522 for worker in [0, 1, 0, 1]:
523 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
525 src=p.remote_tun_if_host,
526 dst=self.pg1.remote_ip6,
528 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
529 self.pg1, worker=worker)
530 self.verify_decrypted6(p, recv_pkts)
531 self.logger.debug(self.vapi.cli("show trace max 100"))
# Aggregate statistics, then the per-worker views.
533 stats = policer.get_stats()
534 stats0 = policer.get_stats(worker=0)
535 stats1 = policer.get_stats(worker=1)
# NOTE(review): the "first pass" and "second pass" assertion groups below
# contradict each other if run back to back (stats0 counters == 0 vs > 0);
# the branch selecting between them (presumably on pol_bind) is not visible
# in this listing.
538 # First pass: Worker 1, should have done all the policing
539 self.assertEqual(stats, stats1)
541 # Worker 0, should have handed everything off
542 self.assertEqual(stats0['conform_packets'], 0)
543 self.assertEqual(stats0['exceed_packets'], 0)
544 self.assertEqual(stats0['violate_packets'], 0)
546 # Second pass: both workers should have policed equal amounts
547 self.assertGreater(stats1['conform_packets'], 0)
548 self.assertEqual(stats1['exceed_packets'], 0)
549 self.assertGreater(stats1['violate_packets'], 0)
551 self.assertGreater(stats0['conform_packets'], 0)
552 self.assertEqual(stats0['exceed_packets'], 0)
553 self.assertGreater(stats0['violate_packets'], 0)
# Equal totals: conform + violate must match between the two workers.
555 self.assertEqual(stats0['conform_packets'] +
556 stats0['violate_packets'],
557 stats1['conform_packets'] +
558 stats1['violate_packets'])
# Clean up: stop policing on the tunnel and delete the policer.
560 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
561 policer.remove_vpp_config()
# IPv4 tunnel hand-off tests plus a policer case: a policer is applied to the
# tunnel interface, bound in turn with values 1 and 0, and encrypted traffic
# is injected alternately on workers 0 and 1 while per-worker policer
# statistics are checked.
# NOTE(review): the embedded numbering in this listing has gaps; some call
# arguments and branch headers are not visible here.
564 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
565 IpsecTun4HandoffTests):
566 """ Ipsec ESP 4 Handoff tests """
567 tun4_encrypt_node_name = "esp4-encrypt-tun"
568 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
570 def test_tun_handoff_44_police(self):
571 """ ESP 4o4 tunnel with policer worker hand-off test """
# Start from clean error and SA counters.
572 self.vapi.cli("clear errors")
573 self.vapi.cli("clear ipsec sa")
576 p = self.params[socket.AF_INET]
# The same transmit action is used for conform, exceed and violate.
578 action_tx = PolicerAction(
579 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
581 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
582 conform_action=action_tx,
583 exceed_action=action_tx,
584 violate_action=action_tx)
585 policer.add_vpp_config()
587 # Start policing on tun
588 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
# presumably pol_bind is the worker index the policer is bound to - confirm
# against VppPolicer.bind_vpp_config.
590 for pol_bind in [1, 0]:
591 policer.bind_vpp_config(pol_bind, True)
593 # inject alternately on worker 0 and 1.
594 for worker in [0, 1, 0, 1]:
595 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
597 src=p.remote_tun_if_host,
598 dst=self.pg1.remote_ip4,
600 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
601 self.pg1, worker=worker)
602 self.verify_decrypted(p, recv_pkts)
603 self.logger.debug(self.vapi.cli("show trace max 100"))
# Aggregate statistics, then the per-worker views.
605 stats = policer.get_stats()
606 stats0 = policer.get_stats(worker=0)
607 stats1 = policer.get_stats(worker=1)
# NOTE(review): the "first pass" and "second pass" assertion groups below
# contradict each other if run back to back (stats0 counters == 0 vs > 0);
# the branch selecting between them (presumably on pol_bind) is not visible
# in this listing.
610 # First pass: Worker 1, should have done all the policing
611 self.assertEqual(stats, stats1)
613 # Worker 0, should have handed everything off
614 self.assertEqual(stats0['conform_packets'], 0)
615 self.assertEqual(stats0['exceed_packets'], 0)
616 self.assertEqual(stats0['violate_packets'], 0)
618 # Second pass: both workers should have policed equal amounts
619 self.assertGreater(stats1['conform_packets'], 0)
620 self.assertEqual(stats1['exceed_packets'], 0)
621 self.assertGreater(stats1['violate_packets'], 0)
623 self.assertGreater(stats0['conform_packets'], 0)
624 self.assertEqual(stats0['exceed_packets'], 0)
625 self.assertGreater(stats0['violate_packets'], 0)
# Equal totals: conform + violate must match between the two workers.
627 self.assertEqual(stats0['conform_packets'] +
628 stats0['violate_packets'],
629 stats1['conform_packets'] +
630 stats1['violate_packets'])
# Clean up: stop policing on the tunnel and delete the policer.
632 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
633 policer.remove_vpp_config()
# Tests several protected IPv4 tunnels at once: set-up builds one parameter
# set per tunnel (stored in self.multi_params), each with its own remote
# host, SA ids, SPIs and tunnel destination.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base classes, the setUp/tearDown `def` lines, the loop header that defines
# `ii` and the initialisation of `tx` are not visible here.
636 @tag_fixme_vpp_workers
637 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
640 """ IPsec IPv4 Multi Tunnel interface """
642 encryption_type = ESP
643 tun4_encrypt_node_name = "esp4-encrypt-tun"
644 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
647 super(TestIpsec4MultiTunIfEsp, self).setUp()
649 self.tun_if = self.pg0
651 self.multi_params = []
# Ten remote hosts on pg0 supply the per-tunnel destination addresses.
652 self.pg0.generate_remote_hosts(10)
653 self.pg0.configure_ipv4_neighbors()
# Shallow copy of the base IPv4 parameters, then offset every id/SPI by ii so
# each tunnel gets distinct values.
656 p = copy.copy(self.ipv4_params)
658 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
659 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
660 p.scapy_tun_spi = p.scapy_tun_spi + ii
661 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
662 p.vpp_tun_spi = p.vpp_tun_spi + ii
664 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
665 p.scapy_tra_spi = p.scapy_tra_spi + ii
666 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
667 p.vpp_tra_spi = p.vpp_tra_spi + ii
# Setting tun_dst here makes config_network's hasattr(p, 'tun_dst') check true.
668 p.tun_dst = self.pg0.remote_hosts[ii].ip4
670 self.multi_params.append(p)
671 self.config_network(p)
672 self.config_sa_tra(p)
673 self.config_protect(p)
676 super(TestIpsec4MultiTunIfEsp, self).tearDown()
# Run the 4o4 verification on every tunnel and check that each tunnel
# interface's rx and tx statistics equal the 127 packets sent.
678 def test_tun_44(self):
679 """Multiple IPSEC tunnel interfaces """
680 for p in self.multi_params:
681 self.verify_tun_44(p, count=127)
682 self.assertEqual(p.tun_if.get_rx_stats(), 127)
683 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# Send one batch containing packets for every tunnel, in both directions, and
# verify each received packet against the parameters of the tunnel at the
# same position.
# NOTE(review): docstring typo below - "acrros" should read "across".
685 def test_tun_rr_44(self):
686 """ Round-robin packets acrros multiple interface """
# Decrypt direction: encrypted packets for each tunnel, appended to tx.
688 for p in self.multi_params:
689 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
690 src=p.remote_tun_if_host,
691 dst=self.pg1.remote_ip4)
692 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
694 for rx, p in zip(rxs, self.multi_params):
695 self.verify_decrypted(p, [rx])
# Encrypt direction: plain packets from pg1 towards each remote host.
698 for p in self.multi_params:
699 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
700 dst=p.remote_tun_if_host)
701 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
703 for rx, p in zip(rxs, self.multi_params):
704 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
# Runs the 4o4 tunnel verification for each listed crypto engine and each
# listed crypto/integrity algorithm combination, replacing the SAs (new key,
# new SPI/SA ids) between runs.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base classes, several `def` lines (setUp, tearDown and the re-key helper),
# most VppIpsecSA arguments, the algorithm loop header and the 'salt' entries
# of the algorithm table are not visible here.
707 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
710 """ IPsec IPv4 Tunnel interface all Algos """
712 encryption_type = ESP
713 tun4_encrypt_node_name = "esp4-encrypt-tun"
714 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
717 super(TestIpsec4TunIfEspAll, self).setUp()
719 self.tun_if = self.pg0
722 self.config_network(p)
723 self.config_sa_tra(p)
724 self.config_protect(p)
# Tear-down: protection is removed before the network objects.
728 self.unconfig_protect(p)
729 self.unconfig_network(p)
732 super(TestIpsec4TunIfEspAll, self).tearDown()
736 # change the key and the SPI
# Only the first byte of the key is replaced.
739 p.crypt_key = b'X' + p.crypt_key[1:]
741 p.scapy_tun_sa_id += 1
744 p.tun_if.local_spi = p.vpp_tun_spi
745 p.tun_if.remote_spi = p.scapy_tun_spi
# Rebuild the scapy SAs for the new key/SPI, then create the new VPP SAs.
747 config_tun_params(p, self.encryption_type, p.tun_if)
749 p.tun_sa_out = VppIpsecSA(self,
756 self.vpp_esp_protocol,
759 p.tun_sa_in = VppIpsecSA(self,
766 self.vpp_esp_protocol,
769 p.tun_sa_in.add_vpp_config()
770 p.tun_sa_out.add_vpp_config()
# Protect the tunnel with the new SAs before removing the previous ones.
# NOTE(review): `np` is not defined in the visible lines; presumably it is a
# copy of the parameters taken before the change - confirm.
772 self.config_protect(p)
773 np.tun_sa_out.remove_vpp_config()
774 np.tun_sa_in.remove_vpp_config()
775 self.logger.info(self.vapi.cli("sh ipsec sa"))
777 def test_tun_44(self):
778 """IPSEC tunnel all algos """
780 # foreach VPP crypto engine
781 engines = ["ia32", "ipsecmb", "openssl"]
783 # foreach crypto algorithm
# Each entry pairs the VPP enum values with the scapy algorithm names and a
# key whose length matches the cipher variant (16/24/32 bytes).
784 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
785 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
786 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
787 IPSEC_API_INTEG_ALG_NONE),
788 'scapy-crypto': "AES-GCM",
789 'scapy-integ': "NULL",
790 'key': b"JPjyOWBeVEQiMe7h",
792 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
793 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
794 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
795 IPSEC_API_INTEG_ALG_NONE),
796 'scapy-crypto': "AES-GCM",
797 'scapy-integ': "NULL",
798 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
800 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
801 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
802 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
803 IPSEC_API_INTEG_ALG_NONE),
804 'scapy-crypto': "AES-GCM",
805 'scapy-integ': "NULL",
806 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
808 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
809 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
810 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
811 IPSEC_API_INTEG_ALG_SHA1_96),
812 'scapy-crypto': "AES-CBC",
813 'scapy-integ': "HMAC-SHA1-96",
815 'key': b"JPjyOWBeVEQiMe7h"},
816 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
817 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
818 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
819 IPSEC_API_INTEG_ALG_SHA_512_256),
820 'scapy-crypto': "AES-CBC",
821 'scapy-integ': "SHA2-512-256",
823 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
824 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
825 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
826 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
827 IPSEC_API_INTEG_ALG_SHA_256_128),
828 'scapy-crypto': "AES-CBC",
829 'scapy-integ': "SHA2-256-128",
831 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
832 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
833 IPSEC_API_CRYPTO_ALG_NONE),
834 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
835 IPSEC_API_INTEG_ALG_SHA1_96),
836 'scapy-crypto': "NULL",
837 'scapy-integ': "HMAC-SHA1-96",
839 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
# Select the crypto engine through the CLI before exercising the algorithms.
841 for engine in engines:
842 self.vapi.cli("set crypto handler all %s" % engine)
845 # loop through each of the algorithms
848 # with self.subTest(algo=algo['scapy']):
# Load the current algorithm entry into the parameter set.
851 p.auth_algo_vpp_id = algo['vpp-integ']
852 p.crypt_algo_vpp_id = algo['vpp-crypto']
853 p.crypt_algo = algo['scapy-crypto']
854 p.auth_algo = algo['scapy-integ']
855 p.crypt_key = algo['key']
# NOTE(review): no 'salt' key appears in the entries as listed above;
# presumably those lines are among the ones missing from this listing.
856 p.salt = algo['salt']
862 self.verify_tun_44(p, count=127)
# Checks that a tunnel whose SAs use neither an integrity nor a crypto
# algorithm produces no output: packets sent from pg1 towards the remote
# tunnel host must get no replies.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base classes, the setUp/tearDown `def` lines, the assignment of `p` and
# some parameter assignments are not visible here.
865 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
868 """ IPsec IPv4 Tunnel interface no Algos """
870 encryption_type = ESP
871 tun4_encrypt_node_name = "esp4-encrypt-tun"
872 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
875 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
877 self.tun_if = self.pg0
# Select "none" for both integrity and encryption.
879 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
880 IPSEC_API_INTEG_ALG_NONE)
884 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
885 IPSEC_API_CRYPTO_ALG_NONE)
886 p.crypt_algo = 'NULL'
890 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
892 def test_tun_44(self):
893 """ IPSec SA with NULL algos """
# Unlike the templates, the tunnel is configured inside the test itself.
896 self.config_network(p)
897 self.config_sa_tra(p)
898 self.config_protect(p)
900 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
901 dst=p.remote_tun_if_host)
902 self.send_and_assert_no_replies(self.pg1, tx)
# Tear the tunnel down again before the test returns.
904 self.unconfig_protect(p)
906 self.unconfig_network(p)
# Tests several protected IPv6 tunnels at once: set-up builds one parameter
# set per tunnel (stored in self.multi_params), each with its own remote
# host, SA ids, SPIs and tunnel destination.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base classes, the setUp/tearDown `def` lines and the loop header that
# defines `ii` are not visible here.
909 @tag_fixme_vpp_workers
910 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
913 """ IPsec IPv6 Multi Tunnel interface """
915 encryption_type = ESP
916 tun6_encrypt_node_name = "esp6-encrypt-tun"
917 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
920 super(TestIpsec6MultiTunIfEsp, self).setUp()
922 self.tun_if = self.pg0
924 self.multi_params = []
# Ten remote hosts on pg0 supply the per-tunnel destination addresses.
925 self.pg0.generate_remote_hosts(10)
926 self.pg0.configure_ipv6_neighbors()
# Shallow copy of the base IPv6 parameters, then offset every id/SPI by ii so
# each tunnel gets distinct values.
929 p = copy.copy(self.ipv6_params)
931 p.remote_tun_if_host = "1111::%d" % (ii + 1)
932 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
933 p.scapy_tun_spi = p.scapy_tun_spi + ii
934 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
935 p.vpp_tun_spi = p.vpp_tun_spi + ii
937 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
938 p.scapy_tra_spi = p.scapy_tra_spi + ii
939 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
940 p.vpp_tra_spi = p.vpp_tra_spi + ii
# Setting tun_dst here makes config_network's hasattr(p, 'tun_dst') check true.
941 p.tun_dst = self.pg0.remote_hosts[ii].ip6
943 self.multi_params.append(p)
944 self.config_network(p)
945 self.config_sa_tra(p)
946 self.config_protect(p)
949 super(TestIpsec6MultiTunIfEsp, self).tearDown()
# Run the 6o6 verification on every tunnel and check that each tunnel
# interface's rx and tx statistics equal the 127 packets sent.
951 def test_tun_66(self):
952 """Multiple IPSEC tunnel interfaces """
953 for p in self.multi_params:
954 self.verify_tun_66(p, count=127)
955 self.assertEqual(p.tun_if.get_rx_stats(), 127)
956 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# ESP-protected GRE TEB (transparent Ethernet bridging) tunnel test: the GRE
# tunnel interface and pg1 are placed in bridge domain 1, and the packet
# generators/verifiers are overridden to carry an inner Ethernet frame
# addressed to `omac`.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base class, the setUp/tearDown `def` lines, loop/try headers, the GRE layer
# inside the generated packets and some call arguments are not visible here.
959 class TestIpsecGreTebIfEsp(TemplateIpsec,
961 """ Ipsec GRE TEB ESP - TUN tests """
962 tun4_encrypt_node_name = "esp4-encrypt-tun"
963 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
964 encryption_type = ESP
# Destination MAC of the inner (bridged) Ethernet frame.
965 omac = "00:11:22:33:44:55"
# Build `count` encrypted packets: outer Ether, then the result of
# sa.encrypt() over an IP packet between pg0's remote and local addresses
# carrying the inner Ethernet/IP/UDP frame.
967 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
969 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
970 sa.encrypt(IP(src=self.pg0.remote_ip4,
971 dst=self.pg0.local_ip4) /
973 Ether(dst=self.omac) /
974 IP(src="1.1.1.1", dst="1.1.1.2") /
975 UDP(sport=1144, dport=2233) /
976 Raw(b'X' * payload_size))
977 for i in range(count)]
# Build `count` plain Ethernet frames to `omac`; src/dst arguments are not
# used in the visible lines (fixed 1.1.1.1 -> 1.1.1.2 addresses).
979 def gen_pkts(self, sw_intf, src, dst, count=1,
981 return [Ether(dst=self.omac) /
982 IP(src="1.1.1.1", dst="1.1.1.2") /
983 UDP(sport=1144, dport=2233) /
984 Raw(b'X' * payload_size)
985 for i in range(count)]
# Decrypted frames must keep the inner destination MAC and IP destination.
987 def verify_decrypted(self, p, rxs):
989 self.assert_equal(rx[Ether].dst, self.omac)
990 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypt each received packet with `sa` and check the outer addresses, the
# presence of GRE and the inner frame.
992 def verify_encrypted(self, p, sa, rxs):
995 pkt = sa.decrypt(rx[IP])
# If decryption did not yield a parsed IP layer, re-parse the raw payload.
996 if not pkt.haslayer(IP):
997 pkt = IP(pkt[Raw].load)
998 self.assert_packet_checksums_valid(pkt)
999 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1000 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1001 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is not assigned in the visible lines; presumably it is
# the GRE payload of pkt - confirm.
1003 self.assertEqual(e[Ether].dst, self.omac)
1004 self.assertEqual(e[IP].dst, "1.1.1.2")
# NOTE(review): `ppp` is not among the imports visible in this listing.
1005 except (IndexError, AssertionError):
1006 self.logger.debug(ppp("Unexpected packet:", rx))
1008 self.logger.debug(ppp("Decrypted packet:", pkt))
1014 super(TestIpsecGreTebIfEsp, self).setUp()
1016 self.tun_if = self.pg0
1018 p = self.ipv4_params
1020 bd1 = VppBridgeDomain(self, 1)
1021 bd1.add_vpp_config()
# SAs with pg0 addresses as tunnel endpoints (one address argument of each
# call is not visible in this listing).
1023 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1024 p.auth_algo_vpp_id, p.auth_key,
1025 p.crypt_algo_vpp_id, p.crypt_key,
1026 self.vpp_esp_protocol,
1028 self.pg0.remote_ip4)
1029 p.tun_sa_out.add_vpp_config()
1031 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1032 p.auth_algo_vpp_id, p.auth_key,
1033 p.crypt_algo_vpp_id, p.crypt_key,
1034 self.vpp_esp_protocol,
1035 self.pg0.remote_ip4,
1037 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1039 p.tun_if = VppGreInterface(self,
1041 self.pg0.remote_ip4,
1042 type=(VppEnum.vl_api_gre_tunnel_type_t.
1043 GRE_API_TUNNEL_TYPE_TEB))
1044 p.tun_if.add_vpp_config()
1046 p.tun_protect = VppIpsecTunProtect(self,
1051 p.tun_protect.add_vpp_config()
1054 p.tun_if.config_ip4()
# Build the scapy SAs once the tunnel interface exists.
1055 config_tun_params(p, self.encryption_type, p.tun_if)
# Bridge the tunnel interface and pg1 together in bridge domain 1.
1057 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1058 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1060 self.vapi.cli("clear ipsec sa")
1061 self.vapi.cli("sh adj")
1062 self.vapi.cli("sh ipsec tun")
# Tear-down: drop the tunnel's IPv4 config before the base-class tearDown.
1065 p = self.ipv4_params
1066 p.tun_if.unconfig_ip4()
1067 super(TestIpsecGreTebIfEsp, self).tearDown()
# ESP-protected GRE TEB tunnel test with a VLAN sub-interface: the GRE tunnel
# interface and the dot1q sub-interface pg1.11 (configured with a pop-1 VLAN
# tag rewrite) are placed in bridge domain 1. Frames seen on the pg1 side are
# expected to carry VLAN 11; frames inside the tunnel are expected to carry
# no Dot1Q tag.
# NOTE(review): the embedded numbering in this listing has gaps; the other
# base class, the setUp/tearDown `def` lines, loop/try headers, the GRE and
# Dot1Q layers inside the generated packets and some call arguments are not
# visible here.
1070 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1072 """ Ipsec GRE TEB ESP - TUN tests """
1073 tun4_encrypt_node_name = "esp4-encrypt-tun"
1074 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1075 encryption_type = ESP
# Destination MAC of the inner (bridged) Ethernet frame.
1076 omac = "00:11:22:33:44:55"
# Build `count` encrypted packets: outer Ether, then the result of
# sa.encrypt() over an IP packet between pg0's remote and local addresses
# carrying the inner Ethernet/IP/UDP frame.
1078 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1080 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1081 sa.encrypt(IP(src=self.pg0.remote_ip4,
1082 dst=self.pg0.local_ip4) /
1084 Ether(dst=self.omac) /
1085 IP(src="1.1.1.1", dst="1.1.1.2") /
1086 UDP(sport=1144, dport=2233) /
1087 Raw(b'X' * payload_size))
1088 for i in range(count)]
# Build `count` plain Ethernet frames to `omac`.
# NOTE(review): a layer between Ether and IP (presumably the Dot1Q tag for
# VLAN 11) is not visible in this listing.
1090 def gen_pkts(self, sw_intf, src, dst, count=1,
1092 return [Ether(dst=self.omac) /
1094 IP(src="1.1.1.1", dst="1.1.1.2") /
1095 UDP(sport=1144, dport=2233) /
1096 Raw(b'X' * payload_size)
1097 for i in range(count)]
# Decrypted frames must keep the inner MAC/IP destination and carry VLAN 11.
1099 def verify_decrypted(self, p, rxs):
1101 self.assert_equal(rx[Ether].dst, self.omac)
1102 self.assert_equal(rx[Dot1Q].vlan, 11)
1103 self.assert_equal(rx[IP].dst, "1.1.1.2")
# Decrypt each received packet with `sa` and check the outer addresses, the
# presence of GRE and that the inner frame has no VLAN tag.
1105 def verify_encrypted(self, p, sa, rxs):
1108 pkt = sa.decrypt(rx[IP])
# If decryption did not yield a parsed IP layer, re-parse the raw payload.
1109 if not pkt.haslayer(IP):
1110 pkt = IP(pkt[Raw].load)
1111 self.assert_packet_checksums_valid(pkt)
1112 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1113 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1114 self.assertTrue(pkt.haslayer(GRE))
# NOTE(review): `e` is not assigned in the visible lines; presumably it is
# the GRE payload of pkt - confirm.
1116 self.assertEqual(e[Ether].dst, self.omac)
1117 self.assertFalse(e.haslayer(Dot1Q))
1118 self.assertEqual(e[IP].dst, "1.1.1.2")
# NOTE(review): `ppp` is not among the imports visible in this listing.
1119 except (IndexError, AssertionError):
1120 self.logger.debug(ppp("Unexpected packet:", rx))
1122 self.logger.debug(ppp("Decrypted packet:", pkt))
1128 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1130 self.tun_if = self.pg0
1132 p = self.ipv4_params
1134 bd1 = VppBridgeDomain(self, 1)
1135 bd1.add_vpp_config()
# Dot1Q sub-interface 11 on pg1 with a pop-1 tag rewrite, brought up.
1137 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1138 self.vapi.l2_interface_vlan_tag_rewrite(
1139 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1141 self.pg1_11.admin_up()
# SAs with pg0 addresses as tunnel endpoints (one address argument of each
# call is not visible in this listing).
1143 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1144 p.auth_algo_vpp_id, p.auth_key,
1145 p.crypt_algo_vpp_id, p.crypt_key,
1146 self.vpp_esp_protocol,
1148 self.pg0.remote_ip4)
1149 p.tun_sa_out.add_vpp_config()
1151 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1152 p.auth_algo_vpp_id, p.auth_key,
1153 p.crypt_algo_vpp_id, p.crypt_key,
1154 self.vpp_esp_protocol,
1155 self.pg0.remote_ip4,
1157 p.tun_sa_in.add_vpp_config()
# GRE tunnel of type TEB towards pg0's remote address.
1159 p.tun_if = VppGreInterface(self,
1161 self.pg0.remote_ip4,
1162 type=(VppEnum.vl_api_gre_tunnel_type_t.
1163 GRE_API_TUNNEL_TYPE_TEB))
1164 p.tun_if.add_vpp_config()
1166 p.tun_protect = VppIpsecTunProtect(self,
1171 p.tun_protect.add_vpp_config()
1174 p.tun_if.config_ip4()
# Build the scapy SAs once the tunnel interface exists.
1175 config_tun_params(p, self.encryption_type, p.tun_if)
# Bridge the tunnel interface and the VLAN sub-interface in bridge domain 1.
1177 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1178 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1180 self.vapi.cli("clear ipsec sa")
# Tear-down: drop the tunnel's IPv4 config, run the base-class tearDown, and
# only then bring down and remove the VLAN sub-interface.
1183 p = self.ipv4_params
1184 p.tun_if.unconfig_ip4()
1185 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1186 self.pg1_11.admin_down()
1187 self.pg1_11.remove_vpp_config()
1190 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1192 """ Ipsec GRE TEB ESP - Tra tests """
1193 tun4_encrypt_node_name = "esp4-encrypt-tun"
1194 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1195 encryption_type = ESP
1196 omac = "00:11:22:33:44:55"
1198 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1200 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1201 sa.encrypt(IP(src=self.pg0.remote_ip4,
1202 dst=self.pg0.local_ip4) /
1204 Ether(dst=self.omac) /
1205 IP(src="1.1.1.1", dst="1.1.1.2") /
1206 UDP(sport=1144, dport=2233) /
1207 Raw(b'X' * payload_size))
1208 for i in range(count)]
1210 def gen_pkts(self, sw_intf, src, dst, count=1,
1212 return [Ether(dst=self.omac) /
1213 IP(src="1.1.1.1", dst="1.1.1.2") /
1214 UDP(sport=1144, dport=2233) /
1215 Raw(b'X' * payload_size)
1216 for i in range(count)]
1218 def verify_decrypted(self, p, rxs):
1220 self.assert_equal(rx[Ether].dst, self.omac)
1221 self.assert_equal(rx[IP].dst, "1.1.1.2")
1223 def verify_encrypted(self, p, sa, rxs):
1226 pkt = sa.decrypt(rx[IP])
1227 if not pkt.haslayer(IP):
1228 pkt = IP(pkt[Raw].load)
1229 self.assert_packet_checksums_valid(pkt)
1230 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1231 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1232 self.assertTrue(pkt.haslayer(GRE))
1234 self.assertEqual(e[Ether].dst, self.omac)
1235 self.assertEqual(e[IP].dst, "1.1.1.2")
1236 except (IndexError, AssertionError):
1237 self.logger.debug(ppp("Unexpected packet:", rx))
1239 self.logger.debug(ppp("Decrypted packet:", pkt))
1245 super(TestIpsecGreTebIfEspTra, self).setUp()
1247 self.tun_if = self.pg0
1249 p = self.ipv4_params
1251 bd1 = VppBridgeDomain(self, 1)
1252 bd1.add_vpp_config()
1254 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1255 p.auth_algo_vpp_id, p.auth_key,
1256 p.crypt_algo_vpp_id, p.crypt_key,
1257 self.vpp_esp_protocol)
1258 p.tun_sa_out.add_vpp_config()
1260 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1261 p.auth_algo_vpp_id, p.auth_key,
1262 p.crypt_algo_vpp_id, p.crypt_key,
1263 self.vpp_esp_protocol)
1264 p.tun_sa_in.add_vpp_config()
1266 p.tun_if = VppGreInterface(self,
1268 self.pg0.remote_ip4,
1269 type=(VppEnum.vl_api_gre_tunnel_type_t.
1270 GRE_API_TUNNEL_TYPE_TEB))
1271 p.tun_if.add_vpp_config()
1273 p.tun_protect = VppIpsecTunProtect(self,
1278 p.tun_protect.add_vpp_config()
1281 p.tun_if.config_ip4()
1282 config_tra_params(p, self.encryption_type, p.tun_if)
1284 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1285 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1287 self.vapi.cli("clear ipsec sa")
1290 p = self.ipv4_params
1291 p.tun_if.unconfig_ip4()
1292 super(TestIpsecGreTebIfEspTra, self).tearDown()
1295 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1297 """ Ipsec GRE TEB UDP ESP - Tra tests """
1298 tun4_encrypt_node_name = "esp4-encrypt-tun"
1299 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1300 encryption_type = ESP
1301 omac = "00:11:22:33:44:55"
1303 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1305 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1306 sa.encrypt(IP(src=self.pg0.remote_ip4,
1307 dst=self.pg0.local_ip4) /
1309 Ether(dst=self.omac) /
1310 IP(src="1.1.1.1", dst="1.1.1.2") /
1311 UDP(sport=1144, dport=2233) /
1312 Raw(b'X' * payload_size))
1313 for i in range(count)]
1315 def gen_pkts(self, sw_intf, src, dst, count=1,
1317 return [Ether(dst=self.omac) /
1318 IP(src="1.1.1.1", dst="1.1.1.2") /
1319 UDP(sport=1144, dport=2233) /
1320 Raw(b'X' * payload_size)
1321 for i in range(count)]
1323 def verify_decrypted(self, p, rxs):
1325 self.assert_equal(rx[Ether].dst, self.omac)
1326 self.assert_equal(rx[IP].dst, "1.1.1.2")
1328 def verify_encrypted(self, p, sa, rxs):
1330 self.assertTrue(rx.haslayer(UDP))
1331 self.assertEqual(rx[UDP].dport, 4545)
1332 self.assertEqual(rx[UDP].sport, 5454)
1334 pkt = sa.decrypt(rx[IP])
1335 if not pkt.haslayer(IP):
1336 pkt = IP(pkt[Raw].load)
1337 self.assert_packet_checksums_valid(pkt)
1338 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1339 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1340 self.assertTrue(pkt.haslayer(GRE))
1342 self.assertEqual(e[Ether].dst, self.omac)
1343 self.assertEqual(e[IP].dst, "1.1.1.2")
1344 except (IndexError, AssertionError):
1345 self.logger.debug(ppp("Unexpected packet:", rx))
1347 self.logger.debug(ppp("Decrypted packet:", pkt))
1353 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1355 self.tun_if = self.pg0
1357 p = self.ipv4_params
1358 p = self.ipv4_params
1359 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1360 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1361 p.nat_header = UDP(sport=5454, dport=4545)
1363 bd1 = VppBridgeDomain(self, 1)
1364 bd1.add_vpp_config()
1366 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1367 p.auth_algo_vpp_id, p.auth_key,
1368 p.crypt_algo_vpp_id, p.crypt_key,
1369 self.vpp_esp_protocol,
1373 p.tun_sa_out.add_vpp_config()
1375 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1376 p.auth_algo_vpp_id, p.auth_key,
1377 p.crypt_algo_vpp_id, p.crypt_key,
1378 self.vpp_esp_protocol,
1380 VppEnum.vl_api_ipsec_sad_flags_t.
1381 IPSEC_API_SAD_FLAG_IS_INBOUND),
1384 p.tun_sa_in.add_vpp_config()
1386 p.tun_if = VppGreInterface(self,
1388 self.pg0.remote_ip4,
1389 type=(VppEnum.vl_api_gre_tunnel_type_t.
1390 GRE_API_TUNNEL_TYPE_TEB))
1391 p.tun_if.add_vpp_config()
1393 p.tun_protect = VppIpsecTunProtect(self,
1398 p.tun_protect.add_vpp_config()
1401 p.tun_if.config_ip4()
1402 config_tra_params(p, self.encryption_type, p.tun_if)
1404 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1405 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1407 self.vapi.cli("clear ipsec sa")
1408 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1411 p = self.ipv4_params
1412 p.tun_if.unconfig_ip4()
1413 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1416 class TestIpsecGreIfEsp(TemplateIpsec,
1418 """ Ipsec GRE ESP - TUN tests """
1419 tun4_encrypt_node_name = "esp4-encrypt-tun"
1420 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1421 encryption_type = ESP
1423 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1425 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1426 sa.encrypt(IP(src=self.pg0.remote_ip4,
1427 dst=self.pg0.local_ip4) /
1429 IP(src=self.pg1.local_ip4,
1430 dst=self.pg1.remote_ip4) /
1431 UDP(sport=1144, dport=2233) /
1432 Raw(b'X' * payload_size))
1433 for i in range(count)]
1435 def gen_pkts(self, sw_intf, src, dst, count=1,
1437 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1438 IP(src="1.1.1.1", dst="1.1.1.2") /
1439 UDP(sport=1144, dport=2233) /
1440 Raw(b'X' * payload_size)
1441 for i in range(count)]
1443 def verify_decrypted(self, p, rxs):
1445 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1446 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1448 def verify_encrypted(self, p, sa, rxs):
1451 pkt = sa.decrypt(rx[IP])
1452 if not pkt.haslayer(IP):
1453 pkt = IP(pkt[Raw].load)
1454 self.assert_packet_checksums_valid(pkt)
1455 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1456 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1457 self.assertTrue(pkt.haslayer(GRE))
1459 self.assertEqual(e[IP].dst, "1.1.1.2")
1460 except (IndexError, AssertionError):
1461 self.logger.debug(ppp("Unexpected packet:", rx))
1463 self.logger.debug(ppp("Decrypted packet:", pkt))
1469 super(TestIpsecGreIfEsp, self).setUp()
1471 self.tun_if = self.pg0
1473 p = self.ipv4_params
1475 bd1 = VppBridgeDomain(self, 1)
1476 bd1.add_vpp_config()
1478 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1479 p.auth_algo_vpp_id, p.auth_key,
1480 p.crypt_algo_vpp_id, p.crypt_key,
1481 self.vpp_esp_protocol,
1483 self.pg0.remote_ip4)
1484 p.tun_sa_out.add_vpp_config()
1486 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1487 p.auth_algo_vpp_id, p.auth_key,
1488 p.crypt_algo_vpp_id, p.crypt_key,
1489 self.vpp_esp_protocol,
1490 self.pg0.remote_ip4,
1492 p.tun_sa_in.add_vpp_config()
1494 p.tun_if = VppGreInterface(self,
1496 self.pg0.remote_ip4)
1497 p.tun_if.add_vpp_config()
1499 p.tun_protect = VppIpsecTunProtect(self,
1503 p.tun_protect.add_vpp_config()
1506 p.tun_if.config_ip4()
1507 config_tun_params(p, self.encryption_type, p.tun_if)
1509 VppIpRoute(self, "1.1.1.2", 32,
1510 [VppRoutePath(p.tun_if.remote_ip4,
1511 0xffffffff)]).add_vpp_config()
1514 p = self.ipv4_params
1515 p.tun_if.unconfig_ip4()
1516 super(TestIpsecGreIfEsp, self).tearDown()
1519 class TestIpsecGreIfEspTra(TemplateIpsec,
1521 """ Ipsec GRE ESP - TRA tests """
1522 tun4_encrypt_node_name = "esp4-encrypt-tun"
1523 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1524 encryption_type = ESP
1526 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1528 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1529 sa.encrypt(IP(src=self.pg0.remote_ip4,
1530 dst=self.pg0.local_ip4) /
1532 IP(src=self.pg1.local_ip4,
1533 dst=self.pg1.remote_ip4) /
1534 UDP(sport=1144, dport=2233) /
1535 Raw(b'X' * payload_size))
1536 for i in range(count)]
1538 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1540 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1541 sa.encrypt(IP(src=self.pg0.remote_ip4,
1542 dst=self.pg0.local_ip4) /
1544 UDP(sport=1144, dport=2233) /
1545 Raw(b'X' * payload_size))
1546 for i in range(count)]
1548 def gen_pkts(self, sw_intf, src, dst, count=1,
1550 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1551 IP(src="1.1.1.1", dst="1.1.1.2") /
1552 UDP(sport=1144, dport=2233) /
1553 Raw(b'X' * payload_size)
1554 for i in range(count)]
1556 def verify_decrypted(self, p, rxs):
1558 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1559 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1561 def verify_encrypted(self, p, sa, rxs):
1564 pkt = sa.decrypt(rx[IP])
1565 if not pkt.haslayer(IP):
1566 pkt = IP(pkt[Raw].load)
1567 self.assert_packet_checksums_valid(pkt)
1568 self.assertTrue(pkt.haslayer(GRE))
1570 self.assertEqual(e[IP].dst, "1.1.1.2")
1571 except (IndexError, AssertionError):
1572 self.logger.debug(ppp("Unexpected packet:", rx))
1574 self.logger.debug(ppp("Decrypted packet:", pkt))
1580 super(TestIpsecGreIfEspTra, self).setUp()
1582 self.tun_if = self.pg0
1584 p = self.ipv4_params
1586 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1587 p.auth_algo_vpp_id, p.auth_key,
1588 p.crypt_algo_vpp_id, p.crypt_key,
1589 self.vpp_esp_protocol)
1590 p.tun_sa_out.add_vpp_config()
1592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1593 p.auth_algo_vpp_id, p.auth_key,
1594 p.crypt_algo_vpp_id, p.crypt_key,
1595 self.vpp_esp_protocol)
1596 p.tun_sa_in.add_vpp_config()
1598 p.tun_if = VppGreInterface(self,
1600 self.pg0.remote_ip4)
1601 p.tun_if.add_vpp_config()
1603 p.tun_protect = VppIpsecTunProtect(self,
1607 p.tun_protect.add_vpp_config()
1610 p.tun_if.config_ip4()
1611 config_tra_params(p, self.encryption_type, p.tun_if)
1613 VppIpRoute(self, "1.1.1.2", 32,
1614 [VppRoutePath(p.tun_if.remote_ip4,
1615 0xffffffff)]).add_vpp_config()
1618 p = self.ipv4_params
1619 p.tun_if.unconfig_ip4()
1620 super(TestIpsecGreIfEspTra, self).tearDown()
# Negative test: send ESP-protected traffic built by
# gen_encrypt_non_ip_pkts() into the tunnel-facing interface and check that
# nothing comes back and that the decrypt node counts an
# "unsupported payload" error.
# NOTE(review): judging by the test name the GRE payload is meant to be
# non-IP; the builder line that sets the GRE header is not visible in this
# listing - confirm.
1622 def test_gre_non_ip(self):
1623 p = self.ipv4_params
# NOTE(review): dst is an IPv6 address although this class uses the IPv4
# parameter set; the visible lines of gen_encrypt_non_ip_pkts() never read
# src/dst, so this looks harmless - confirm.
1624 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1625 src=p.remote_tun_if_host,
1626 dst=self.pg1.remote_ip6)
1627 self.send_and_assert_no_replies(self.tun_if, tx)
# The counter path is built from the first entry of tun4_decrypt_node_name.
# Exactly one error is expected because the builder is called with its
# default count=1.
1628 node_name = ('/err/%s/unsupported payload' %
1629 self.tun4_decrypt_node_name[0])
1630 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1633 class TestIpsecGre6IfEspTra(TemplateIpsec,
1635 """ Ipsec GRE ESP - TRA tests (IPv6) """
1636 tun6_encrypt_node_name = "esp6-encrypt-tun"
1637 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1638 encryption_type = ESP
1640 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1642 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1643 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1644 dst=self.pg0.local_ip6) /
1646 IPv6(src=self.pg1.local_ip6,
1647 dst=self.pg1.remote_ip6) /
1648 UDP(sport=1144, dport=2233) /
1649 Raw(b'X' * payload_size))
1650 for i in range(count)]
1652 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1654 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1655 IPv6(src="1::1", dst="1::2") /
1656 UDP(sport=1144, dport=2233) /
1657 Raw(b'X' * payload_size)
1658 for i in range(count)]
1660 def verify_decrypted6(self, p, rxs):
1662 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1663 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1665 def verify_encrypted6(self, p, sa, rxs):
1668 pkt = sa.decrypt(rx[IPv6])
1669 if not pkt.haslayer(IPv6):
1670 pkt = IPv6(pkt[Raw].load)
1671 self.assert_packet_checksums_valid(pkt)
1672 self.assertTrue(pkt.haslayer(GRE))
1674 self.assertEqual(e[IPv6].dst, "1::2")
1675 except (IndexError, AssertionError):
1676 self.logger.debug(ppp("Unexpected packet:", rx))
1678 self.logger.debug(ppp("Decrypted packet:", pkt))
1684 super(TestIpsecGre6IfEspTra, self).setUp()
1686 self.tun_if = self.pg0
1688 p = self.ipv6_params
1690 bd1 = VppBridgeDomain(self, 1)
1691 bd1.add_vpp_config()
1693 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1694 p.auth_algo_vpp_id, p.auth_key,
1695 p.crypt_algo_vpp_id, p.crypt_key,
1696 self.vpp_esp_protocol)
1697 p.tun_sa_out.add_vpp_config()
1699 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1700 p.auth_algo_vpp_id, p.auth_key,
1701 p.crypt_algo_vpp_id, p.crypt_key,
1702 self.vpp_esp_protocol)
1703 p.tun_sa_in.add_vpp_config()
1705 p.tun_if = VppGreInterface(self,
1707 self.pg0.remote_ip6)
1708 p.tun_if.add_vpp_config()
1710 p.tun_protect = VppIpsecTunProtect(self,
1714 p.tun_protect.add_vpp_config()
1717 p.tun_if.config_ip6()
1718 config_tra_params(p, self.encryption_type, p.tun_if)
1720 r = VppIpRoute(self, "1::2", 128,
1721 [VppRoutePath(p.tun_if.remote_ip6,
1723 proto=DpoProto.DPO_PROTO_IP6)])
1727 p = self.ipv6_params
1728 p.tun_if.unconfig_ip6()
1729 super(TestIpsecGre6IfEspTra, self).tearDown()
1732 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1733 """ Ipsec mGRE ESP v4 TRA tests """
1734 tun4_encrypt_node_name = "esp4-encrypt-tun"
1735 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1736 encryption_type = ESP
1738 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1740 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1741 sa.encrypt(IP(src=p.tun_dst,
1742 dst=self.pg0.local_ip4) /
1744 IP(src=self.pg1.local_ip4,
1745 dst=self.pg1.remote_ip4) /
1746 UDP(sport=1144, dport=2233) /
1747 Raw(b'X' * payload_size))
1748 for i in range(count)]
1750 def gen_pkts(self, sw_intf, src, dst, count=1,
1752 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1753 IP(src="1.1.1.1", dst=dst) /
1754 UDP(sport=1144, dport=2233) /
1755 Raw(b'X' * payload_size)
1756 for i in range(count)]
1758 def verify_decrypted(self, p, rxs):
1760 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1761 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1763 def verify_encrypted(self, p, sa, rxs):
1766 pkt = sa.decrypt(rx[IP])
1767 if not pkt.haslayer(IP):
1768 pkt = IP(pkt[Raw].load)
1769 self.assert_packet_checksums_valid(pkt)
1770 self.assertTrue(pkt.haslayer(GRE))
1772 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1773 except (IndexError, AssertionError):
1774 self.logger.debug(ppp("Unexpected packet:", rx))
1776 self.logger.debug(ppp("Decrypted packet:", pkt))
1782 super(TestIpsecMGreIfEspTra4, self).setUp()
1785 self.tun_if = self.pg0
1786 p = self.ipv4_params
1787 p.tun_if = VppGreInterface(self,
1790 mode=(VppEnum.vl_api_tunnel_mode_t.
1791 TUNNEL_API_MODE_MP))
1792 p.tun_if.add_vpp_config()
1794 p.tun_if.config_ip4()
1795 p.tun_if.generate_remote_hosts(N_NHS)
1796 self.pg0.generate_remote_hosts(N_NHS)
1797 self.pg0.configure_ipv4_neighbors()
1799 # set up some SAs for several next-hops on the interface
1800 self.multi_params = []
1802 for ii in range(N_NHS):
1803 p = copy.copy(self.ipv4_params)
1805 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1806 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1807 p.scapy_tun_spi = p.scapy_tun_spi + ii
1808 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1809 p.vpp_tun_spi = p.vpp_tun_spi + ii
1811 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1812 p.scapy_tra_spi = p.scapy_tra_spi + ii
1813 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1814 p.vpp_tra_spi = p.vpp_tra_spi + ii
1815 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1816 p.auth_algo_vpp_id, p.auth_key,
1817 p.crypt_algo_vpp_id, p.crypt_key,
1818 self.vpp_esp_protocol)
1819 p.tun_sa_out.add_vpp_config()
1821 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1822 p.auth_algo_vpp_id, p.auth_key,
1823 p.crypt_algo_vpp_id, p.crypt_key,
1824 self.vpp_esp_protocol)
1825 p.tun_sa_in.add_vpp_config()
1827 p.tun_protect = VppIpsecTunProtect(
1832 nh=p.tun_if.remote_hosts[ii].ip4)
1833 p.tun_protect.add_vpp_config()
1834 config_tra_params(p, self.encryption_type, p.tun_if)
1835 self.multi_params.append(p)
1837 VppIpRoute(self, p.remote_tun_if_host, 32,
1838 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1839 p.tun_if.sw_if_index)]).add_vpp_config()
1841 # in this v4 variant add the teibs after the protect
1842 p.teib = VppTeib(self, p.tun_if,
1843 p.tun_if.remote_hosts[ii].ip4,
1844 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1845 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1846 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1849 p = self.ipv4_params
1850 p.tun_if.unconfig_ip4()
1851 super(TestIpsecMGreIfEspTra4, self).tearDown()
1853 def test_tun_44(self):
1856 for p in self.multi_params:
1857 self.verify_tun_44(p, count=N_PKTS)
1858 p.teib.remove_vpp_config()
1859 self.verify_tun_dropped_44(p, count=N_PKTS)
1860 p.teib.add_vpp_config()
1861 self.verify_tun_44(p, count=N_PKTS)
1864 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1865 """ Ipsec mGRE ESP v6 TRA tests """
1866 tun6_encrypt_node_name = "esp6-encrypt-tun"
1867 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1868 encryption_type = ESP
1870 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1872 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1873 sa.encrypt(IPv6(src=p.tun_dst,
1874 dst=self.pg0.local_ip6) /
1876 IPv6(src=self.pg1.local_ip6,
1877 dst=self.pg1.remote_ip6) /
1878 UDP(sport=1144, dport=2233) /
1879 Raw(b'X' * payload_size))
1880 for i in range(count)]
1882 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1884 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1885 IPv6(src="1::1", dst=dst) /
1886 UDP(sport=1144, dport=2233) /
1887 Raw(b'X' * payload_size)
1888 for i in range(count)]
1890 def verify_decrypted6(self, p, rxs):
1892 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1893 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1895 def verify_encrypted6(self, p, sa, rxs):
1898 pkt = sa.decrypt(rx[IPv6])
1899 if not pkt.haslayer(IPv6):
1900 pkt = IPv6(pkt[Raw].load)
1901 self.assert_packet_checksums_valid(pkt)
1902 self.assertTrue(pkt.haslayer(GRE))
1904 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1905 except (IndexError, AssertionError):
1906 self.logger.debug(ppp("Unexpected packet:", rx))
1908 self.logger.debug(ppp("Decrypted packet:", pkt))
1914 super(TestIpsecMGreIfEspTra6, self).setUp()
1916 self.vapi.cli("set logging class ipsec level debug")
1919 self.tun_if = self.pg0
1920 p = self.ipv6_params
1921 p.tun_if = VppGreInterface(self,
1924 mode=(VppEnum.vl_api_tunnel_mode_t.
1925 TUNNEL_API_MODE_MP))
1926 p.tun_if.add_vpp_config()
1928 p.tun_if.config_ip6()
1929 p.tun_if.generate_remote_hosts(N_NHS)
1930 self.pg0.generate_remote_hosts(N_NHS)
1931 self.pg0.configure_ipv6_neighbors()
1933 # set up some SAs for several next-hops on the interface
1934 self.multi_params = []
1936 for ii in range(N_NHS):
1937 p = copy.copy(self.ipv6_params)
1939 p.remote_tun_if_host = "1::%d" % (ii + 1)
1940 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1941 p.scapy_tun_spi = p.scapy_tun_spi + ii
1942 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1943 p.vpp_tun_spi = p.vpp_tun_spi + ii
1945 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1946 p.scapy_tra_spi = p.scapy_tra_spi + ii
1947 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1948 p.vpp_tra_spi = p.vpp_tra_spi + ii
1949 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1950 p.auth_algo_vpp_id, p.auth_key,
1951 p.crypt_algo_vpp_id, p.crypt_key,
1952 self.vpp_esp_protocol)
1953 p.tun_sa_out.add_vpp_config()
1955 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1956 p.auth_algo_vpp_id, p.auth_key,
1957 p.crypt_algo_vpp_id, p.crypt_key,
1958 self.vpp_esp_protocol)
1959 p.tun_sa_in.add_vpp_config()
1961 # in this v6 variant add the teibs first then the protection
1962 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1963 VppTeib(self, p.tun_if,
1964 p.tun_if.remote_hosts[ii].ip6,
1965 p.tun_dst).add_vpp_config()
1967 p.tun_protect = VppIpsecTunProtect(
1972 nh=p.tun_if.remote_hosts[ii].ip6)
1973 p.tun_protect.add_vpp_config()
1974 config_tra_params(p, self.encryption_type, p.tun_if)
1975 self.multi_params.append(p)
1977 VppIpRoute(self, p.remote_tun_if_host, 128,
1978 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1979 p.tun_if.sw_if_index)]).add_vpp_config()
1980 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1982 self.logger.info(self.vapi.cli("sh log"))
1983 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1984 self.logger.info(self.vapi.cli("sh adj 41"))
1987 p = self.ipv6_params
1988 p.tun_if.unconfig_ip6()
1989 super(TestIpsecMGreIfEspTra6, self).tearDown()
1991 def test_tun_66(self):
1993 for p in self.multi_params:
1994 self.verify_tun_66(p, count=63)
1997 @tag_fixme_vpp_workers
1998 class TestIpsec4TunProtect(TemplateIpsec,
1999 TemplateIpsec4TunProtect,
2001 """ IPsec IPv4 Tunnel protect - transport mode"""
2004 super(TestIpsec4TunProtect, self).setUp()
2006 self.tun_if = self.pg0
2009 super(TestIpsec4TunProtect, self).tearDown()
2011 def test_tun_44(self):
2012 """IPSEC tunnel protect"""
2014 p = self.ipv4_params
2016 self.config_network(p)
2017 self.config_sa_tra(p)
2018 self.config_protect(p)
2020 self.verify_tun_44(p, count=127)
2021 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2022 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2024 self.vapi.cli("clear ipsec sa")
2025 self.verify_tun_64(p, count=127)
2026 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2027 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2029 # rekey - create new SAs and update the tunnel protection
2031 np.crypt_key = b'X' + p.crypt_key[1:]
2032 np.scapy_tun_spi += 100
2033 np.scapy_tun_sa_id += 1
2034 np.vpp_tun_spi += 100
2035 np.vpp_tun_sa_id += 1
2036 np.tun_if.local_spi = p.vpp_tun_spi
2037 np.tun_if.remote_spi = p.scapy_tun_spi
2039 self.config_sa_tra(np)
2040 self.config_protect(np)
2043 self.verify_tun_44(np, count=127)
2044 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2045 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2048 self.unconfig_protect(np)
2049 self.unconfig_sa(np)
2050 self.unconfig_network(p)
2053 @tag_fixme_vpp_workers
2054 class TestIpsec4TunProtectUdp(TemplateIpsec,
2055 TemplateIpsec4TunProtect,
2057 """ IPsec IPv4 Tunnel protect - transport mode - UDP encap"""
2060 super(TestIpsec4TunProtectUdp, self).setUp()
2062 self.tun_if = self.pg0
2064 p = self.ipv4_params
2065 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2066 IPSEC_API_SAD_FLAG_UDP_ENCAP)
2067 p.nat_header = UDP(sport=4500, dport=4500)
2068 self.config_network(p)
2069 self.config_sa_tra(p)
2070 self.config_protect(p)
2073 p = self.ipv4_params
2074 self.unconfig_protect(p)
2076 self.unconfig_network(p)
2077 super(TestIpsec4TunProtectUdp, self).tearDown()
2079 def verify_encrypted(self, p, sa, rxs):
2080 # ensure encrypted packets are received with the default UDP ports
2082 self.assertEqual(rx[UDP].sport, 4500)
2083 self.assertEqual(rx[UDP].dport, 4500)
2084 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
# IPv4-over-IPv4 traffic check for the UDP-encapsulated protected tunnel.
# The network, SAs and tunnel protection are configured in this class's
# setUp() with the UDP_ENCAP SA flag and a 4500/4500 NAT header, so the test
# itself only drives traffic and checks counters.
2086 def test_tun_44(self):
2087 """IPSEC UDP tunnel protect"""
2089 p = self.ipv4_params
# verify_tun_44() is defined outside this chunk; presumably it sends `count`
# packets in each direction and validates them through the verify_*
# hooks - confirm against the template class.
2091 self.verify_tun_44(p, count=127)
# The tunnel interface counters must match the 127 packets requested above,
# for both rx and tx.
2092 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2093 self.assertEqual(p.tun_if.get_tx_stats(), 127)
# NAT keepalive check: delegates entirely to verify_keepalive(), which is
# defined outside this chunk, passing the IPv4 parameter set that setUp()
# configured with UDP encapsulation.
2095 def test_keepalive(self):
2096 """ IPSEC NAT Keepalive """
2097 self.verify_keepalive(self.ipv4_params)
2100 @tag_fixme_vpp_workers
2101 class TestIpsec4TunProtectTun(TemplateIpsec,
2102 TemplateIpsec4TunProtect,
2104 """ IPsec IPv4 Tunnel protect - tunnel mode"""
2106 encryption_type = ESP
2107 tun4_encrypt_node_name = "esp4-encrypt-tun"
2108 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2111 super(TestIpsec4TunProtectTun, self).setUp()
2113 self.tun_if = self.pg0
2116 super(TestIpsec4TunProtectTun, self).tearDown()
2118 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2120 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2121 sa.encrypt(IP(src=sw_intf.remote_ip4,
2122 dst=sw_intf.local_ip4) /
2123 IP(src=src, dst=dst) /
2124 UDP(sport=1144, dport=2233) /
2125 Raw(b'X' * payload_size))
2126 for i in range(count)]
2128 def gen_pkts(self, sw_intf, src, dst, count=1,
2130 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2131 IP(src=src, dst=dst) /
2132 UDP(sport=1144, dport=2233) /
2133 Raw(b'X' * payload_size)
2134 for i in range(count)]
2136 def verify_decrypted(self, p, rxs):
2138 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2139 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2140 self.assert_packet_checksums_valid(rx)
2142 def verify_encrypted(self, p, sa, rxs):
2145 pkt = sa.decrypt(rx[IP])
2146 if not pkt.haslayer(IP):
2147 pkt = IP(pkt[Raw].load)
2148 self.assert_packet_checksums_valid(pkt)
2149 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2150 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2151 inner = pkt[IP].payload
2152 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2154 except (IndexError, AssertionError):
2155 self.logger.debug(ppp("Unexpected packet:", rx))
2157 self.logger.debug(ppp("Decrypted packet:", pkt))
2162 def test_tun_44(self):
2163 """IPSEC tunnel protect """
2165 p = self.ipv4_params
2167 self.config_network(p)
2168 self.config_sa_tun(p)
2169 self.config_protect(p)
2171 # also add output features on the tunnel and physical interface
2172 # so we test they still work
2173 r_all = AclRule(True,
2174 src_prefix="0.0.0.0/0",
2175 dst_prefix="0.0.0.0/0",
2177 a = VppAcl(self, [r_all]).add_vpp_config()
2179 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2180 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2182 self.verify_tun_44(p, count=127)
2184 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2185 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2187 # rekey - create new SAs and update the tunnel protection
2189 np.crypt_key = b'X' + p.crypt_key[1:]
2190 np.scapy_tun_spi += 100
2191 np.scapy_tun_sa_id += 1
2192 np.vpp_tun_spi += 100
2193 np.vpp_tun_sa_id += 1
2194 np.tun_if.local_spi = p.vpp_tun_spi
2195 np.tun_if.remote_spi = p.scapy_tun_spi
2197 self.config_sa_tun(np)
2198 self.config_protect(np)
2201 self.verify_tun_44(np, count=127)
2202 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2203 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2206 self.unconfig_protect(np)
2207 self.unconfig_sa(np)
2208 self.unconfig_network(p)
2211 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2212 TemplateIpsec4TunProtect,
2214 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2216 encryption_type = ESP
2217 tun4_encrypt_node_name = "esp4-encrypt-tun"
2218 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2221 super(TestIpsec4TunProtectTunDrop, self).setUp()
2223 self.tun_if = self.pg0
2226 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2228 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2230 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2231 sa.encrypt(IP(src=sw_intf.remote_ip4,
2233 IP(src=src, dst=dst) /
2234 UDP(sport=1144, dport=2233) /
2235 Raw(b'X' * payload_size))
2236 for i in range(count)]
2238 def test_tun_drop_44(self):
2239 """IPSEC tunnel protect bogus tunnel header """
2241 p = self.ipv4_params
2243 self.config_network(p)
2244 self.config_sa_tun(p)
2245 self.config_protect(p)
2247 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2248 src=p.remote_tun_if_host,
2249 dst=self.pg1.remote_ip4,
2251 self.send_and_assert_no_replies(self.tun_if, tx)
2254 self.unconfig_protect(p)
2256 self.unconfig_network(p)
2259 @tag_fixme_vpp_workers
2260 class TestIpsec6TunProtect(TemplateIpsec,
2261 TemplateIpsec6TunProtect,
2263 """ IPsec IPv6 Tunnel protect - transport mode"""
2265 encryption_type = ESP
2266 tun6_encrypt_node_name = "esp6-encrypt-tun"
2267 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2270 super(TestIpsec6TunProtect, self).setUp()
2272 self.tun_if = self.pg0
2275 super(TestIpsec6TunProtect, self).tearDown()
2277 def test_tun_66(self):
2278 """IPSEC tunnel protect 6o6"""
2280 p = self.ipv6_params
2282 self.config_network(p)
2283 self.config_sa_tra(p)
2284 self.config_protect(p)
2286 self.verify_tun_66(p, count=127)
2287 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2288 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2290 # rekey - create new SAs and update the tunnel protection
2292 np.crypt_key = b'X' + p.crypt_key[1:]
2293 np.scapy_tun_spi += 100
2294 np.scapy_tun_sa_id += 1
2295 np.vpp_tun_spi += 100
2296 np.vpp_tun_sa_id += 1
2297 np.tun_if.local_spi = p.vpp_tun_spi
2298 np.tun_if.remote_spi = p.scapy_tun_spi
2300 self.config_sa_tra(np)
2301 self.config_protect(np)
2304 self.verify_tun_66(np, count=127)
2305 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2306 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2308 # bounce the interface state
2309 p.tun_if.admin_down()
2310 self.verify_drop_tun_66(np, count=127)
2311 node = ('/err/ipsec6-tun-input/%s' %
2312 'ipsec packets received on disabled interface')
2313 self.assertEqual(127, self.statistics.get_err_counter(node))
2315 self.verify_tun_66(np, count=127)
2318 # 1) add two input SAs [old, new]
2319 # 2) swap output SA to [new]
2320 # 3) use only [new] input SA
2322 np3.crypt_key = b'Z' + p.crypt_key[1:]
2323 np3.scapy_tun_spi += 100
2324 np3.scapy_tun_sa_id += 1
2325 np3.vpp_tun_spi += 100
2326 np3.vpp_tun_sa_id += 1
2327 np3.tun_if.local_spi = p.vpp_tun_spi
2328 np3.tun_if.remote_spi = p.scapy_tun_spi
2330 self.config_sa_tra(np3)
2333 p.tun_protect.update_vpp_config(np.tun_sa_out,
2334 [np.tun_sa_in, np3.tun_sa_in])
2335 self.verify_tun_66(np, np, count=127)
2336 self.verify_tun_66(np3, np, count=127)
2339 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2340 [np.tun_sa_in, np3.tun_sa_in])
2341 self.verify_tun_66(np, np3, count=127)
2342 self.verify_tun_66(np3, np3, count=127)
2345 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2347 self.verify_tun_66(np3, np3, count=127)
2348 self.verify_drop_tun_rx_66(np, count=127)
2350 self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
2351 self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
2352 self.unconfig_sa(np)
2355 self.unconfig_protect(np3)
2356 self.unconfig_sa(np3)
2357 self.unconfig_network(p)
2359 def test_tun_46(self):
2360 """IPSEC tunnel protect 4o6"""
2362 p = self.ipv6_params
2364 self.config_network(p)
2365 self.config_sa_tra(p)
2366 self.config_protect(p)
2368 self.verify_tun_46(p, count=127)
2369 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2370 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2373 self.unconfig_protect(p)
2375 self.unconfig_network(p)
@tag_fixme_vpp_workers
class TestIpsec6TunProtectTun(TemplateIpsec,
                              TemplateIpsec6TunProtect,
                              IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtectTun, self).setUp()

        # pg0 is the interface the encrypted side of the tests uses
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTun, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` encrypted frames addressed to `sw_intf`.

        Each frame is an outer IPv6 header (sw_intf remote -> local)
        wrapping an inner IPv6/UDP packet from `src` to `dst`, passed
        through `sa.encrypt()`; encrypt is called once per frame.
        `p` is accepted for signature compatibility and not used here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=sw_intf.remote_ip6,
                                dst=sw_intf.local_ip6) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts6(self, p, sw_intf, src, dst, count=1,
                  payload_size=100):
        """Return `count` plain IPv6/UDP frames from `src` to `dst`."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                UDP(sport=1166, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted6(self, p, rxs):
        """Check addressing and checksums of packets decrypted by VPP."""
        for rx in rxs:
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each received packet with `sa` and check both headers.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # Stays None until decryption has produced a packet, so the
            # failure path never dereferences an unbound name and never
            # needs a blanket exception swallow.
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
                self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
                inner = pkt[IPv6].payload
                self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)

            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_66(self):
        """IPSEC tunnel protect """

        p = self.ipv6_params

        self.config_network(p)
        self.config_sa_tun(p)
        self.config_protect(p)

        self.verify_tun_66(p, count=127)

        self.assertEqual(p.tun_if.get_rx_stats(), 127)
        self.assertEqual(p.tun_if.get_tx_stats(), 127)

        # rekey - create new SAs and update the tunnel protection
        # (np is a shallow copy, so np.tun_if is the same interface object)
        np = copy.copy(p)
        np.crypt_key = b'X' + p.crypt_key[1:]
        np.scapy_tun_spi += 100
        np.scapy_tun_sa_id += 1
        np.vpp_tun_spi += 100
        np.vpp_tun_sa_id += 1
        np.tun_if.local_spi = p.vpp_tun_spi
        np.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(np)
        self.config_protect(np)
        # the old SAs are no longer referenced by the protection
        self.unconfig_sa(p)

        # interface counters are cumulative across the rekey
        self.verify_tun_66(np, count=127)
        self.assertEqual(p.tun_if.get_rx_stats(), 254)
        self.assertEqual(p.tun_if.get_tx_stats(), 254)

        # teardown
        self.unconfig_protect(np)
        self.unconfig_sa(np)
        self.unconfig_network(p)
class TestIpsec6TunProtectTunDrop(TemplateIpsec,
                                  TemplateIpsec6TunProtect,
                                  IpsecTun6):
    """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]

    def setUp(self):
        super(TestIpsec6TunProtectTunDrop, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsec6TunProtectTunDrop, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` encrypted frames carrying a bogus tunnel header."""

        def one_frame():
            # the IP destination of the revealed packet does not match
            # that assigned to the tunnel
            # NOTE(review): the bogus destination literal was not visible
            # in the reviewed excerpt - confirm
            clear_text = (IPv6(src=sw_intf.remote_ip6,
                               dst="5::5") /
                          IPv6(src=src, dst=dst) /
                          UDP(sport=1144, dport=2233) /
                          Raw(b'X' * payload_size))
            return sa.encrypt(clear_text)

        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / one_frame())
        return frames

    def test_tun_drop_66(self):
        """IPSEC 6 tunnel protect bogus tunnel header """

        params = self.ipv6_params

        self.config_network(params)
        self.config_sa_tun(params)
        self.config_protect(params)

        # NOTE(review): packet count not visible in the reviewed excerpt
        # - confirm
        bogus = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                       self.tun_if,
                                       src=params.remote_tun_if_host,
                                       dst=self.pg1.remote_ip6,
                                       count=63)
        # nothing may come out: the outer destination is not the tunnel's
        self.send_and_assert_no_replies(self.tun_if, bogus)

        self.unconfig_protect(params)
        self.unconfig_sa(params)
        self.unconfig_network(params)
class TemplateIpsecItf4(object):
    """ IPsec Interface IPv4 """

    encryption_type = ESP
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    tun4_input_node = "ipsec4-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the scapy SA and the VPP out/in tunnel-mode SA pair.

        The outbound SA runs src -> dst, the inbound one dst -> src; both
        are stored on `p` (tun_sa_out / tun_sa_in) before being added.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              src, dst,
                              flags=p.flags)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                             p.auth_algo_vpp_id, p.auth_key,
                             p.crypt_algo_vpp_id, p.crypt_key,
                             self.vpp_esp_protocol,
                             dst, src,
                             flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with the SA pair stored on `p`."""
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p, instance=0xffffffff):
        """Create the ipsec interface and route both test hosts via it."""
        tunnel = VppIpsecInterface(self, instance=instance)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        # only the v4 route is kept on p; unconfig_network removes it
        p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
                             [VppRoutePath(tunnel.remote_ip4,
                                           0xffffffff)])
        p.route.add_vpp_config()

        v6_route = VppIpRoute(self, p.remote_tun_if_host6, 128,
                              [VppRoutePath(tunnel.remote_ip6,
                                            0xffffffff,
                                            proto=DpoProto.DPO_PROTO_IP6)])
        v6_route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tracked route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf4(TemplateIpsec,
                    TemplateIpsecItf4,
                    IpsecTun4):
    """ IPsec Interface IPv4 """

    def setUp(self):
        super(TestIpsecItf4, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4, self).tearDown()

    def _check_tun_counters(self, tunnel, rx, tx):
        # both directions of the tunnel interface must match exactly
        self.assertEqual(tunnel.get_rx_stats(), rx)
        self.assertEqual(tunnel.get_tx_stats(), tx)

    def test_tun_instance_44(self):
        # an interface created with instance=3 must show up as ipsec3,
        # and ipsec0 must not exist
        params = self.ipv4_params
        self.config_network(params, instance=3)

        with self.assertRaises(CliFailedCommandError):
            self.vapi.cli("show interface ipsec0")

        shown = self.vapi.cli("show interface ipsec3")
        self.assertTrue("unknown" not in shown)

        self.unconfig_network(params)

    def test_tun_44(self):
        """IPSEC interface IPv4"""

        # NOTE(review): packet count not visible in the reviewed excerpt
        # - confirm
        n_pkts = 2
        p = self.ipv4_params
        tun_src = self.pg0.local_ip4
        tun_dst = self.pg0.remote_ip4

        self.config_network(p)
        # scapy side only: without VPP SAs/protection traffic is dropped
        config_tun_params(p, self.encryption_type, None,
                          tun_src,
                          tun_dst)
        self.verify_tun_dropped_44(p, count=n_pkts)
        self.config_sa_tun(p, tun_src, tun_dst)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        # an admin-down tunnel drops, and recovers once it is up again
        p.tun_if.admin_down()
        self.verify_tun_dropped_44(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_44(p, count=n_pkts)

        self._check_tun_counters(p.tun_if, 3 * n_pkts, 2 * n_pkts)

        # it's a v6 packet when it's encrypted
        self.tun4_encrypt_node_name = "esp6-encrypt-tun"

        self.verify_tun_64(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, 4 * n_pkts, 3 * n_pkts)

        self.tun4_encrypt_node_name = "esp4-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the very same interface)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi

        self.config_sa_tun(rekeyed, tun_src, tun_dst)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        # counters were cleared above, so they restart from zero
        self.verify_tun_44(rekeyed, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_44_null(self):
        """IPSEC interface IPv4 NULL auth/crypto"""

        # NOTE(review): packet count not visible in the reviewed excerpt
        # - confirm
        n_pkts = 2
        # work on a copy so the shared parameters keep their algorithms
        p = copy.copy(self.ipv4_params)

        integ = VppEnum.vl_api_ipsec_integ_alg_t
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t
        p.auth_algo_vpp_id = integ.IPSEC_API_INTEG_ALG_NONE
        p.crypt_algo_vpp_id = crypto.IPSEC_API_CRYPTO_ALG_NONE
        p.crypt_algo = "NULL"
        p.auth_algo = "NULL"

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.logger.info(self.vapi.cli("sh ipsec sa"))
        self.verify_tun_44(p, count=n_pkts)

        # teardown
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)

    def test_tun_44_police(self):
        """IPSEC interface IPv4 with input policer"""
        n_pkts = 127
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        # every colour transmits, so policing never drops test traffic
        qos_action = VppEnum.vl_api_sse2_qos_action_type_t
        action_tx = PolicerAction(
            qos_action.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
                             conform_action=action_tx,
                             exceed_action=action_tx,
                             violate_action=action_tx)
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, True)

        self.verify_tun_44(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        before = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(before['conform_packets'], 0)
        self.assertEqual(before['exceed_packets'], 0)
        self.assertGreater(before['violate_packets'], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, False)
        self.verify_tun_44(p, count=n_pkts)

        # No new policer stats
        after = policer.get_stats()
        self.assertEqual(before, after)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecItf4MPLS(TemplateIpsec,
                        TemplateIpsecItf4,
                        IpsecTun4):
    """ IPsec Interface MPLSoIPv4 """

    tun4_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        super(TestIpsecItf4MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf4MPLS, self).tearDown()

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames whose ESP payload is
        MPLS(label 44) / IPv4 / UDP; encrypt is called once per frame."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=44, ttl=3) /
                           IP(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each packet and check the MPLS label and inner
        destination; log the offending packets and re-raise on failure."""
        for rx in rxs:
            # None until decryption succeeded; keeps the failure path from
            # touching an unbound name when sa.decrypt() itself raises
            pkt = None
            try:
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 44)
                self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_mpls_o_ip4(self):
        """IPSEC interface MPLS over IPv4"""

        n_pkts = 127
        p = self.ipv4_params

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 44, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip4,
                         self.pg1.sw_if_index)]).add_vpp_config()
        # traffic towards the remote host now leaves the tunnel labelled
        p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(44)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        self.verify_tun_44(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TemplateIpsecItf6(object):
    """ IPsec Interface IPv6 """

    encryption_type = ESP
    tun6_encrypt_node_name = "esp6-encrypt-tun"
    tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
    tun6_input_node = "ipsec6-tun-input"

    def config_sa_tun(self, p, src, dst):
        """Create the scapy SA and the VPP out/in tunnel-mode SA pair.

        Only the outbound SA carries the tunnel flags and hop limit;
        missing values on `p` are defaulted first.
        """
        config_tun_params(p, self.encryption_type, None, src, dst)

        # fill in the optional knobs without overriding a caller's choice
        for attr, fallback in (('tun_flags', None), ('hop_limit', 255)):
            if not hasattr(p, attr):
                setattr(p, attr, fallback)

        outbound = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                              p.auth_algo_vpp_id, p.auth_key,
                              p.crypt_algo_vpp_id, p.crypt_key,
                              self.vpp_esp_protocol,
                              src, dst,
                              flags=p.flags,
                              tun_flags=p.tun_flags,
                              hop_limit=p.hop_limit)
        p.tun_sa_out = outbound
        outbound.add_vpp_config()

        inbound = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                             p.auth_algo_vpp_id, p.auth_key,
                             p.crypt_algo_vpp_id, p.crypt_key,
                             self.vpp_esp_protocol,
                             dst, src,
                             flags=p.flags)
        p.tun_sa_in = inbound
        inbound.add_vpp_config()

    def config_protect(self, p):
        """Protect p.tun_if with the SA pair stored on `p`."""
        protection = VppIpsecTunProtect(self,
                                        p.tun_if,
                                        p.tun_sa_out,
                                        [p.tun_sa_in])
        p.tun_protect = protection
        protection.add_vpp_config()

    def config_network(self, p):
        """Create the ipsec interface and route both test hosts via it."""
        tunnel = VppIpsecInterface(self)
        p.tun_if = tunnel

        tunnel.add_vpp_config()
        tunnel.admin_up()
        tunnel.config_ip4()
        tunnel.config_ip6()

        v4_route = VppIpRoute(self, p.remote_tun_if_host4, 32,
                              [VppRoutePath(tunnel.remote_ip4,
                                            0xffffffff)])
        v4_route.add_vpp_config()

        # only the v6 route is kept on p; unconfig_network removes it
        p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
                             [VppRoutePath(tunnel.remote_ip6,
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        p.route.add_vpp_config()

    def unconfig_network(self, p):
        """Remove the tracked route, then the tunnel interface."""
        for obj in (p.route, p.tun_if):
            obj.remove_vpp_config()

    def unconfig_protect(self, p):
        """Remove the tunnel protection created by config_protect."""
        p.tun_protect.remove_vpp_config()

    def unconfig_sa(self, p):
        """Remove the outbound SA, then the inbound SA."""
        for sa in (p.tun_sa_out, p.tun_sa_in):
            sa.remove_vpp_config()
@tag_fixme_vpp_workers
class TestIpsecItf6(TemplateIpsec,
                    TemplateIpsecItf6,
                    IpsecTun6):
    """ IPsec Interface IPv6 """

    def setUp(self):
        super(TestIpsecItf6, self).setUp()
        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf6, self).tearDown()

    def _check_tun_counters(self, tunnel, rx, tx):
        # both directions of the tunnel interface must match exactly
        self.assertEqual(tunnel.get_rx_stats(), rx)
        self.assertEqual(tunnel.get_tx_stats(), tx)

    def test_tun_66(self):
        """IPSEC interface IPv6"""

        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
        tun_src = self.pg0.local_ip6
        tun_dst = self.pg0.remote_ip6

        self.config_network(p)
        # scapy side only: without VPP SAs/protection traffic is dropped
        config_tun_params(p, self.encryption_type, None,
                          tun_src,
                          tun_dst)
        self.verify_drop_tun_66(p, count=n_pkts)
        self.config_sa_tun(p, tun_src, tun_dst)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        # an admin-down tunnel drops, and recovers once it is up again
        p.tun_if.admin_down()
        self.verify_drop_tun_66(p, count=n_pkts)
        p.tun_if.admin_up()
        self.verify_tun_66(p, count=n_pkts)

        self._check_tun_counters(p.tun_if, 3 * n_pkts, 2 * n_pkts)

        # it's a v4 packet when it's encrypted
        self.tun6_encrypt_node_name = "esp4-encrypt-tun"

        self.verify_tun_46(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, 4 * n_pkts, 3 * n_pkts)

        self.tun6_encrypt_node_name = "esp6-encrypt-tun"

        self.vapi.cli("clear interfaces")

        # rekey - create new SAs and update the tunnel protection
        # (shallow copy: rekeyed.tun_if is the very same interface)
        rekeyed = copy.copy(p)
        rekeyed.crypt_key = b'X' + p.crypt_key[1:]
        rekeyed.scapy_tun_spi += 100
        rekeyed.scapy_tun_sa_id += 1
        rekeyed.vpp_tun_spi += 100
        rekeyed.vpp_tun_sa_id += 1
        rekeyed.tun_if.local_spi = p.vpp_tun_spi
        rekeyed.tun_if.remote_spi = p.scapy_tun_spi
        rekeyed.inner_hop_limit = 24
        rekeyed.outer_hop_limit = 128
        rekeyed.inner_flow_label = 0xabcde
        rekeyed.outer_flow_label = 0xabcde
        # NOTE(review): this assignment was not visible in the reviewed
        # excerpt; it is what makes the expected outer hop limit 128 once
        # the hop limit is no longer copied - confirm
        rekeyed.hop_limit = 128
        rekeyed.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL

        self.config_sa_tun(rekeyed, tun_src, tun_dst)
        self.config_protect(rekeyed)
        self.unconfig_sa(p)

        # counters were cleared above, so they restart from zero
        self.verify_tun_66(rekeyed, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        # teardown
        self.unconfig_protect(rekeyed)
        self.unconfig_sa(rekeyed)
        self.unconfig_network(p)

    def test_tun_66_police(self):
        """IPSEC interface IPv6 with input policer"""
        tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
        n_pkts = 127
        p = self.ipv6_params
        p.inner_hop_limit = 24
        p.outer_hop_limit = 23
        p.outer_flow_label = 243224
        p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        # every colour transmits, so policing never drops test traffic
        qos_action = VppEnum.vl_api_sse2_qos_action_type_t
        action_tx = PolicerAction(
            qos_action.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
                             conform_action=action_tx,
                             exceed_action=action_tx,
                             violate_action=action_tx)
        policer.add_vpp_config()

        # Start policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, True)

        self.verify_tun_66(p, count=n_pkts)
        self._check_tun_counters(p.tun_if, n_pkts, n_pkts)

        before = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(before['conform_packets'], 0)
        self.assertEqual(before['exceed_packets'], 0)
        self.assertGreater(before['violate_packets'], 0)

        # Stop policing on tun
        policer.apply_vpp_config(p.tun_if.sw_if_index, False)
        self.verify_tun_66(p, count=n_pkts)

        # No new policer stats
        after = policer.get_stats()
        self.assertEqual(before, after)

        # teardown
        policer.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
    """ Ipsec P2MP ESP v4 tests """
    tun4_encrypt_node_name = "esp4-encrypt-tun"
    tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
    encryption_type = ESP

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=100):
        """Return `count` encrypted frames.

        The inner packet always runs pg1 local -> pg1 remote; the `src`
        and `dst` arguments are accepted for signature compatibility only.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=self.pg1.local_ip4,
                              dst=self.pg1.remote_ip4) /
                           UDP(sport=1144, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1,
                 payload_size=100):
        """Return `count` plain frames to `dst`; the source address is
        fixed to 1.1.1.1 and `src` is ignored."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src="1.1.1.1", dst=dst) /
                UDP(sport=1144, dport=2233) /
                Raw(b'X' * payload_size)
                for _ in range(count)]

    def verify_decrypted(self, p, rxs):
        """Check the L2/L3 destination of packets decrypted by VPP."""
        for rx in rxs:
            self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)

    def verify_encrypted(self, p, sa, rxs):
        """Check outer DSCP/TTL, then decrypt and check the inner packet.

        On failure the received packet (and the decrypted one, when
        decryption got that far) is logged before the error is re-raised.
        """
        for rx in rxs:
            # The outer-header asserts run before anything is decrypted,
            # so the failure path must cope with there being no decrypted
            # packet yet; None marks that case explicitly.
            pkt = None
            try:
                self.assertEqual(rx[IP].tos,
                                 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
                self.assertEqual(rx[IP].ttl, p.hop_limit)
                pkt = sa.decrypt(rx[IP])
                if not pkt.haslayer(IP):
                    pkt = IP(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                e = pkt[IP]
                self.assertEqual(e[IP].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def setUp(self):
        super(TestIpsecMIfEsp4, self).setUp()

        # NOTE(review): number of next-hops not visible in the reviewed
        # excerpt - confirm
        N_NHS = 16
        self.tun_if = self.pg0
        p = self.ipv4_params
        p.tun_if = VppIpsecInterface(self,
                                     mode=(VppEnum.vl_api_tunnel_mode_t.
                                           TUNNEL_API_MODE_MP))
        p.tun_if.add_vpp_config()
        p.tun_if.admin_up()
        # configure, unconfigure and configure the address again
        p.tun_if.config_ip4()
        p.tun_if.unconfig_ip4()
        p.tun_if.config_ip4()
        p.tun_if.generate_remote_hosts(N_NHS)
        self.pg0.generate_remote_hosts(N_NHS)
        self.pg0.configure_ipv4_neighbors()

        # a permit-everything ACL on both the underlay and the tunnel
        # NOTE(review): the proto argument was not visible in the reviewed
        # excerpt - confirm
        r_all = AclRule(True,
                        src_prefix="0.0.0.0/0",
                        dst_prefix="0.0.0.0/0",
                        proto=0)
        a = VppAcl(self, [r_all]).add_vpp_config()

        VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
        VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()

        # setup some SAs for several next-hops on the interface
        self.multi_params = []

        for ii in range(N_NHS):
            # shallow copy: every entry shares the one tunnel interface
            p = copy.copy(self.ipv4_params)

            p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
            p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
            p.scapy_tun_spi = p.scapy_tun_spi + ii
            p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
            p.vpp_tun_spi = p.vpp_tun_spi + ii

            p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
            p.scapy_tra_spi = p.scapy_tra_spi + ii
            p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
            p.vpp_tra_spi = p.vpp_tra_spi + ii
            # NOTE(review): per-next-hop hop limit not visible in the
            # reviewed excerpt; verify_encrypted and the SAs below read
            # p.hop_limit, so it has to be set before them - confirm value
            p.hop_limit = ii + 10
            p.tun_sa_out = VppIpsecSA(
                self, p.scapy_tun_sa_id, p.scapy_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.local_ip4,
                self.pg0.remote_hosts[ii].ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_out.add_vpp_config()

            p.tun_sa_in = VppIpsecSA(
                self, p.vpp_tun_sa_id, p.vpp_tun_spi,
                p.auth_algo_vpp_id, p.auth_key,
                p.crypt_algo_vpp_id, p.crypt_key,
                self.vpp_esp_protocol,
                self.pg0.remote_hosts[ii].ip4,
                self.pg0.local_ip4,
                dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
                hop_limit=p.hop_limit)
            p.tun_sa_in.add_vpp_config()

            # one protection per next-hop on the multipoint interface
            p.tun_protect = VppIpsecTunProtect(
                self,
                p.tun_if,
                p.tun_sa_out,
                [p.tun_sa_in],
                nh=p.tun_if.remote_hosts[ii].ip4)
            p.tun_protect.add_vpp_config()
            config_tun_params(p, self.encryption_type, None,
                              self.pg0.local_ip4,
                              self.pg0.remote_hosts[ii].ip4)
            self.multi_params.append(p)

            p.via_tun_route = VppIpRoute(
                self, p.remote_tun_if_host, 32,
                [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
                              p.tun_if.sw_if_index)]).add_vpp_config()

            p.tun_dst = self.pg0.remote_hosts[ii].ip4

    def tearDown(self):
        p = self.ipv4_params
        p.tun_if.unconfig_ip4()
        super(TestIpsecMIfEsp4, self).tearDown()

    def test_tun_44(self):
        """P2MP IPSEC 44"""
        # NOTE(review): docstring text and packet count were not visible
        # in the reviewed excerpt - confirm
        N_PKTS = 63
        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)

        # remove one tunnel protect, the rest should still work
        self.multi_params[0].tun_protect.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
        self.multi_params[0].via_tun_route.remove_vpp_config()
        self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)

        for p in self.multi_params[1:]:
            self.verify_tun_44(p, count=N_PKTS)

        # restore the removed protection and route; everything works again
        self.multi_params[0].tun_protect.add_vpp_config()
        self.multi_params[0].via_tun_route.add_vpp_config()

        for p in self.multi_params:
            self.verify_tun_44(p, count=N_PKTS)
class TestIpsecItf6MPLS(TemplateIpsec,
                        TemplateIpsecItf6,
                        IpsecTun6):
    """ IPsec Interface MPLSoIPv6 """

    tun6_encrypt_node_name = "esp-mpls-encrypt-tun"

    def setUp(self):
        super(TestIpsecItf6MPLS, self).setUp()

        self.tun_if = self.pg0

    def tearDown(self):
        super(TestIpsecItf6MPLS, self).tearDown()

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=100):
        """Return `count` encrypted frames whose ESP payload is
        MPLS(label 66) / IPv6 / UDP; encrypt is called once per frame."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(MPLS(label=66, ttl=3) /
                           IPv6(src=src, dst=dst) /
                           UDP(sport=1166, dport=2233) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt each packet and check the MPLS label and inner
        destination; log the offending packets and re-raise on failure."""
        for rx in rxs:
            # None until decryption succeeded; keeps the failure path from
            # touching an unbound name when sa.decrypt() itself raises
            pkt = None
            try:
                pkt = sa.decrypt(rx[IPv6])
                if not pkt.haslayer(IPv6):
                    # re-parse as IPv6: this verifier only ever looks for
                    # an IPv6 layer, so an IPv4 parse could never satisfy
                    # the checks below
                    pkt = IPv6(pkt[Raw].load)
                self.assert_packet_checksums_valid(pkt)
                self.assert_equal(pkt[MPLS].label, 66)
                self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
            except (IndexError, AssertionError):
                self.logger.debug(ppp("Unexpected packet:", rx))
                if pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", pkt))
                raise

    def test_tun_mpls_o_ip6(self):
        """IPSEC interface MPLS over IPv6"""

        n_pkts = 127
        p = self.ipv6_params
        f = FibPathProto

        tbl = VppMplsTable(self, 0)
        tbl.add_vpp_config()

        self.config_network(p)
        # deag MPLS routes from the tunnel
        VppMplsRoute(self, 66, 1,
                     [VppRoutePath(
                         self.pg1.remote_ip6,
                         self.pg1.sw_if_index)],
                     eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
        # traffic towards the remote host now leaves the tunnel labelled
        p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
                                     p.tun_if.sw_if_index,
                                     labels=[VppMplsLabel(66)])])
        p.tun_if.enable_mpls()

        self.config_sa_tun(p,
                           self.pg0.local_ip6,
                           self.pg0.remote_ip6)
        self.config_protect(p)

        self.verify_tun_66(p, count=n_pkts)

        # cleanup
        p.tun_if.disable_mpls()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
# Run the tests in this module with the VPP test runner when the file is
# executed directly rather than imported.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)