5 from scapy.layers.ipsec import SecurityAssociation, ESP
6 from scapy.layers.l2 import Ether, GRE, Dot1Q
7 from scapy.packet import Raw, bind_layers
8 from scapy.layers.inet import IP, UDP
9 from scapy.layers.inet6 import IPv6
10 from scapy.contrib.mpls import MPLS
11 from framework import tag_fixme_vpp_workers
12 from framework import VppTestRunner
13 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
14 IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
15 IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
16 from vpp_gre_interface import VppGreInterface
17 from vpp_ipip_tun_interface import VppIpIpTunInterface
18 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
19 VppMplsTable, VppMplsRoute, FibPathProto
20 from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
21 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
22 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
23 from vpp_teib import VppTeib
25 from vpp_papi import VppEnum
26 from vpp_papi_provider import CliFailedCommandError
27 from vpp_acl import AclRule, VppAcl, VppAclInterface
28 from vpp_policer import PolicerAction, VppPolicer
31 def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
32 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
33 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
34 IPSEC_API_SAD_FLAG_USE_ESN))
35 crypt_key = mk_scapy_crypt_key(p)
37 p.tun_dst = tun_if.remote_ip
38 p.tun_src = tun_if.local_ip
44 is_default_port = (p.nat_header.dport == 4500)
46 is_default_port = True
49 outbound_nat_header = p.nat_header
51 outbound_nat_header = UDP(sport=p.nat_header.dport,
52 dport=p.nat_header.sport)
53 bind_layers(UDP, ESP, dport=p.nat_header.dport)
55 p.scapy_tun_sa = SecurityAssociation(
56 encryption_type, spi=p.vpp_tun_spi,
57 crypt_algo=p.crypt_algo,
59 auth_algo=p.auth_algo, auth_key=p.auth_key,
60 tunnel_header=ip_class_by_addr_type[p.addr_type](
63 nat_t_header=outbound_nat_header,
65 p.vpp_tun_sa = SecurityAssociation(
66 encryption_type, spi=p.scapy_tun_spi,
67 crypt_algo=p.crypt_algo,
69 auth_algo=p.auth_algo, auth_key=p.auth_key,
70 tunnel_header=ip_class_by_addr_type[p.addr_type](
73 nat_t_header=p.nat_header,
77 def config_tra_params(p, encryption_type, tun_if):
78 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
79 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
80 IPSEC_API_SAD_FLAG_USE_ESN))
81 crypt_key = mk_scapy_crypt_key(p)
82 p.tun_dst = tun_if.remote_ip
83 p.tun_src = tun_if.local_ip
86 is_default_port = (p.nat_header.dport == 4500)
88 is_default_port = True
91 outbound_nat_header = p.nat_header
93 outbound_nat_header = UDP(sport=p.nat_header.dport,
94 dport=p.nat_header.sport)
95 bind_layers(UDP, ESP, dport=p.nat_header.dport)
97 p.scapy_tun_sa = SecurityAssociation(
98 encryption_type, spi=p.vpp_tun_spi,
99 crypt_algo=p.crypt_algo,
101 auth_algo=p.auth_algo, auth_key=p.auth_key,
103 nat_t_header=outbound_nat_header)
104 p.vpp_tun_sa = SecurityAssociation(
105 encryption_type, spi=p.scapy_tun_spi,
106 crypt_algo=p.crypt_algo,
108 auth_algo=p.auth_algo, auth_key=p.auth_key,
110 nat_t_header=p.nat_header)
113 class TemplateIpsec4TunProtect(object):
114 """ IPsec IPv4 Tunnel protect """
116 encryption_type = ESP
117 tun4_encrypt_node_name = "esp4-encrypt-tun"
118 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
119 tun4_input_node = "ipsec4-tun-input"
121 def config_sa_tra(self, p):
122 config_tun_params(p, self.encryption_type, p.tun_if)
124 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
125 p.auth_algo_vpp_id, p.auth_key,
126 p.crypt_algo_vpp_id, p.crypt_key,
127 self.vpp_esp_protocol,
129 p.tun_sa_out.add_vpp_config()
131 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
132 p.auth_algo_vpp_id, p.auth_key,
133 p.crypt_algo_vpp_id, p.crypt_key,
134 self.vpp_esp_protocol,
136 p.tun_sa_in.add_vpp_config()
138 def config_sa_tun(self, p):
139 config_tun_params(p, self.encryption_type, p.tun_if)
141 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
142 p.auth_algo_vpp_id, p.auth_key,
143 p.crypt_algo_vpp_id, p.crypt_key,
144 self.vpp_esp_protocol,
145 self.tun_if.local_addr[p.addr_type],
146 self.tun_if.remote_addr[p.addr_type],
148 p.tun_sa_out.add_vpp_config()
150 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
151 p.auth_algo_vpp_id, p.auth_key,
152 p.crypt_algo_vpp_id, p.crypt_key,
153 self.vpp_esp_protocol,
154 self.tun_if.remote_addr[p.addr_type],
155 self.tun_if.local_addr[p.addr_type],
157 p.tun_sa_in.add_vpp_config()
159 def config_protect(self, p):
160 p.tun_protect = VppIpsecTunProtect(self,
164 p.tun_protect.add_vpp_config()
166 def config_network(self, p):
167 if hasattr(p, 'tun_dst'):
170 tun_dst = self.pg0.remote_ip4
171 p.tun_if = VppIpIpTunInterface(self, self.pg0,
174 p.tun_if.add_vpp_config()
176 p.tun_if.config_ip4()
177 p.tun_if.config_ip6()
179 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
180 [VppRoutePath(p.tun_if.remote_ip4,
182 p.route.add_vpp_config()
183 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
184 [VppRoutePath(p.tun_if.remote_ip6,
186 proto=DpoProto.DPO_PROTO_IP6)])
189 def unconfig_network(self, p):
190 p.route.remove_vpp_config()
191 p.tun_if.remove_vpp_config()
193 def unconfig_protect(self, p):
194 p.tun_protect.remove_vpp_config()
196 def unconfig_sa(self, p):
197 p.tun_sa_out.remove_vpp_config()
198 p.tun_sa_in.remove_vpp_config()
201 class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
203 """ IPsec tunnel interface tests """
205 encryption_type = ESP
209 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
212 def tearDownClass(cls):
213 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
216 super(TemplateIpsec4TunIfEsp, self).setUp()
218 self.tun_if = self.pg0
222 self.config_network(p)
223 self.config_sa_tra(p)
224 self.config_protect(p)
227 super(TemplateIpsec4TunIfEsp, self).tearDown()
230 class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
232 """ IPsec UDP tunnel interface tests """
234 tun4_encrypt_node_name = "esp4-encrypt-tun"
235 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
236 encryption_type = ESP
240 super(TemplateIpsec4TunIfEspUdp, cls).setUpClass()
243 def tearDownClass(cls):
244 super(TemplateIpsec4TunIfEspUdp, cls).tearDownClass()
246 def verify_encrypted(self, p, sa, rxs):
249 # ensure the UDP ports are correct before we decrypt
251 self.assertTrue(rx.haslayer(UDP))
252 self.assert_equal(rx[UDP].sport, p.nat_header.sport)
253 self.assert_equal(rx[UDP].dport, 4500)
255 pkt = sa.decrypt(rx[IP])
256 if not pkt.haslayer(IP):
257 pkt = IP(pkt[Raw].load)
259 self.assert_packet_checksums_valid(pkt)
260 self.assert_equal(pkt[IP].dst, "1.1.1.1")
261 self.assert_equal(pkt[IP].src, self.pg1.remote_ip4)
262 except (IndexError, AssertionError):
263 self.logger.debug(ppp("Unexpected packet:", rx))
265 self.logger.debug(ppp("Decrypted packet:", pkt))
270 def config_sa_tra(self, p):
271 config_tun_params(p, self.encryption_type, p.tun_if)
273 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
274 p.auth_algo_vpp_id, p.auth_key,
275 p.crypt_algo_vpp_id, p.crypt_key,
276 self.vpp_esp_protocol,
278 udp_src=p.nat_header.sport,
279 udp_dst=p.nat_header.dport)
280 p.tun_sa_out.add_vpp_config()
282 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
283 p.auth_algo_vpp_id, p.auth_key,
284 p.crypt_algo_vpp_id, p.crypt_key,
285 self.vpp_esp_protocol,
287 udp_src=p.nat_header.sport,
288 udp_dst=p.nat_header.dport)
289 p.tun_sa_in.add_vpp_config()
292 super(TemplateIpsec4TunIfEspUdp, self).setUp()
295 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
296 IPSEC_API_SAD_FLAG_UDP_ENCAP)
297 p.nat_header = UDP(sport=5454, dport=4500)
299 self.tun_if = self.pg0
301 self.config_network(p)
302 self.config_sa_tra(p)
303 self.config_protect(p)
306 super(TemplateIpsec4TunIfEspUdp, self).tearDown()
309 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
310 """ Ipsec ESP - TUN tests """
311 tun4_encrypt_node_name = "esp4-encrypt-tun"
312 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
314 def test_tun_basic64(self):
315 """ ipsec 6o4 tunnel basic test """
316 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
318 self.verify_tun_64(self.params[socket.AF_INET], count=1)
320 def test_tun_burst64(self):
321 """ ipsec 6o4 tunnel basic test """
322 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
324 self.verify_tun_64(self.params[socket.AF_INET], count=257)
326 def test_tun_basic_frag44(self):
327 """ ipsec 4o4 tunnel frag basic test """
328 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
332 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
334 self.verify_tun_44(self.params[socket.AF_INET],
335 count=1, payload_size=1800, n_rx=2)
336 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
340 class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
341 """ Ipsec ESP UDP tests """
343 tun4_input_node = "ipsec4-tun-input"
346 super(TestIpsec4TunIfEspUdp, self).setUp()
348 def test_keepalive(self):
349 """ IPSEC NAT Keepalive """
350 self.verify_keepalive(self.ipv4_params)
353 class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
354 """ Ipsec ESP UDP GCM tests """
356 tun4_input_node = "ipsec4-tun-input"
359 super(TestIpsec4TunIfEspUdpGCM, self).setUp()
361 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
362 IPSEC_API_INTEG_ALG_NONE)
363 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
364 IPSEC_API_CRYPTO_ALG_AES_GCM_256)
365 p.crypt_algo = "AES-GCM"
367 p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
371 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
372 """ Ipsec ESP - TCP tests """
376 class TemplateIpsec6TunProtect(object):
377 """ IPsec IPv6 Tunnel protect """
379 def config_sa_tra(self, p):
380 config_tun_params(p, self.encryption_type, p.tun_if)
382 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
383 p.auth_algo_vpp_id, p.auth_key,
384 p.crypt_algo_vpp_id, p.crypt_key,
385 self.vpp_esp_protocol)
386 p.tun_sa_out.add_vpp_config()
388 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
389 p.auth_algo_vpp_id, p.auth_key,
390 p.crypt_algo_vpp_id, p.crypt_key,
391 self.vpp_esp_protocol)
392 p.tun_sa_in.add_vpp_config()
394 def config_sa_tun(self, p):
395 config_tun_params(p, self.encryption_type, p.tun_if)
397 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
398 p.auth_algo_vpp_id, p.auth_key,
399 p.crypt_algo_vpp_id, p.crypt_key,
400 self.vpp_esp_protocol,
401 self.tun_if.local_addr[p.addr_type],
402 self.tun_if.remote_addr[p.addr_type])
403 p.tun_sa_out.add_vpp_config()
405 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
406 p.auth_algo_vpp_id, p.auth_key,
407 p.crypt_algo_vpp_id, p.crypt_key,
408 self.vpp_esp_protocol,
409 self.tun_if.remote_addr[p.addr_type],
410 self.tun_if.local_addr[p.addr_type])
411 p.tun_sa_in.add_vpp_config()
413 def config_protect(self, p):
414 p.tun_protect = VppIpsecTunProtect(self,
418 p.tun_protect.add_vpp_config()
420 def config_network(self, p):
421 if hasattr(p, 'tun_dst'):
424 tun_dst = self.pg0.remote_ip6
425 p.tun_if = VppIpIpTunInterface(self, self.pg0,
428 p.tun_if.add_vpp_config()
430 p.tun_if.config_ip6()
431 p.tun_if.config_ip4()
433 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
434 [VppRoutePath(p.tun_if.remote_ip6,
436 proto=DpoProto.DPO_PROTO_IP6)])
437 p.route.add_vpp_config()
438 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
439 [VppRoutePath(p.tun_if.remote_ip4,
443 def unconfig_network(self, p):
444 p.route.remove_vpp_config()
445 p.tun_if.remove_vpp_config()
447 def unconfig_protect(self, p):
448 p.tun_protect.remove_vpp_config()
450 def unconfig_sa(self, p):
451 p.tun_sa_out.remove_vpp_config()
452 p.tun_sa_in.remove_vpp_config()
455 class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
457 """ IPsec tunnel interface tests """
459 encryption_type = ESP
462 super(TemplateIpsec6TunIfEsp, self).setUp()
464 self.tun_if = self.pg0
467 self.config_network(p)
468 self.config_sa_tra(p)
469 self.config_protect(p)
472 super(TemplateIpsec6TunIfEsp, self).tearDown()
475 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
477 """ Ipsec ESP - TUN tests """
478 tun6_encrypt_node_name = "esp6-encrypt-tun"
479 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
481 def test_tun_basic46(self):
482 """ ipsec 4o6 tunnel basic test """
483 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
484 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
486 def test_tun_burst46(self):
487 """ ipsec 4o6 tunnel burst test """
488 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
489 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
492 class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
493 IpsecTun6HandoffTests):
494 """ Ipsec ESP 6 Handoff tests """
495 tun6_encrypt_node_name = "esp6-encrypt-tun"
496 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
498 def test_tun_handoff_66_police(self):
499 """ ESP 6o6 tunnel with policer worker hand-off test """
500 self.vapi.cli("clear errors")
501 self.vapi.cli("clear ipsec sa")
504 p = self.params[socket.AF_INET6]
506 action_tx = PolicerAction(
507 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
509 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
510 conform_action=action_tx,
511 exceed_action=action_tx,
512 violate_action=action_tx)
513 policer.add_vpp_config()
515 # Start policing on tun
516 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
518 for pol_bind in [1, 0]:
519 policer.bind_vpp_config(pol_bind, True)
521 # inject alternately on worker 0 and 1.
522 for worker in [0, 1, 0, 1]:
523 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
525 src=p.remote_tun_if_host,
526 dst=self.pg1.remote_ip6,
528 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
529 self.pg1, worker=worker)
530 self.verify_decrypted6(p, recv_pkts)
531 self.logger.debug(self.vapi.cli("show trace max 100"))
533 stats = policer.get_stats()
534 stats0 = policer.get_stats(worker=0)
535 stats1 = policer.get_stats(worker=1)
538 # First pass: Worker 1, should have done all the policing
539 self.assertEqual(stats, stats1)
541 # Worker 0, should have handed everything off
542 self.assertEqual(stats0['conform_packets'], 0)
543 self.assertEqual(stats0['exceed_packets'], 0)
544 self.assertEqual(stats0['violate_packets'], 0)
546 # Second pass: both workers should have policed equal amounts
547 self.assertGreater(stats1['conform_packets'], 0)
548 self.assertEqual(stats1['exceed_packets'], 0)
549 self.assertGreater(stats1['violate_packets'], 0)
551 self.assertGreater(stats0['conform_packets'], 0)
552 self.assertEqual(stats0['exceed_packets'], 0)
553 self.assertGreater(stats0['violate_packets'], 0)
555 self.assertEqual(stats0['conform_packets'] +
556 stats0['violate_packets'],
557 stats1['conform_packets'] +
558 stats1['violate_packets'])
560 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
561 policer.remove_vpp_config()
564 class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
565 IpsecTun4HandoffTests):
566 """ Ipsec ESP 4 Handoff tests """
567 tun4_encrypt_node_name = "esp4-encrypt-tun"
568 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
570 def test_tun_handoff_44_police(self):
571 """ ESP 4o4 tunnel with policer worker hand-off test """
572 self.vapi.cli("clear errors")
573 self.vapi.cli("clear ipsec sa")
576 p = self.params[socket.AF_INET]
578 action_tx = PolicerAction(
579 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
581 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
582 conform_action=action_tx,
583 exceed_action=action_tx,
584 violate_action=action_tx)
585 policer.add_vpp_config()
587 # Start policing on tun
588 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
590 for pol_bind in [1, 0]:
591 policer.bind_vpp_config(pol_bind, True)
593 # inject alternately on worker 0 and 1.
594 for worker in [0, 1, 0, 1]:
595 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
597 src=p.remote_tun_if_host,
598 dst=self.pg1.remote_ip4,
600 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
601 self.pg1, worker=worker)
602 self.verify_decrypted(p, recv_pkts)
603 self.logger.debug(self.vapi.cli("show trace max 100"))
605 stats = policer.get_stats()
606 stats0 = policer.get_stats(worker=0)
607 stats1 = policer.get_stats(worker=1)
610 # First pass: Worker 1, should have done all the policing
611 self.assertEqual(stats, stats1)
613 # Worker 0, should have handed everything off
614 self.assertEqual(stats0['conform_packets'], 0)
615 self.assertEqual(stats0['exceed_packets'], 0)
616 self.assertEqual(stats0['violate_packets'], 0)
618 # Second pass: both workers should have policed equal amounts
619 self.assertGreater(stats1['conform_packets'], 0)
620 self.assertEqual(stats1['exceed_packets'], 0)
621 self.assertGreater(stats1['violate_packets'], 0)
623 self.assertGreater(stats0['conform_packets'], 0)
624 self.assertEqual(stats0['exceed_packets'], 0)
625 self.assertGreater(stats0['violate_packets'], 0)
627 self.assertEqual(stats0['conform_packets'] +
628 stats0['violate_packets'],
629 stats1['conform_packets'] +
630 stats1['violate_packets'])
632 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
633 policer.remove_vpp_config()
636 @tag_fixme_vpp_workers
637 class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
640 """ IPsec IPv4 Multi Tunnel interface """
642 encryption_type = ESP
643 tun4_encrypt_node_name = "esp4-encrypt-tun"
644 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
647 super(TestIpsec4MultiTunIfEsp, self).setUp()
649 self.tun_if = self.pg0
651 self.multi_params = []
652 self.pg0.generate_remote_hosts(10)
653 self.pg0.configure_ipv4_neighbors()
656 p = copy.copy(self.ipv4_params)
658 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
659 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
660 p.scapy_tun_spi = p.scapy_tun_spi + ii
661 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
662 p.vpp_tun_spi = p.vpp_tun_spi + ii
664 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
665 p.scapy_tra_spi = p.scapy_tra_spi + ii
666 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
667 p.vpp_tra_spi = p.vpp_tra_spi + ii
668 p.tun_dst = self.pg0.remote_hosts[ii].ip4
670 self.multi_params.append(p)
671 self.config_network(p)
672 self.config_sa_tra(p)
673 self.config_protect(p)
676 super(TestIpsec4MultiTunIfEsp, self).tearDown()
678 def test_tun_44(self):
679 """Multiple IPSEC tunnel interfaces """
680 for p in self.multi_params:
681 self.verify_tun_44(p, count=127)
682 self.assertEqual(p.tun_if.get_rx_stats(), 127)
683 self.assertEqual(p.tun_if.get_tx_stats(), 127)
685 def test_tun_rr_44(self):
686 """ Round-robin packets acrros multiple interface """
688 for p in self.multi_params:
689 tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
690 src=p.remote_tun_if_host,
691 dst=self.pg1.remote_ip4)
692 rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
694 for rx, p in zip(rxs, self.multi_params):
695 self.verify_decrypted(p, [rx])
698 for p in self.multi_params:
699 tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
700 dst=p.remote_tun_if_host)
701 rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
703 for rx, p in zip(rxs, self.multi_params):
704 self.verify_encrypted(p, p.vpp_tun_sa, [rx])
707 class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
710 """ IPsec IPv4 Tunnel interface all Algos """
712 encryption_type = ESP
713 tun4_encrypt_node_name = "esp4-encrypt-tun"
714 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
717 super(TestIpsec4TunIfEspAll, self).setUp()
719 self.tun_if = self.pg0
722 self.config_network(p)
723 self.config_sa_tra(p)
724 self.config_protect(p)
728 self.unconfig_protect(p)
729 self.unconfig_network(p)
732 super(TestIpsec4TunIfEspAll, self).tearDown()
736 # change the key and the SPI
739 p.crypt_key = b'X' + p.crypt_key[1:]
741 p.scapy_tun_sa_id += 1
744 p.tun_if.local_spi = p.vpp_tun_spi
745 p.tun_if.remote_spi = p.scapy_tun_spi
747 config_tun_params(p, self.encryption_type, p.tun_if)
749 p.tun_sa_out = VppIpsecSA(self,
756 self.vpp_esp_protocol,
759 p.tun_sa_in = VppIpsecSA(self,
766 self.vpp_esp_protocol,
769 p.tun_sa_in.add_vpp_config()
770 p.tun_sa_out.add_vpp_config()
772 self.config_protect(p)
773 np.tun_sa_out.remove_vpp_config()
774 np.tun_sa_in.remove_vpp_config()
775 self.logger.info(self.vapi.cli("sh ipsec sa"))
777 def test_tun_44(self):
778 """IPSEC tunnel all algos """
780 # foreach VPP crypto engine
781 engines = ["ia32", "ipsecmb", "openssl"]
783 # foreach crypto algorithm
784 algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
785 IPSEC_API_CRYPTO_ALG_AES_GCM_128),
786 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
787 IPSEC_API_INTEG_ALG_NONE),
788 'scapy-crypto': "AES-GCM",
789 'scapy-integ': "NULL",
790 'key': b"JPjyOWBeVEQiMe7h",
792 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
793 IPSEC_API_CRYPTO_ALG_AES_GCM_192),
794 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
795 IPSEC_API_INTEG_ALG_NONE),
796 'scapy-crypto': "AES-GCM",
797 'scapy-integ': "NULL",
798 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
800 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
801 IPSEC_API_CRYPTO_ALG_AES_GCM_256),
802 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
803 IPSEC_API_INTEG_ALG_NONE),
804 'scapy-crypto': "AES-GCM",
805 'scapy-integ': "NULL",
806 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
808 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
809 IPSEC_API_CRYPTO_ALG_AES_CBC_128),
810 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
811 IPSEC_API_INTEG_ALG_SHA1_96),
812 'scapy-crypto': "AES-CBC",
813 'scapy-integ': "HMAC-SHA1-96",
815 'key': b"JPjyOWBeVEQiMe7h"},
816 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
817 IPSEC_API_CRYPTO_ALG_AES_CBC_192),
818 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
819 IPSEC_API_INTEG_ALG_SHA_512_256),
820 'scapy-crypto': "AES-CBC",
821 'scapy-integ': "SHA2-512-256",
823 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
824 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
825 IPSEC_API_CRYPTO_ALG_AES_CBC_256),
826 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
827 IPSEC_API_INTEG_ALG_SHA_256_128),
828 'scapy-crypto': "AES-CBC",
829 'scapy-integ': "SHA2-256-128",
831 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
832 {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
833 IPSEC_API_CRYPTO_ALG_NONE),
834 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
835 IPSEC_API_INTEG_ALG_SHA1_96),
836 'scapy-crypto': "NULL",
837 'scapy-integ': "HMAC-SHA1-96",
839 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
841 for engine in engines:
842 self.vapi.cli("set crypto handler all %s" % engine)
845 # loop through each of the algorithms
848 # with self.subTest(algo=algo['scapy']):
851 p.auth_algo_vpp_id = algo['vpp-integ']
852 p.crypt_algo_vpp_id = algo['vpp-crypto']
853 p.crypt_algo = algo['scapy-crypto']
854 p.auth_algo = algo['scapy-integ']
855 p.crypt_key = algo['key']
856 p.salt = algo['salt']
862 self.verify_tun_44(p, count=127)
865 class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
868 """ IPsec IPv4 Tunnel interface no Algos """
870 encryption_type = ESP
871 tun4_encrypt_node_name = "esp4-encrypt-tun"
872 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
875 super(TestIpsec4TunIfEspNoAlgo, self).setUp()
877 self.tun_if = self.pg0
879 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
880 IPSEC_API_INTEG_ALG_NONE)
884 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
885 IPSEC_API_CRYPTO_ALG_NONE)
886 p.crypt_algo = 'NULL'
890 super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
892 def test_tun_44(self):
893 """ IPSec SA with NULL algos """
896 self.config_network(p)
897 self.config_sa_tra(p)
898 self.config_protect(p)
900 tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
901 dst=p.remote_tun_if_host)
902 self.send_and_assert_no_replies(self.pg1, tx)
904 self.unconfig_protect(p)
906 self.unconfig_network(p)
909 @tag_fixme_vpp_workers
910 class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
913 """ IPsec IPv6 Multi Tunnel interface """
915 encryption_type = ESP
916 tun6_encrypt_node_name = "esp6-encrypt-tun"
917 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
920 super(TestIpsec6MultiTunIfEsp, self).setUp()
922 self.tun_if = self.pg0
924 self.multi_params = []
925 self.pg0.generate_remote_hosts(10)
926 self.pg0.configure_ipv6_neighbors()
929 p = copy.copy(self.ipv6_params)
931 p.remote_tun_if_host = "1111::%d" % (ii + 1)
932 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
933 p.scapy_tun_spi = p.scapy_tun_spi + ii
934 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
935 p.vpp_tun_spi = p.vpp_tun_spi + ii
937 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
938 p.scapy_tra_spi = p.scapy_tra_spi + ii
939 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
940 p.vpp_tra_spi = p.vpp_tra_spi + ii
941 p.tun_dst = self.pg0.remote_hosts[ii].ip6
943 self.multi_params.append(p)
944 self.config_network(p)
945 self.config_sa_tra(p)
946 self.config_protect(p)
949 super(TestIpsec6MultiTunIfEsp, self).tearDown()
951 def test_tun_66(self):
952 """Multiple IPSEC tunnel interfaces """
953 for p in self.multi_params:
954 self.verify_tun_66(p, count=127)
955 self.assertEqual(p.tun_if.get_rx_stats(), 127)
956 self.assertEqual(p.tun_if.get_tx_stats(), 127)
959 class TestIpsecGreTebIfEsp(TemplateIpsec,
961 """ Ipsec GRE TEB ESP - TUN tests """
962 tun4_encrypt_node_name = "esp4-encrypt-tun"
963 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
964 encryption_type = ESP
965 omac = "00:11:22:33:44:55"
967 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
969 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
970 sa.encrypt(IP(src=self.pg0.remote_ip4,
971 dst=self.pg0.local_ip4) /
973 Ether(dst=self.omac) /
974 IP(src="1.1.1.1", dst="1.1.1.2") /
975 UDP(sport=1144, dport=2233) /
976 Raw(b'X' * payload_size))
977 for i in range(count)]
979 def gen_pkts(self, sw_intf, src, dst, count=1,
981 return [Ether(dst=self.omac) /
982 IP(src="1.1.1.1", dst="1.1.1.2") /
983 UDP(sport=1144, dport=2233) /
984 Raw(b'X' * payload_size)
985 for i in range(count)]
987 def verify_decrypted(self, p, rxs):
989 self.assert_equal(rx[Ether].dst, self.omac)
990 self.assert_equal(rx[IP].dst, "1.1.1.2")
992 def verify_encrypted(self, p, sa, rxs):
995 pkt = sa.decrypt(rx[IP])
996 if not pkt.haslayer(IP):
997 pkt = IP(pkt[Raw].load)
998 self.assert_packet_checksums_valid(pkt)
999 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1000 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1001 self.assertTrue(pkt.haslayer(GRE))
1003 self.assertEqual(e[Ether].dst, self.omac)
1004 self.assertEqual(e[IP].dst, "1.1.1.2")
1005 except (IndexError, AssertionError):
1006 self.logger.debug(ppp("Unexpected packet:", rx))
1008 self.logger.debug(ppp("Decrypted packet:", pkt))
1014 super(TestIpsecGreTebIfEsp, self).setUp()
1016 self.tun_if = self.pg0
1018 p = self.ipv4_params
1020 bd1 = VppBridgeDomain(self, 1)
1021 bd1.add_vpp_config()
1023 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1024 p.auth_algo_vpp_id, p.auth_key,
1025 p.crypt_algo_vpp_id, p.crypt_key,
1026 self.vpp_esp_protocol,
1028 self.pg0.remote_ip4)
1029 p.tun_sa_out.add_vpp_config()
1031 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1032 p.auth_algo_vpp_id, p.auth_key,
1033 p.crypt_algo_vpp_id, p.crypt_key,
1034 self.vpp_esp_protocol,
1035 self.pg0.remote_ip4,
1037 p.tun_sa_in.add_vpp_config()
1039 p.tun_if = VppGreInterface(self,
1041 self.pg0.remote_ip4,
1042 type=(VppEnum.vl_api_gre_tunnel_type_t.
1043 GRE_API_TUNNEL_TYPE_TEB))
1044 p.tun_if.add_vpp_config()
1046 p.tun_protect = VppIpsecTunProtect(self,
1051 p.tun_protect.add_vpp_config()
1054 p.tun_if.config_ip4()
1055 config_tun_params(p, self.encryption_type, p.tun_if)
1057 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1058 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1060 self.vapi.cli("clear ipsec sa")
1061 self.vapi.cli("sh adj")
1062 self.vapi.cli("sh ipsec tun")
1065 p = self.ipv4_params
1066 p.tun_if.unconfig_ip4()
1067 super(TestIpsecGreTebIfEsp, self).tearDown()
1070 class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
1072 """ Ipsec GRE TEB ESP - TUN tests """
1073 tun4_encrypt_node_name = "esp4-encrypt-tun"
1074 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1075 encryption_type = ESP
1076 omac = "00:11:22:33:44:55"
1078 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1080 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1081 sa.encrypt(IP(src=self.pg0.remote_ip4,
1082 dst=self.pg0.local_ip4) /
1084 Ether(dst=self.omac) /
1085 IP(src="1.1.1.1", dst="1.1.1.2") /
1086 UDP(sport=1144, dport=2233) /
1087 Raw(b'X' * payload_size))
1088 for i in range(count)]
1090 def gen_pkts(self, sw_intf, src, dst, count=1,
1092 return [Ether(dst=self.omac) /
1094 IP(src="1.1.1.1", dst="1.1.1.2") /
1095 UDP(sport=1144, dport=2233) /
1096 Raw(b'X' * payload_size)
1097 for i in range(count)]
1099 def verify_decrypted(self, p, rxs):
1101 self.assert_equal(rx[Ether].dst, self.omac)
1102 self.assert_equal(rx[Dot1Q].vlan, 11)
1103 self.assert_equal(rx[IP].dst, "1.1.1.2")
1105 def verify_encrypted(self, p, sa, rxs):
1108 pkt = sa.decrypt(rx[IP])
1109 if not pkt.haslayer(IP):
1110 pkt = IP(pkt[Raw].load)
1111 self.assert_packet_checksums_valid(pkt)
1112 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1113 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1114 self.assertTrue(pkt.haslayer(GRE))
1116 self.assertEqual(e[Ether].dst, self.omac)
1117 self.assertFalse(e.haslayer(Dot1Q))
1118 self.assertEqual(e[IP].dst, "1.1.1.2")
1119 except (IndexError, AssertionError):
1120 self.logger.debug(ppp("Unexpected packet:", rx))
1122 self.logger.debug(ppp("Decrypted packet:", pkt))
1128 super(TestIpsecGreTebVlanIfEsp, self).setUp()
1130 self.tun_if = self.pg0
1132 p = self.ipv4_params
1134 bd1 = VppBridgeDomain(self, 1)
1135 bd1.add_vpp_config()
1137 self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
1138 self.vapi.l2_interface_vlan_tag_rewrite(
1139 sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
1141 self.pg1_11.admin_up()
1143 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1144 p.auth_algo_vpp_id, p.auth_key,
1145 p.crypt_algo_vpp_id, p.crypt_key,
1146 self.vpp_esp_protocol,
1148 self.pg0.remote_ip4)
1149 p.tun_sa_out.add_vpp_config()
1151 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1152 p.auth_algo_vpp_id, p.auth_key,
1153 p.crypt_algo_vpp_id, p.crypt_key,
1154 self.vpp_esp_protocol,
1155 self.pg0.remote_ip4,
1157 p.tun_sa_in.add_vpp_config()
1159 p.tun_if = VppGreInterface(self,
1161 self.pg0.remote_ip4,
1162 type=(VppEnum.vl_api_gre_tunnel_type_t.
1163 GRE_API_TUNNEL_TYPE_TEB))
1164 p.tun_if.add_vpp_config()
1166 p.tun_protect = VppIpsecTunProtect(self,
1171 p.tun_protect.add_vpp_config()
1174 p.tun_if.config_ip4()
1175 config_tun_params(p, self.encryption_type, p.tun_if)
1177 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1178 VppBridgeDomainPort(self, bd1, self.pg1_11).add_vpp_config()
1180 self.vapi.cli("clear ipsec sa")
1183 p = self.ipv4_params
1184 p.tun_if.unconfig_ip4()
1185 super(TestIpsecGreTebVlanIfEsp, self).tearDown()
1186 self.pg1_11.admin_down()
1187 self.pg1_11.remove_vpp_config()
1190 class TestIpsecGreTebIfEspTra(TemplateIpsec,
1192 """ Ipsec GRE TEB ESP - Tra tests """
1193 tun4_encrypt_node_name = "esp4-encrypt-tun"
1194 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1195 encryption_type = ESP
1196 omac = "00:11:22:33:44:55"
1198 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1200 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1201 sa.encrypt(IP(src=self.pg0.remote_ip4,
1202 dst=self.pg0.local_ip4) /
1204 Ether(dst=self.omac) /
1205 IP(src="1.1.1.1", dst="1.1.1.2") /
1206 UDP(sport=1144, dport=2233) /
1207 Raw(b'X' * payload_size))
1208 for i in range(count)]
1210 def gen_pkts(self, sw_intf, src, dst, count=1,
1212 return [Ether(dst=self.omac) /
1213 IP(src="1.1.1.1", dst="1.1.1.2") /
1214 UDP(sport=1144, dport=2233) /
1215 Raw(b'X' * payload_size)
1216 for i in range(count)]
1218 def verify_decrypted(self, p, rxs):
1220 self.assert_equal(rx[Ether].dst, self.omac)
1221 self.assert_equal(rx[IP].dst, "1.1.1.2")
1223 def verify_encrypted(self, p, sa, rxs):
1226 pkt = sa.decrypt(rx[IP])
1227 if not pkt.haslayer(IP):
1228 pkt = IP(pkt[Raw].load)
1229 self.assert_packet_checksums_valid(pkt)
1230 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1231 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1232 self.assertTrue(pkt.haslayer(GRE))
1234 self.assertEqual(e[Ether].dst, self.omac)
1235 self.assertEqual(e[IP].dst, "1.1.1.2")
1236 except (IndexError, AssertionError):
1237 self.logger.debug(ppp("Unexpected packet:", rx))
1239 self.logger.debug(ppp("Decrypted packet:", pkt))
1245 super(TestIpsecGreTebIfEspTra, self).setUp()
1247 self.tun_if = self.pg0
1249 p = self.ipv4_params
1251 bd1 = VppBridgeDomain(self, 1)
1252 bd1.add_vpp_config()
1254 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1255 p.auth_algo_vpp_id, p.auth_key,
1256 p.crypt_algo_vpp_id, p.crypt_key,
1257 self.vpp_esp_protocol)
1258 p.tun_sa_out.add_vpp_config()
1260 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1261 p.auth_algo_vpp_id, p.auth_key,
1262 p.crypt_algo_vpp_id, p.crypt_key,
1263 self.vpp_esp_protocol)
1264 p.tun_sa_in.add_vpp_config()
1266 p.tun_if = VppGreInterface(self,
1268 self.pg0.remote_ip4,
1269 type=(VppEnum.vl_api_gre_tunnel_type_t.
1270 GRE_API_TUNNEL_TYPE_TEB))
1271 p.tun_if.add_vpp_config()
1273 p.tun_protect = VppIpsecTunProtect(self,
1278 p.tun_protect.add_vpp_config()
1281 p.tun_if.config_ip4()
1282 config_tra_params(p, self.encryption_type, p.tun_if)
1284 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1285 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1287 self.vapi.cli("clear ipsec sa")
1290 p = self.ipv4_params
1291 p.tun_if.unconfig_ip4()
1292 super(TestIpsecGreTebIfEspTra, self).tearDown()
1295 class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
1297 """ Ipsec GRE TEB UDP ESP - Tra tests """
1298 tun4_encrypt_node_name = "esp4-encrypt-tun"
1299 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1300 encryption_type = ESP
1301 omac = "00:11:22:33:44:55"
1303 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1305 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1306 sa.encrypt(IP(src=self.pg0.remote_ip4,
1307 dst=self.pg0.local_ip4) /
1309 Ether(dst=self.omac) /
1310 IP(src="1.1.1.1", dst="1.1.1.2") /
1311 UDP(sport=1144, dport=2233) /
1312 Raw(b'X' * payload_size))
1313 for i in range(count)]
1315 def gen_pkts(self, sw_intf, src, dst, count=1,
1317 return [Ether(dst=self.omac) /
1318 IP(src="1.1.1.1", dst="1.1.1.2") /
1319 UDP(sport=1144, dport=2233) /
1320 Raw(b'X' * payload_size)
1321 for i in range(count)]
1323 def verify_decrypted(self, p, rxs):
1325 self.assert_equal(rx[Ether].dst, self.omac)
1326 self.assert_equal(rx[IP].dst, "1.1.1.2")
1328 def verify_encrypted(self, p, sa, rxs):
1330 self.assertTrue(rx.haslayer(UDP))
1331 self.assertEqual(rx[UDP].dport, 4545)
1332 self.assertEqual(rx[UDP].sport, 5454)
1334 pkt = sa.decrypt(rx[IP])
1335 if not pkt.haslayer(IP):
1336 pkt = IP(pkt[Raw].load)
1337 self.assert_packet_checksums_valid(pkt)
1338 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1339 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1340 self.assertTrue(pkt.haslayer(GRE))
1342 self.assertEqual(e[Ether].dst, self.omac)
1343 self.assertEqual(e[IP].dst, "1.1.1.2")
1344 except (IndexError, AssertionError):
1345 self.logger.debug(ppp("Unexpected packet:", rx))
1347 self.logger.debug(ppp("Decrypted packet:", pkt))
1353 super(TestIpsecGreTebUdpIfEspTra, self).setUp()
1355 self.tun_if = self.pg0
1357 p = self.ipv4_params
1358 p = self.ipv4_params
1359 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
1360 IPSEC_API_SAD_FLAG_UDP_ENCAP)
1361 p.nat_header = UDP(sport=5454, dport=4545)
1363 bd1 = VppBridgeDomain(self, 1)
1364 bd1.add_vpp_config()
1366 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1367 p.auth_algo_vpp_id, p.auth_key,
1368 p.crypt_algo_vpp_id, p.crypt_key,
1369 self.vpp_esp_protocol,
1373 p.tun_sa_out.add_vpp_config()
1375 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1376 p.auth_algo_vpp_id, p.auth_key,
1377 p.crypt_algo_vpp_id, p.crypt_key,
1378 self.vpp_esp_protocol,
1380 VppEnum.vl_api_ipsec_sad_flags_t.
1381 IPSEC_API_SAD_FLAG_IS_INBOUND),
1384 p.tun_sa_in.add_vpp_config()
1386 p.tun_if = VppGreInterface(self,
1388 self.pg0.remote_ip4,
1389 type=(VppEnum.vl_api_gre_tunnel_type_t.
1390 GRE_API_TUNNEL_TYPE_TEB))
1391 p.tun_if.add_vpp_config()
1393 p.tun_protect = VppIpsecTunProtect(self,
1398 p.tun_protect.add_vpp_config()
1401 p.tun_if.config_ip4()
1402 config_tra_params(p, self.encryption_type, p.tun_if)
1404 VppBridgeDomainPort(self, bd1, p.tun_if).add_vpp_config()
1405 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
1407 self.vapi.cli("clear ipsec sa")
1408 self.logger.info(self.vapi.cli("sh ipsec sa 0"))
1411 p = self.ipv4_params
1412 p.tun_if.unconfig_ip4()
1413 super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
1416 class TestIpsecGreIfEsp(TemplateIpsec,
1418 """ Ipsec GRE ESP - TUN tests """
1419 tun4_encrypt_node_name = "esp4-encrypt-tun"
1420 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1421 encryption_type = ESP
1423 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1425 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1426 sa.encrypt(IP(src=self.pg0.remote_ip4,
1427 dst=self.pg0.local_ip4) /
1429 IP(src=self.pg1.local_ip4,
1430 dst=self.pg1.remote_ip4) /
1431 UDP(sport=1144, dport=2233) /
1432 Raw(b'X' * payload_size))
1433 for i in range(count)]
1435 def gen_pkts(self, sw_intf, src, dst, count=1,
1437 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1438 IP(src="1.1.1.1", dst="1.1.1.2") /
1439 UDP(sport=1144, dport=2233) /
1440 Raw(b'X' * payload_size)
1441 for i in range(count)]
1443 def verify_decrypted(self, p, rxs):
1445 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1446 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1448 def verify_encrypted(self, p, sa, rxs):
1451 pkt = sa.decrypt(rx[IP])
1452 if not pkt.haslayer(IP):
1453 pkt = IP(pkt[Raw].load)
1454 self.assert_packet_checksums_valid(pkt)
1455 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
1456 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
1457 self.assertTrue(pkt.haslayer(GRE))
1459 self.assertEqual(e[IP].dst, "1.1.1.2")
1460 except (IndexError, AssertionError):
1461 self.logger.debug(ppp("Unexpected packet:", rx))
1463 self.logger.debug(ppp("Decrypted packet:", pkt))
1469 super(TestIpsecGreIfEsp, self).setUp()
1471 self.tun_if = self.pg0
1473 p = self.ipv4_params
1475 bd1 = VppBridgeDomain(self, 1)
1476 bd1.add_vpp_config()
1478 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1479 p.auth_algo_vpp_id, p.auth_key,
1480 p.crypt_algo_vpp_id, p.crypt_key,
1481 self.vpp_esp_protocol,
1483 self.pg0.remote_ip4)
1484 p.tun_sa_out.add_vpp_config()
1486 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1487 p.auth_algo_vpp_id, p.auth_key,
1488 p.crypt_algo_vpp_id, p.crypt_key,
1489 self.vpp_esp_protocol,
1490 self.pg0.remote_ip4,
1492 p.tun_sa_in.add_vpp_config()
1494 p.tun_if = VppGreInterface(self,
1496 self.pg0.remote_ip4)
1497 p.tun_if.add_vpp_config()
1499 p.tun_protect = VppIpsecTunProtect(self,
1503 p.tun_protect.add_vpp_config()
1506 p.tun_if.config_ip4()
1507 config_tun_params(p, self.encryption_type, p.tun_if)
1509 VppIpRoute(self, "1.1.1.2", 32,
1510 [VppRoutePath(p.tun_if.remote_ip4,
1511 0xffffffff)]).add_vpp_config()
1514 p = self.ipv4_params
1515 p.tun_if.unconfig_ip4()
1516 super(TestIpsecGreIfEsp, self).tearDown()
1519 class TestIpsecGreIfEspTra(TemplateIpsec,
1521 """ Ipsec GRE ESP - TRA tests """
1522 tun4_encrypt_node_name = "esp4-encrypt-tun"
1523 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1524 encryption_type = ESP
1526 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1528 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1529 sa.encrypt(IP(src=self.pg0.remote_ip4,
1530 dst=self.pg0.local_ip4) /
1532 IP(src=self.pg1.local_ip4,
1533 dst=self.pg1.remote_ip4) /
1534 UDP(sport=1144, dport=2233) /
1535 Raw(b'X' * payload_size))
1536 for i in range(count)]
1538 def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
1540 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1541 sa.encrypt(IP(src=self.pg0.remote_ip4,
1542 dst=self.pg0.local_ip4) /
1544 UDP(sport=1144, dport=2233) /
1545 Raw(b'X' * payload_size))
1546 for i in range(count)]
1548 def gen_pkts(self, sw_intf, src, dst, count=1,
1550 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1551 IP(src="1.1.1.1", dst="1.1.1.2") /
1552 UDP(sport=1144, dport=2233) /
1553 Raw(b'X' * payload_size)
1554 for i in range(count)]
1556 def verify_decrypted(self, p, rxs):
1558 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1559 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1561 def verify_encrypted(self, p, sa, rxs):
1564 pkt = sa.decrypt(rx[IP])
1565 if not pkt.haslayer(IP):
1566 pkt = IP(pkt[Raw].load)
1567 self.assert_packet_checksums_valid(pkt)
1568 self.assertTrue(pkt.haslayer(GRE))
1570 self.assertEqual(e[IP].dst, "1.1.1.2")
1571 except (IndexError, AssertionError):
1572 self.logger.debug(ppp("Unexpected packet:", rx))
1574 self.logger.debug(ppp("Decrypted packet:", pkt))
1580 super(TestIpsecGreIfEspTra, self).setUp()
1582 self.tun_if = self.pg0
1584 p = self.ipv4_params
1586 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1587 p.auth_algo_vpp_id, p.auth_key,
1588 p.crypt_algo_vpp_id, p.crypt_key,
1589 self.vpp_esp_protocol)
1590 p.tun_sa_out.add_vpp_config()
1592 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1593 p.auth_algo_vpp_id, p.auth_key,
1594 p.crypt_algo_vpp_id, p.crypt_key,
1595 self.vpp_esp_protocol)
1596 p.tun_sa_in.add_vpp_config()
1598 p.tun_if = VppGreInterface(self,
1600 self.pg0.remote_ip4)
1601 p.tun_if.add_vpp_config()
1603 p.tun_protect = VppIpsecTunProtect(self,
1607 p.tun_protect.add_vpp_config()
1610 p.tun_if.config_ip4()
1611 config_tra_params(p, self.encryption_type, p.tun_if)
1613 VppIpRoute(self, "1.1.1.2", 32,
1614 [VppRoutePath(p.tun_if.remote_ip4,
1615 0xffffffff)]).add_vpp_config()
1618 p = self.ipv4_params
1619 p.tun_if.unconfig_ip4()
1620 super(TestIpsecGreIfEspTra, self).tearDown()
1622 def test_gre_non_ip(self):
1623 p = self.ipv4_params
1624 tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
1625 src=p.remote_tun_if_host,
1626 dst=self.pg1.remote_ip6)
1627 self.send_and_assert_no_replies(self.tun_if, tx)
1628 node_name = ('/err/%s/unsupported payload' %
1629 self.tun4_decrypt_node_name[0])
1630 self.assertEqual(1, self.statistics.get_err_counter(node_name))
1633 class TestIpsecGre6IfEspTra(TemplateIpsec,
1635 """ Ipsec GRE ESP - TRA tests """
1636 tun6_encrypt_node_name = "esp6-encrypt-tun"
1637 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1638 encryption_type = ESP
1640 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1642 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1643 sa.encrypt(IPv6(src=self.pg0.remote_ip6,
1644 dst=self.pg0.local_ip6) /
1646 IPv6(src=self.pg1.local_ip6,
1647 dst=self.pg1.remote_ip6) /
1648 UDP(sport=1144, dport=2233) /
1649 Raw(b'X' * payload_size))
1650 for i in range(count)]
1652 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1654 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1655 IPv6(src="1::1", dst="1::2") /
1656 UDP(sport=1144, dport=2233) /
1657 Raw(b'X' * payload_size)
1658 for i in range(count)]
1660 def verify_decrypted6(self, p, rxs):
1662 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1663 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1665 def verify_encrypted6(self, p, sa, rxs):
1668 pkt = sa.decrypt(rx[IPv6])
1669 if not pkt.haslayer(IPv6):
1670 pkt = IPv6(pkt[Raw].load)
1671 self.assert_packet_checksums_valid(pkt)
1672 self.assertTrue(pkt.haslayer(GRE))
1674 self.assertEqual(e[IPv6].dst, "1::2")
1675 except (IndexError, AssertionError):
1676 self.logger.debug(ppp("Unexpected packet:", rx))
1678 self.logger.debug(ppp("Decrypted packet:", pkt))
1684 super(TestIpsecGre6IfEspTra, self).setUp()
1686 self.tun_if = self.pg0
1688 p = self.ipv6_params
1690 bd1 = VppBridgeDomain(self, 1)
1691 bd1.add_vpp_config()
1693 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1694 p.auth_algo_vpp_id, p.auth_key,
1695 p.crypt_algo_vpp_id, p.crypt_key,
1696 self.vpp_esp_protocol)
1697 p.tun_sa_out.add_vpp_config()
1699 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1700 p.auth_algo_vpp_id, p.auth_key,
1701 p.crypt_algo_vpp_id, p.crypt_key,
1702 self.vpp_esp_protocol)
1703 p.tun_sa_in.add_vpp_config()
1705 p.tun_if = VppGreInterface(self,
1707 self.pg0.remote_ip6)
1708 p.tun_if.add_vpp_config()
1710 p.tun_protect = VppIpsecTunProtect(self,
1714 p.tun_protect.add_vpp_config()
1717 p.tun_if.config_ip6()
1718 config_tra_params(p, self.encryption_type, p.tun_if)
1720 r = VppIpRoute(self, "1::2", 128,
1721 [VppRoutePath(p.tun_if.remote_ip6,
1723 proto=DpoProto.DPO_PROTO_IP6)])
1727 p = self.ipv6_params
1728 p.tun_if.unconfig_ip6()
1729 super(TestIpsecGre6IfEspTra, self).tearDown()
1732 class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
1733 """ Ipsec mGRE ESP v4 TRA tests """
1734 tun4_encrypt_node_name = "esp4-encrypt-tun"
1735 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
1736 encryption_type = ESP
1738 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
1740 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1741 sa.encrypt(IP(src=p.tun_dst,
1742 dst=self.pg0.local_ip4) /
1744 IP(src=self.pg1.local_ip4,
1745 dst=self.pg1.remote_ip4) /
1746 UDP(sport=1144, dport=2233) /
1747 Raw(b'X' * payload_size))
1748 for i in range(count)]
1750 def gen_pkts(self, sw_intf, src, dst, count=1,
1752 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1753 IP(src="1.1.1.1", dst=dst) /
1754 UDP(sport=1144, dport=2233) /
1755 Raw(b'X' * payload_size)
1756 for i in range(count)]
1758 def verify_decrypted(self, p, rxs):
1760 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1761 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
1763 def verify_encrypted(self, p, sa, rxs):
1766 pkt = sa.decrypt(rx[IP])
1767 if not pkt.haslayer(IP):
1768 pkt = IP(pkt[Raw].load)
1769 self.assert_packet_checksums_valid(pkt)
1770 self.assertTrue(pkt.haslayer(GRE))
1772 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
1773 except (IndexError, AssertionError):
1774 self.logger.debug(ppp("Unexpected packet:", rx))
1776 self.logger.debug(ppp("Decrypted packet:", pkt))
1782 super(TestIpsecMGreIfEspTra4, self).setUp()
1785 self.tun_if = self.pg0
1786 p = self.ipv4_params
1787 p.tun_if = VppGreInterface(self,
1790 mode=(VppEnum.vl_api_tunnel_mode_t.
1791 TUNNEL_API_MODE_MP))
1792 p.tun_if.add_vpp_config()
1794 p.tun_if.config_ip4()
1795 p.tun_if.generate_remote_hosts(N_NHS)
1796 self.pg0.generate_remote_hosts(N_NHS)
1797 self.pg0.configure_ipv4_neighbors()
1799 # setup some SAs for several next-hops on the interface
1800 self.multi_params = []
1802 for ii in range(N_NHS):
1803 p = copy.copy(self.ipv4_params)
1805 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
1806 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1807 p.scapy_tun_spi = p.scapy_tun_spi + ii
1808 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1809 p.vpp_tun_spi = p.vpp_tun_spi + ii
1811 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1812 p.scapy_tra_spi = p.scapy_tra_spi + ii
1813 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1814 p.vpp_tra_spi = p.vpp_tra_spi + ii
1815 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1816 p.auth_algo_vpp_id, p.auth_key,
1817 p.crypt_algo_vpp_id, p.crypt_key,
1818 self.vpp_esp_protocol)
1819 p.tun_sa_out.add_vpp_config()
1821 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1822 p.auth_algo_vpp_id, p.auth_key,
1823 p.crypt_algo_vpp_id, p.crypt_key,
1824 self.vpp_esp_protocol)
1825 p.tun_sa_in.add_vpp_config()
1827 p.tun_protect = VppIpsecTunProtect(
1832 nh=p.tun_if.remote_hosts[ii].ip4)
1833 p.tun_protect.add_vpp_config()
1834 config_tra_params(p, self.encryption_type, p.tun_if)
1835 self.multi_params.append(p)
1837 VppIpRoute(self, p.remote_tun_if_host, 32,
1838 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
1839 p.tun_if.sw_if_index)]).add_vpp_config()
1841 # in this v4 variant add the teibs after the protect
1842 p.teib = VppTeib(self, p.tun_if,
1843 p.tun_if.remote_hosts[ii].ip4,
1844 self.pg0.remote_hosts[ii].ip4).add_vpp_config()
1845 p.tun_dst = self.pg0.remote_hosts[ii].ip4
1846 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1849 p = self.ipv4_params
1850 p.tun_if.unconfig_ip4()
1851 super(TestIpsecMGreIfEspTra4, self).tearDown()
1853 def test_tun_44(self):
1856 for p in self.multi_params:
1857 self.verify_tun_44(p, count=N_PKTS)
1858 p.teib.remove_vpp_config()
1859 self.verify_tun_dropped_44(p, count=N_PKTS)
1860 p.teib.add_vpp_config()
1861 self.verify_tun_44(p, count=N_PKTS)
1864 class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
1865 """ Ipsec mGRE ESP v6 TRA tests """
1866 tun6_encrypt_node_name = "esp6-encrypt-tun"
1867 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
1868 encryption_type = ESP
1870 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
1872 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1873 sa.encrypt(IPv6(src=p.tun_dst,
1874 dst=self.pg0.local_ip6) /
1876 IPv6(src=self.pg1.local_ip6,
1877 dst=self.pg1.remote_ip6) /
1878 UDP(sport=1144, dport=2233) /
1879 Raw(b'X' * payload_size))
1880 for i in range(count)]
1882 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
1884 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
1885 IPv6(src="1::1", dst=dst) /
1886 UDP(sport=1144, dport=2233) /
1887 Raw(b'X' * payload_size)
1888 for i in range(count)]
1890 def verify_decrypted6(self, p, rxs):
1892 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
1893 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1895 def verify_encrypted6(self, p, sa, rxs):
1898 pkt = sa.decrypt(rx[IPv6])
1899 if not pkt.haslayer(IPv6):
1900 pkt = IPv6(pkt[Raw].load)
1901 self.assert_packet_checksums_valid(pkt)
1902 self.assertTrue(pkt.haslayer(GRE))
1904 self.assertEqual(e[IPv6].dst, p.remote_tun_if_host)
1905 except (IndexError, AssertionError):
1906 self.logger.debug(ppp("Unexpected packet:", rx))
1908 self.logger.debug(ppp("Decrypted packet:", pkt))
1914 super(TestIpsecMGreIfEspTra6, self).setUp()
1916 self.vapi.cli("set logging class ipsec level debug")
1919 self.tun_if = self.pg0
1920 p = self.ipv6_params
1921 p.tun_if = VppGreInterface(self,
1924 mode=(VppEnum.vl_api_tunnel_mode_t.
1925 TUNNEL_API_MODE_MP))
1926 p.tun_if.add_vpp_config()
1928 p.tun_if.config_ip6()
1929 p.tun_if.generate_remote_hosts(N_NHS)
1930 self.pg0.generate_remote_hosts(N_NHS)
1931 self.pg0.configure_ipv6_neighbors()
1933 # setup some SAs for several next-hops on the interface
1934 self.multi_params = []
1936 for ii in range(N_NHS):
1937 p = copy.copy(self.ipv6_params)
1939 p.remote_tun_if_host = "1::%d" % (ii + 1)
1940 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
1941 p.scapy_tun_spi = p.scapy_tun_spi + ii
1942 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
1943 p.vpp_tun_spi = p.vpp_tun_spi + ii
1945 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
1946 p.scapy_tra_spi = p.scapy_tra_spi + ii
1947 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
1948 p.vpp_tra_spi = p.vpp_tra_spi + ii
1949 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
1950 p.auth_algo_vpp_id, p.auth_key,
1951 p.crypt_algo_vpp_id, p.crypt_key,
1952 self.vpp_esp_protocol)
1953 p.tun_sa_out.add_vpp_config()
1955 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
1956 p.auth_algo_vpp_id, p.auth_key,
1957 p.crypt_algo_vpp_id, p.crypt_key,
1958 self.vpp_esp_protocol)
1959 p.tun_sa_in.add_vpp_config()
1961 # in this v6 variant add the teibs first then the protection
1962 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1963 VppTeib(self, p.tun_if,
1964 p.tun_if.remote_hosts[ii].ip6,
1965 p.tun_dst).add_vpp_config()
1967 p.tun_protect = VppIpsecTunProtect(
1972 nh=p.tun_if.remote_hosts[ii].ip6)
1973 p.tun_protect.add_vpp_config()
1974 config_tra_params(p, self.encryption_type, p.tun_if)
1975 self.multi_params.append(p)
1977 VppIpRoute(self, p.remote_tun_if_host, 128,
1978 [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
1979 p.tun_if.sw_if_index)]).add_vpp_config()
1980 p.tun_dst = self.pg0.remote_hosts[ii].ip6
1982 self.logger.info(self.vapi.cli("sh log"))
1983 self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
1984 self.logger.info(self.vapi.cli("sh adj 41"))
1987 p = self.ipv6_params
1988 p.tun_if.unconfig_ip6()
1989 super(TestIpsecMGreIfEspTra6, self).tearDown()
1991 def test_tun_66(self):
1993 for p in self.multi_params:
1994 self.verify_tun_66(p, count=63)
1997 @tag_fixme_vpp_workers
1998 class TestIpsec4TunProtect(TemplateIpsec,
1999 TemplateIpsec4TunProtect,
2001 """ IPsec IPv4 Tunnel protect - transport mode"""
2004 super(TestIpsec4TunProtect, self).setUp()
2006 self.tun_if = self.pg0
2009 super(TestIpsec4TunProtect, self).tearDown()
2011 def test_tun_44(self):
2012 """IPSEC tunnel protect"""
2014 p = self.ipv4_params
2016 self.config_network(p)
2017 self.config_sa_tra(p)
2018 self.config_protect(p)
2020 self.verify_tun_44(p, count=127)
2021 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2022 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2024 self.vapi.cli("clear ipsec sa")
2025 self.verify_tun_64(p, count=127)
2026 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2027 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2029 # rekey - create new SAs and update the tunnel protection
2031 np.crypt_key = b'X' + p.crypt_key[1:]
2032 np.scapy_tun_spi += 100
2033 np.scapy_tun_sa_id += 1
2034 np.vpp_tun_spi += 100
2035 np.vpp_tun_sa_id += 1
2036 np.tun_if.local_spi = p.vpp_tun_spi
2037 np.tun_if.remote_spi = p.scapy_tun_spi
2039 self.config_sa_tra(np)
2040 self.config_protect(np)
2043 self.verify_tun_44(np, count=127)
2044 self.assertEqual(p.tun_if.get_rx_stats(), 381)
2045 self.assertEqual(p.tun_if.get_tx_stats(), 381)
2048 self.unconfig_protect(np)
2049 self.unconfig_sa(np)
2050 self.unconfig_network(p)
2053 @tag_fixme_vpp_workers
2054 class TestIpsec4TunProtectUdp(TemplateIpsec,
2055 TemplateIpsec4TunProtect,
2057 """ IPsec IPv4 Tunnel protect - transport mode"""
2060 super(TestIpsec4TunProtectUdp, self).setUp()
2062 self.tun_if = self.pg0
2064 p = self.ipv4_params
2065 p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
2066 IPSEC_API_SAD_FLAG_UDP_ENCAP)
2067 p.nat_header = UDP(sport=4500, dport=4500)
2068 self.config_network(p)
2069 self.config_sa_tra(p)
2070 self.config_protect(p)
2073 p = self.ipv4_params
2074 self.unconfig_protect(p)
2076 self.unconfig_network(p)
2077 super(TestIpsec4TunProtectUdp, self).tearDown()
2079 def verify_encrypted(self, p, sa, rxs):
2080 # ensure encrypted packets are recieved with the default UDP ports
2082 self.assertEqual(rx[UDP].sport, 4500)
2083 self.assertEqual(rx[UDP].dport, 4500)
2084 super(TestIpsec4TunProtectUdp, self).verify_encrypted(p, sa, rxs)
2086 def test_tun_44(self):
2087 """IPSEC UDP tunnel protect"""
2089 p = self.ipv4_params
2091 self.verify_tun_44(p, count=127)
2092 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2093 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2095 def test_keepalive(self):
2096 """ IPSEC NAT Keepalive """
2097 self.verify_keepalive(self.ipv4_params)
2100 @tag_fixme_vpp_workers
2101 class TestIpsec4TunProtectTun(TemplateIpsec,
2102 TemplateIpsec4TunProtect,
2104 """ IPsec IPv4 Tunnel protect - tunnel mode"""
2106 encryption_type = ESP
2107 tun4_encrypt_node_name = "esp4-encrypt-tun"
2108 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2111 super(TestIpsec4TunProtectTun, self).setUp()
2113 self.tun_if = self.pg0
2116 super(TestIpsec4TunProtectTun, self).tearDown()
2118 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2120 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2121 sa.encrypt(IP(src=sw_intf.remote_ip4,
2122 dst=sw_intf.local_ip4) /
2123 IP(src=src, dst=dst) /
2124 UDP(sport=1144, dport=2233) /
2125 Raw(b'X' * payload_size))
2126 for i in range(count)]
2128 def gen_pkts(self, sw_intf, src, dst, count=1,
2130 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2131 IP(src=src, dst=dst) /
2132 UDP(sport=1144, dport=2233) /
2133 Raw(b'X' * payload_size)
2134 for i in range(count)]
2136 def verify_decrypted(self, p, rxs):
2138 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
2139 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
2140 self.assert_packet_checksums_valid(rx)
2142 def verify_encrypted(self, p, sa, rxs):
2145 pkt = sa.decrypt(rx[IP])
2146 if not pkt.haslayer(IP):
2147 pkt = IP(pkt[Raw].load)
2148 self.assert_packet_checksums_valid(pkt)
2149 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
2150 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
2151 inner = pkt[IP].payload
2152 self.assertEqual(inner[IP][IP].dst, p.remote_tun_if_host)
2154 except (IndexError, AssertionError):
2155 self.logger.debug(ppp("Unexpected packet:", rx))
2157 self.logger.debug(ppp("Decrypted packet:", pkt))
2162 def test_tun_44(self):
2163 """IPSEC tunnel protect """
2165 p = self.ipv4_params
2167 self.config_network(p)
2168 self.config_sa_tun(p)
2169 self.config_protect(p)
2171 # also add an output features on the tunnel and physical interface
2172 # so we test they still work
2173 r_all = AclRule(True,
2174 src_prefix="0.0.0.0/0",
2175 dst_prefix="0.0.0.0/0",
2177 a = VppAcl(self, [r_all]).add_vpp_config()
2179 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
2180 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
2182 self.verify_tun_44(p, count=127)
2184 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2185 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2187 # rekey - create new SAs and update the tunnel protection
2189 np.crypt_key = b'X' + p.crypt_key[1:]
2190 np.scapy_tun_spi += 100
2191 np.scapy_tun_sa_id += 1
2192 np.vpp_tun_spi += 100
2193 np.vpp_tun_sa_id += 1
2194 np.tun_if.local_spi = p.vpp_tun_spi
2195 np.tun_if.remote_spi = p.scapy_tun_spi
2197 self.config_sa_tun(np)
2198 self.config_protect(np)
2201 self.verify_tun_44(np, count=127)
2202 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2203 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2206 self.unconfig_protect(np)
2207 self.unconfig_sa(np)
2208 self.unconfig_network(p)
2211 class TestIpsec4TunProtectTunDrop(TemplateIpsec,
2212 TemplateIpsec4TunProtect,
2214 """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
2216 encryption_type = ESP
2217 tun4_encrypt_node_name = "esp4-encrypt-tun"
2218 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2221 super(TestIpsec4TunProtectTunDrop, self).setUp()
2223 self.tun_if = self.pg0
2226 super(TestIpsec4TunProtectTunDrop, self).tearDown()
2228 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2230 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2231 sa.encrypt(IP(src=sw_intf.remote_ip4,
2233 IP(src=src, dst=dst) /
2234 UDP(sport=1144, dport=2233) /
2235 Raw(b'X' * payload_size))
2236 for i in range(count)]
2238 def test_tun_drop_44(self):
2239 """IPSEC tunnel protect bogus tunnel header """
2241 p = self.ipv4_params
2243 self.config_network(p)
2244 self.config_sa_tun(p)
2245 self.config_protect(p)
2247 tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
2248 src=p.remote_tun_if_host,
2249 dst=self.pg1.remote_ip4,
2251 self.send_and_assert_no_replies(self.tun_if, tx)
2254 self.unconfig_protect(p)
2256 self.unconfig_network(p)
2259 @tag_fixme_vpp_workers
2260 class TestIpsec6TunProtect(TemplateIpsec,
2261 TemplateIpsec6TunProtect,
2263 """ IPsec IPv6 Tunnel protect - transport mode"""
2265 encryption_type = ESP
2266 tun6_encrypt_node_name = "esp6-encrypt-tun"
2267 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2270 super(TestIpsec6TunProtect, self).setUp()
2272 self.tun_if = self.pg0
2275 super(TestIpsec6TunProtect, self).tearDown()
2277 def test_tun_66(self):
2278 """IPSEC tunnel protect 6o6"""
2280 p = self.ipv6_params
2282 self.config_network(p)
2283 self.config_sa_tra(p)
2284 self.config_protect(p)
2286 self.verify_tun_66(p, count=127)
2287 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2288 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2290 # rekey - create new SAs and update the tunnel protection
2292 np.crypt_key = b'X' + p.crypt_key[1:]
2293 np.scapy_tun_spi += 100
2294 np.scapy_tun_sa_id += 1
2295 np.vpp_tun_spi += 100
2296 np.vpp_tun_sa_id += 1
2297 np.tun_if.local_spi = p.vpp_tun_spi
2298 np.tun_if.remote_spi = p.scapy_tun_spi
2300 self.config_sa_tra(np)
2301 self.config_protect(np)
2304 self.verify_tun_66(np, count=127)
2305 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2306 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2308 # bounce the interface state
2309 p.tun_if.admin_down()
2310 self.verify_drop_tun_66(np, count=127)
2311 node = ('/err/ipsec6-tun-input/%s' %
2312 'ipsec packets received on disabled interface')
2313 self.assertEqual(127, self.statistics.get_err_counter(node))
2315 self.verify_tun_66(np, count=127)
2318 # 1) add two input SAs [old, new]
2319 # 2) swap output SA to [new]
2320 # 3) use only [new] input SA
2322 np3.crypt_key = b'Z' + p.crypt_key[1:]
2323 np3.scapy_tun_spi += 100
2324 np3.scapy_tun_sa_id += 1
2325 np3.vpp_tun_spi += 100
2326 np3.vpp_tun_sa_id += 1
2327 np3.tun_if.local_spi = p.vpp_tun_spi
2328 np3.tun_if.remote_spi = p.scapy_tun_spi
2330 self.config_sa_tra(np3)
2333 p.tun_protect.update_vpp_config(np.tun_sa_out,
2334 [np.tun_sa_in, np3.tun_sa_in])
2335 self.verify_tun_66(np, np, count=127)
2336 self.verify_tun_66(np3, np, count=127)
2339 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2340 [np.tun_sa_in, np3.tun_sa_in])
2341 self.verify_tun_66(np, np3, count=127)
2342 self.verify_tun_66(np3, np3, count=127)
2345 p.tun_protect.update_vpp_config(np3.tun_sa_out,
2347 self.verify_tun_66(np3, np3, count=127)
2348 self.verify_drop_tun_66(np, count=127)
2350 self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
2351 self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
2352 self.unconfig_sa(np)
2355 self.unconfig_protect(np3)
2356 self.unconfig_sa(np3)
2357 self.unconfig_network(p)
2359 def test_tun_46(self):
2360 """IPSEC tunnel protect 4o6"""
2362 p = self.ipv6_params
2364 self.config_network(p)
2365 self.config_sa_tra(p)
2366 self.config_protect(p)
2368 self.verify_tun_46(p, count=127)
2369 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2370 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2373 self.unconfig_protect(p)
2375 self.unconfig_network(p)
2378 @tag_fixme_vpp_workers
2379 class TestIpsec6TunProtectTun(TemplateIpsec,
2380 TemplateIpsec6TunProtect,
2382 """ IPsec IPv6 Tunnel protect - tunnel mode"""
2384 encryption_type = ESP
2385 tun6_encrypt_node_name = "esp6-encrypt-tun"
2386 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2389 super(TestIpsec6TunProtectTun, self).setUp()
2391 self.tun_if = self.pg0
2394 super(TestIpsec6TunProtectTun, self).tearDown()
2396 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2398 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2399 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2400 dst=sw_intf.local_ip6) /
2401 IPv6(src=src, dst=dst) /
2402 UDP(sport=1166, dport=2233) /
2403 Raw(b'X' * payload_size))
2404 for i in range(count)]
2406 def gen_pkts6(self, p, sw_intf, src, dst, count=1,
2408 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2409 IPv6(src=src, dst=dst) /
2410 UDP(sport=1166, dport=2233) /
2411 Raw(b'X' * payload_size)
2412 for i in range(count)]
2414 def verify_decrypted6(self, p, rxs):
2416 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
2417 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
2418 self.assert_packet_checksums_valid(rx)
2420 def verify_encrypted6(self, p, sa, rxs):
2423 pkt = sa.decrypt(rx[IPv6])
2424 if not pkt.haslayer(IPv6):
2425 pkt = IPv6(pkt[Raw].load)
2426 self.assert_packet_checksums_valid(pkt)
2427 self.assert_equal(pkt[IPv6].dst, self.pg0.remote_ip6)
2428 self.assert_equal(pkt[IPv6].src, self.pg0.local_ip6)
2429 inner = pkt[IPv6].payload
2430 self.assertEqual(inner[IPv6][IPv6].dst, p.remote_tun_if_host)
2432 except (IndexError, AssertionError):
2433 self.logger.debug(ppp("Unexpected packet:", rx))
2435 self.logger.debug(ppp("Decrypted packet:", pkt))
2440 def test_tun_66(self):
2441 """IPSEC tunnel protect """
2443 p = self.ipv6_params
2445 self.config_network(p)
2446 self.config_sa_tun(p)
2447 self.config_protect(p)
2449 self.verify_tun_66(p, count=127)
2451 self.assertEqual(p.tun_if.get_rx_stats(), 127)
2452 self.assertEqual(p.tun_if.get_tx_stats(), 127)
2454 # rekey - create new SAs and update the tunnel protection
2456 np.crypt_key = b'X' + p.crypt_key[1:]
2457 np.scapy_tun_spi += 100
2458 np.scapy_tun_sa_id += 1
2459 np.vpp_tun_spi += 100
2460 np.vpp_tun_sa_id += 1
2461 np.tun_if.local_spi = p.vpp_tun_spi
2462 np.tun_if.remote_spi = p.scapy_tun_spi
2464 self.config_sa_tun(np)
2465 self.config_protect(np)
2468 self.verify_tun_66(np, count=127)
2469 self.assertEqual(p.tun_if.get_rx_stats(), 254)
2470 self.assertEqual(p.tun_if.get_tx_stats(), 254)
2473 self.unconfig_protect(np)
2474 self.unconfig_sa(np)
2475 self.unconfig_network(p)
2478 class TestIpsec6TunProtectTunDrop(TemplateIpsec,
2479 TemplateIpsec6TunProtect,
2481 """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
2483 encryption_type = ESP
2484 tun6_encrypt_node_name = "esp6-encrypt-tun"
2485 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2488 super(TestIpsec6TunProtectTunDrop, self).setUp()
2490 self.tun_if = self.pg0
2493 super(TestIpsec6TunProtectTunDrop, self).tearDown()
2495 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
2497 # the IP destination of the revelaed packet does not match
2498 # that assigned to the tunnel
2499 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2500 sa.encrypt(IPv6(src=sw_intf.remote_ip6,
2502 IPv6(src=src, dst=dst) /
2503 UDP(sport=1144, dport=2233) /
2504 Raw(b'X' * payload_size))
2505 for i in range(count)]
2507 def test_tun_drop_66(self):
2508 """IPSEC 6 tunnel protect bogus tunnel header """
2510 p = self.ipv6_params
2512 self.config_network(p)
2513 self.config_sa_tun(p)
2514 self.config_protect(p)
2516 tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
2517 src=p.remote_tun_if_host,
2518 dst=self.pg1.remote_ip6,
2520 self.send_and_assert_no_replies(self.tun_if, tx)
2522 self.unconfig_protect(p)
2524 self.unconfig_network(p)
2527 class TemplateIpsecItf4(object):
2528 """ IPsec Interface IPv4 """
2530 encryption_type = ESP
2531 tun4_encrypt_node_name = "esp4-encrypt-tun"
2532 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
2533 tun4_input_node = "ipsec4-tun-input"
2535 def config_sa_tun(self, p, src, dst):
2536 config_tun_params(p, self.encryption_type, None, src, dst)
2538 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2539 p.auth_algo_vpp_id, p.auth_key,
2540 p.crypt_algo_vpp_id, p.crypt_key,
2541 self.vpp_esp_protocol,
2544 p.tun_sa_out.add_vpp_config()
2546 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2547 p.auth_algo_vpp_id, p.auth_key,
2548 p.crypt_algo_vpp_id, p.crypt_key,
2549 self.vpp_esp_protocol,
2552 p.tun_sa_in.add_vpp_config()
2554 def config_protect(self, p):
2555 p.tun_protect = VppIpsecTunProtect(self,
2559 p.tun_protect.add_vpp_config()
2561 def config_network(self, p, instance=0xffffffff):
2562 p.tun_if = VppIpsecInterface(self, instance=instance)
2564 p.tun_if.add_vpp_config()
2566 p.tun_if.config_ip4()
2567 p.tun_if.config_ip6()
2569 p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
2570 [VppRoutePath(p.tun_if.remote_ip4,
2572 p.route.add_vpp_config()
2573 r = VppIpRoute(self, p.remote_tun_if_host6, 128,
2574 [VppRoutePath(p.tun_if.remote_ip6,
2576 proto=DpoProto.DPO_PROTO_IP6)])
2579 def unconfig_network(self, p):
2580 p.route.remove_vpp_config()
2581 p.tun_if.remove_vpp_config()
2583 def unconfig_protect(self, p):
2584 p.tun_protect.remove_vpp_config()
2586 def unconfig_sa(self, p):
2587 p.tun_sa_out.remove_vpp_config()
2588 p.tun_sa_in.remove_vpp_config()
2591 @tag_fixme_vpp_workers
2592 class TestIpsecItf4(TemplateIpsec,
2595 """ IPsec Interface IPv4 """
2598 super(TestIpsecItf4, self).setUp()
2600 self.tun_if = self.pg0
2603 super(TestIpsecItf4, self).tearDown()
2605 def test_tun_instance_44(self):
2606 p = self.ipv4_params
2607 self.config_network(p, instance=3)
2609 with self.assertRaises(CliFailedCommandError):
2610 self.vapi.cli("show interface ipsec0")
2612 output = self.vapi.cli("show interface ipsec3")
2613 self.assertTrue("unknown" not in output)
2615 self.unconfig_network(p)
2617 def test_tun_44(self):
2618 """IPSEC interface IPv4"""
2621 p = self.ipv4_params
2623 self.config_network(p)
2624 self.config_sa_tun(p,
2626 self.pg0.remote_ip4)
2627 self.config_protect(p)
2629 self.verify_tun_44(p, count=n_pkts)
2630 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2631 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2633 p.tun_if.admin_down()
2634 self.verify_tun_dropped_44(p, count=n_pkts)
2636 self.verify_tun_44(p, count=n_pkts)
2638 self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2639 self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2641 # it's a v6 packet when its encrypted
2642 self.tun4_encrypt_node_name = "esp6-encrypt-tun"
2644 self.verify_tun_64(p, count=n_pkts)
2645 self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2646 self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2648 self.tun4_encrypt_node_name = "esp4-encrypt-tun"
2650 self.vapi.cli("clear interfaces")
2652 # rekey - create new SAs and update the tunnel protection
2654 np.crypt_key = b'X' + p.crypt_key[1:]
2655 np.scapy_tun_spi += 100
2656 np.scapy_tun_sa_id += 1
2657 np.vpp_tun_spi += 100
2658 np.vpp_tun_sa_id += 1
2659 np.tun_if.local_spi = p.vpp_tun_spi
2660 np.tun_if.remote_spi = p.scapy_tun_spi
2662 self.config_sa_tun(np,
2664 self.pg0.remote_ip4)
2665 self.config_protect(np)
2668 self.verify_tun_44(np, count=n_pkts)
2669 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2670 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2673 self.unconfig_protect(np)
2674 self.unconfig_sa(np)
2675 self.unconfig_network(p)
2677 def test_tun_44_null(self):
2678 """IPSEC interface IPv4 NULL auth/crypto"""
2681 p = copy.copy(self.ipv4_params)
2683 p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
2684 IPSEC_API_INTEG_ALG_NONE)
2685 p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
2686 IPSEC_API_CRYPTO_ALG_NONE)
2687 p.crypt_algo = "NULL"
2688 p.auth_algo = "NULL"
2690 self.config_network(p)
2691 self.config_sa_tun(p,
2693 self.pg0.remote_ip4)
2694 self.config_protect(p)
2696 self.logger.error(self.vapi.cli("sh ipsec sa"))
2697 self.verify_tun_44(p, count=n_pkts)
2700 self.unconfig_protect(p)
2702 self.unconfig_network(p)
2704 def test_tun_44_police(self):
2705 """IPSEC interface IPv4 with input policer"""
2707 p = self.ipv4_params
2709 self.config_network(p)
2710 self.config_sa_tun(p,
2712 self.pg0.remote_ip4)
2713 self.config_protect(p)
2715 action_tx = PolicerAction(
2716 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
2718 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
2719 conform_action=action_tx,
2720 exceed_action=action_tx,
2721 violate_action=action_tx)
2722 policer.add_vpp_config()
2724 # Start policing on tun
2725 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
2727 self.verify_tun_44(p, count=n_pkts)
2728 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2729 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2731 stats = policer.get_stats()
2733 # Single rate, 2 colour policer - expect conform, violate but no exceed
2734 self.assertGreater(stats['conform_packets'], 0)
2735 self.assertEqual(stats['exceed_packets'], 0)
2736 self.assertGreater(stats['violate_packets'], 0)
2738 # Stop policing on tun
2739 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
2740 self.verify_tun_44(p, count=n_pkts)
2742 # No new policer stats
2743 statsnew = policer.get_stats()
2744 self.assertEqual(stats, statsnew)
2747 policer.remove_vpp_config()
2748 self.unconfig_protect(p)
2750 self.unconfig_network(p)
2753 class TestIpsecItf4MPLS(TemplateIpsec,
2756 """ IPsec Interface MPLSoIPv4 """
2758 tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
2761 super(TestIpsecItf4MPLS, self).setUp()
2763 self.tun_if = self.pg0
2766 super(TestIpsecItf4MPLS, self).tearDown()
2768 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
2770 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
2771 sa.encrypt(MPLS(label=44, ttl=3) /
2772 IP(src=src, dst=dst) /
2773 UDP(sport=1166, dport=2233) /
2774 Raw(b'X' * payload_size))
2775 for i in range(count)]
2777 def verify_encrypted(self, p, sa, rxs):
2780 pkt = sa.decrypt(rx[IP])
2781 if not pkt.haslayer(IP):
2782 pkt = IP(pkt[Raw].load)
2783 self.assert_packet_checksums_valid(pkt)
2784 self.assert_equal(pkt[MPLS].label, 44)
2785 self.assert_equal(pkt[IP].dst, p.remote_tun_if_host)
2786 except (IndexError, AssertionError):
2787 self.logger.debug(ppp("Unexpected packet:", rx))
2789 self.logger.debug(ppp("Decrypted packet:", pkt))
2794 def test_tun_mpls_o_ip4(self):
2795 """IPSEC interface MPLS over IPv4"""
2798 p = self.ipv4_params
2801 tbl = VppMplsTable(self, 0)
2802 tbl.add_vpp_config()
2804 self.config_network(p)
2805 # deag MPLS routes from the tunnel
2806 r4 = VppMplsRoute(self, 44, 1,
2808 self.pg1.remote_ip4,
2809 self.pg1.sw_if_index)]).add_vpp_config()
2810 p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
2811 p.tun_if.sw_if_index,
2812 labels=[VppMplsLabel(44)])])
2813 p.tun_if.enable_mpls()
2815 self.config_sa_tun(p,
2817 self.pg0.remote_ip4)
2818 self.config_protect(p)
2820 self.verify_tun_44(p, count=n_pkts)
2823 p.tun_if.disable_mpls()
2824 self.unconfig_protect(p)
2826 self.unconfig_network(p)
2829 class TemplateIpsecItf6(object):
2830 """ IPsec Interface IPv6 """
2832 encryption_type = ESP
2833 tun6_encrypt_node_name = "esp6-encrypt-tun"
2834 tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
2835 tun6_input_node = "ipsec6-tun-input"
2837 def config_sa_tun(self, p, src, dst):
2838 config_tun_params(p, self.encryption_type, None, src, dst)
2840 if not hasattr(p, 'tun_flags'):
2842 if not hasattr(p, 'hop_limit'):
2845 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
2846 p.auth_algo_vpp_id, p.auth_key,
2847 p.crypt_algo_vpp_id, p.crypt_key,
2848 self.vpp_esp_protocol,
2851 tun_flags=p.tun_flags,
2852 hop_limit=p.hop_limit)
2853 p.tun_sa_out.add_vpp_config()
2855 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
2856 p.auth_algo_vpp_id, p.auth_key,
2857 p.crypt_algo_vpp_id, p.crypt_key,
2858 self.vpp_esp_protocol,
2861 p.tun_sa_in.add_vpp_config()
2863 def config_protect(self, p):
2864 p.tun_protect = VppIpsecTunProtect(self,
2868 p.tun_protect.add_vpp_config()
2870 def config_network(self, p):
2871 p.tun_if = VppIpsecInterface(self)
2873 p.tun_if.add_vpp_config()
2875 p.tun_if.config_ip4()
2876 p.tun_if.config_ip6()
2878 r = VppIpRoute(self, p.remote_tun_if_host4, 32,
2879 [VppRoutePath(p.tun_if.remote_ip4,
2883 p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
2884 [VppRoutePath(p.tun_if.remote_ip6,
2886 proto=DpoProto.DPO_PROTO_IP6)])
2887 p.route.add_vpp_config()
2889 def unconfig_network(self, p):
2890 p.route.remove_vpp_config()
2891 p.tun_if.remove_vpp_config()
2893 def unconfig_protect(self, p):
2894 p.tun_protect.remove_vpp_config()
2896 def unconfig_sa(self, p):
2897 p.tun_sa_out.remove_vpp_config()
2898 p.tun_sa_in.remove_vpp_config()
2901 @tag_fixme_vpp_workers
2902 class TestIpsecItf6(TemplateIpsec,
2905 """ IPsec Interface IPv6 """
2908 super(TestIpsecItf6, self).setUp()
2910 self.tun_if = self.pg0
2913 super(TestIpsecItf6, self).tearDown()
2915 def test_tun_44(self):
2916 """IPSEC interface IPv6"""
2918 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2920 p = self.ipv6_params
2921 p.inner_hop_limit = 24
2922 p.outer_hop_limit = 23
2923 p.outer_flow_label = 243224
2924 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2926 self.config_network(p)
2927 self.config_sa_tun(p,
2929 self.pg0.remote_ip6)
2930 self.config_protect(p)
2932 self.verify_tun_66(p, count=n_pkts)
2933 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2934 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2936 p.tun_if.admin_down()
2937 self.verify_drop_tun_66(p, count=n_pkts)
2939 self.verify_tun_66(p, count=n_pkts)
2941 self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
2942 self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
2944 # it's a v4 packet when its encrypted
2945 self.tun6_encrypt_node_name = "esp4-encrypt-tun"
2947 self.verify_tun_46(p, count=n_pkts)
2948 self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
2949 self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
2951 self.tun6_encrypt_node_name = "esp6-encrypt-tun"
2953 self.vapi.cli("clear interfaces")
2955 # rekey - create new SAs and update the tunnel protection
2957 np.crypt_key = b'X' + p.crypt_key[1:]
2958 np.scapy_tun_spi += 100
2959 np.scapy_tun_sa_id += 1
2960 np.vpp_tun_spi += 100
2961 np.vpp_tun_sa_id += 1
2962 np.tun_if.local_spi = p.vpp_tun_spi
2963 np.tun_if.remote_spi = p.scapy_tun_spi
2964 np.inner_hop_limit = 24
2965 np.outer_hop_limit = 128
2966 np.inner_flow_label = 0xabcde
2967 np.outer_flow_label = 0xabcde
2969 np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
2971 self.config_sa_tun(np,
2973 self.pg0.remote_ip6)
2974 self.config_protect(np)
2977 self.verify_tun_66(np, count=n_pkts)
2978 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
2979 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
2982 self.unconfig_protect(np)
2983 self.unconfig_sa(np)
2984 self.unconfig_network(p)
2986 def test_tun_66_police(self):
2987 """IPSEC interface IPv6 with input policer"""
2988 tf = VppEnum.vl_api_tunnel_encap_decap_flags_t
2990 p = self.ipv6_params
2991 p.inner_hop_limit = 24
2992 p.outer_hop_limit = 23
2993 p.outer_flow_label = 243224
2994 p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
2996 self.config_network(p)
2997 self.config_sa_tun(p,
2999 self.pg0.remote_ip6)
3000 self.config_protect(p)
3002 action_tx = PolicerAction(
3003 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
3005 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
3006 conform_action=action_tx,
3007 exceed_action=action_tx,
3008 violate_action=action_tx)
3009 policer.add_vpp_config()
3011 # Start policing on tun
3012 policer.apply_vpp_config(p.tun_if.sw_if_index, True)
3014 self.verify_tun_66(p, count=n_pkts)
3015 self.assertEqual(p.tun_if.get_rx_stats(), n_pkts)
3016 self.assertEqual(p.tun_if.get_tx_stats(), n_pkts)
3018 stats = policer.get_stats()
3020 # Single rate, 2 colour policer - expect conform, violate but no exceed
3021 self.assertGreater(stats['conform_packets'], 0)
3022 self.assertEqual(stats['exceed_packets'], 0)
3023 self.assertGreater(stats['violate_packets'], 0)
3025 # Stop policing on tun
3026 policer.apply_vpp_config(p.tun_if.sw_if_index, False)
3027 self.verify_tun_66(p, count=n_pkts)
3029 # No new policer stats
3030 statsnew = policer.get_stats()
3031 self.assertEqual(stats, statsnew)
3034 policer.remove_vpp_config()
3035 self.unconfig_protect(p)
3037 self.unconfig_network(p)
3040 class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
3041 """ Ipsec P2MP ESP v4 tests """
3042 tun4_encrypt_node_name = "esp4-encrypt-tun"
3043 tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
3044 encryption_type = ESP
3046 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
3048 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3049 sa.encrypt(IP(src=self.pg1.local_ip4,
3050 dst=self.pg1.remote_ip4) /
3051 UDP(sport=1144, dport=2233) /
3052 Raw(b'X' * payload_size))
3053 for i in range(count)]
3055 def gen_pkts(self, sw_intf, src, dst, count=1,
3057 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3058 IP(src="1.1.1.1", dst=dst) /
3059 UDP(sport=1144, dport=2233) /
3060 Raw(b'X' * payload_size)
3061 for i in range(count)]
3063 def verify_decrypted(self, p, rxs):
3065 self.assert_equal(rx[Ether].dst, self.pg1.remote_mac)
3066 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
3068 def verify_encrypted(self, p, sa, rxs):
3071 self.assertEqual(rx[IP].tos,
3072 VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
3073 self.assertEqual(rx[IP].ttl, p.hop_limit)
3074 pkt = sa.decrypt(rx[IP])
3075 if not pkt.haslayer(IP):
3076 pkt = IP(pkt[Raw].load)
3077 self.assert_packet_checksums_valid(pkt)
3079 self.assertEqual(e[IP].dst, p.remote_tun_if_host)
3080 except (IndexError, AssertionError):
3081 self.logger.debug(ppp("Unexpected packet:", rx))
3083 self.logger.debug(ppp("Decrypted packet:", pkt))
3089 super(TestIpsecMIfEsp4, self).setUp()
3092 self.tun_if = self.pg0
3093 p = self.ipv4_params
3094 p.tun_if = VppIpsecInterface(self,
3095 mode=(VppEnum.vl_api_tunnel_mode_t.
3096 TUNNEL_API_MODE_MP))
3097 p.tun_if.add_vpp_config()
3099 p.tun_if.config_ip4()
3100 p.tun_if.unconfig_ip4()
3101 p.tun_if.config_ip4()
3102 p.tun_if.generate_remote_hosts(N_NHS)
3103 self.pg0.generate_remote_hosts(N_NHS)
3104 self.pg0.configure_ipv4_neighbors()
3106 r_all = AclRule(True,
3107 src_prefix="0.0.0.0/0",
3108 dst_prefix="0.0.0.0/0",
3110 a = VppAcl(self, [r_all]).add_vpp_config()
3112 VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
3113 VppAclInterface(self, p.tun_if.sw_if_index, [a]).add_vpp_config()
3115 # setup some SAs for several next-hops on the interface
3116 self.multi_params = []
3118 for ii in range(N_NHS):
3119 p = copy.copy(self.ipv4_params)
3121 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
3122 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
3123 p.scapy_tun_spi = p.scapy_tun_spi + ii
3124 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
3125 p.vpp_tun_spi = p.vpp_tun_spi + ii
3127 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
3128 p.scapy_tra_spi = p.scapy_tra_spi + ii
3129 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
3130 p.vpp_tra_spi = p.vpp_tra_spi + ii
3132 p.tun_sa_out = VppIpsecSA(
3133 self, p.scapy_tun_sa_id, p.scapy_tun_spi,
3134 p.auth_algo_vpp_id, p.auth_key,
3135 p.crypt_algo_vpp_id, p.crypt_key,
3136 self.vpp_esp_protocol,
3138 self.pg0.remote_hosts[ii].ip4,
3139 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3140 hop_limit=p.hop_limit)
3141 p.tun_sa_out.add_vpp_config()
3143 p.tun_sa_in = VppIpsecSA(
3144 self, p.vpp_tun_sa_id, p.vpp_tun_spi,
3145 p.auth_algo_vpp_id, p.auth_key,
3146 p.crypt_algo_vpp_id, p.crypt_key,
3147 self.vpp_esp_protocol,
3148 self.pg0.remote_hosts[ii].ip4,
3150 dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
3151 hop_limit=p.hop_limit)
3152 p.tun_sa_in.add_vpp_config()
3154 p.tun_protect = VppIpsecTunProtect(
3159 nh=p.tun_if.remote_hosts[ii].ip4)
3160 p.tun_protect.add_vpp_config()
3161 config_tun_params(p, self.encryption_type, None,
3163 self.pg0.remote_hosts[ii].ip4)
3164 self.multi_params.append(p)
3166 p.via_tun_route = VppIpRoute(
3167 self, p.remote_tun_if_host, 32,
3168 [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
3169 p.tun_if.sw_if_index)]).add_vpp_config()
3171 p.tun_dst = self.pg0.remote_hosts[ii].ip4
3174 p = self.ipv4_params
3175 p.tun_if.unconfig_ip4()
3176 super(TestIpsecMIfEsp4, self).tearDown()
3178 def test_tun_44(self):
3181 for p in self.multi_params:
3182 self.verify_tun_44(p, count=N_PKTS)
3184 # remove one tunnel protect, the rest should still work
3185 self.multi_params[0].tun_protect.remove_vpp_config()
3186 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3187 self.multi_params[0].via_tun_route.remove_vpp_config()
3188 self.verify_tun_dropped_44(self.multi_params[0], count=N_PKTS)
3190 for p in self.multi_params[1:]:
3191 self.verify_tun_44(p, count=N_PKTS)
3193 self.multi_params[0].tun_protect.add_vpp_config()
3194 self.multi_params[0].via_tun_route.add_vpp_config()
3196 for p in self.multi_params:
3197 self.verify_tun_44(p, count=N_PKTS)
3200 class TestIpsecItf6MPLS(TemplateIpsec,
3203 """ IPsec Interface MPLSoIPv6 """
3205 tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
3208 super(TestIpsecItf6MPLS, self).setUp()
3210 self.tun_if = self.pg0
3213 super(TestIpsecItf6MPLS, self).tearDown()
3215 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
3217 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
3218 sa.encrypt(MPLS(label=66, ttl=3) /
3219 IPv6(src=src, dst=dst) /
3220 UDP(sport=1166, dport=2233) /
3221 Raw(b'X' * payload_size))
3222 for i in range(count)]
3224 def verify_encrypted6(self, p, sa, rxs):
3227 pkt = sa.decrypt(rx[IPv6])
3228 if not pkt.haslayer(IPv6):
3229 pkt = IP(pkt[Raw].load)
3230 self.assert_packet_checksums_valid(pkt)
3231 self.assert_equal(pkt[MPLS].label, 66)
3232 self.assert_equal(pkt[IPv6].dst, p.remote_tun_if_host)
3233 except (IndexError, AssertionError):
3234 self.logger.debug(ppp("Unexpected packet:", rx))
3236 self.logger.debug(ppp("Decrypted packet:", pkt))
3241 def test_tun_mpls_o_ip6(self):
3242 """IPSEC interface MPLS over IPv6"""
3245 p = self.ipv6_params
3248 tbl = VppMplsTable(self, 0)
3249 tbl.add_vpp_config()
3251 self.config_network(p)
3252 # deag MPLS routes from the tunnel
3253 r6 = VppMplsRoute(self, 66, 1,
3255 self.pg1.remote_ip6,
3256 self.pg1.sw_if_index)],
3257 eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
3258 p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
3259 p.tun_if.sw_if_index,
3260 labels=[VppMplsLabel(66)])])
3261 p.tun_if.enable_mpls()
3263 self.config_sa_tun(p,
3265 self.pg0.remote_ip6)
3266 self.config_protect(p)
3268 self.verify_tun_66(p, count=n_pkts)
3271 p.tun_if.disable_mpls()
3272 self.unconfig_protect(p)
3274 self.unconfig_network(p)
3277 if __name__ == '__main__':
3278 unittest.main(testRunner=VppTestRunner)