5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10 IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
18 class IPsecIPv4Params:
20 addr_type = socket.AF_INET
22 addr_bcast = "255.255.255.255"
27 self.remote_tun_if_host = '1.1.1.1'
28 self.remote_tun_if_host6 = '1111::1'
30 self.scapy_tun_sa_id = 100
31 self.scapy_tun_spi = 1000
32 self.vpp_tun_sa_id = 200
33 self.vpp_tun_spi = 2000
35 self.scapy_tra_sa_id = 300
36 self.scapy_tra_spi = 3000
37 self.vpp_tra_sa_id = 400
38 self.vpp_tra_spi = 4000
40 self.outer_hop_limit = 64
41 self.inner_hop_limit = 255
42 self.outer_flow_label = 0
43 self.inner_flow_label = 0x12345
45 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
46 IPSEC_API_INTEG_ALG_SHA1_96)
47 self.auth_algo = 'HMAC-SHA1-96' # scapy name
48 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
50 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
51 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
52 self.crypt_algo = 'AES-CBC' # scapy name
53 self.crypt_key = b'JPjyOWBeVEQiMe7h'
56 self.nat_header = None
57 self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
58 TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
62 class IPsecIPv6Params:
64 addr_type = socket.AF_INET6
66 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
71 self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
72 self.remote_tun_if_host4 = '1.1.1.1'
74 self.scapy_tun_sa_id = 500
75 self.scapy_tun_spi = 3001
76 self.vpp_tun_sa_id = 600
77 self.vpp_tun_spi = 3000
79 self.scapy_tra_sa_id = 700
80 self.scapy_tra_spi = 4001
81 self.vpp_tra_sa_id = 800
82 self.vpp_tra_spi = 4000
84 self.outer_hop_limit = 64
85 self.inner_hop_limit = 255
86 self.outer_flow_label = 0
87 self.inner_flow_label = 0x12345
89 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
90 IPSEC_API_INTEG_ALG_SHA1_96)
91 self.auth_algo = 'HMAC-SHA1-96' # scapy name
92 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
94 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
95 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
96 self.crypt_algo = 'AES-CBC' # scapy name
97 self.crypt_key = b'JPjyOWBeVEQiMe7h'
100 self.nat_header = None
101 self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
102 TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
106 def mk_scapy_crypt_key(p):
107 if p.crypt_algo in ("AES-GCM", "AES-CTR"):
108 return p.crypt_key + struct.pack("!I", p.salt)
113 def config_tun_params(p, encryption_type, tun_if):
114 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
115 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
116 IPSEC_API_SAD_FLAG_USE_ESN))
117 p.tun_dst = tun_if.remote_addr[p.addr_type]
118 p.tun_src = tun_if.local_addr[p.addr_type]
119 crypt_key = mk_scapy_crypt_key(p)
120 p.scapy_tun_sa = SecurityAssociation(
121 encryption_type, spi=p.vpp_tun_spi,
122 crypt_algo=p.crypt_algo,
124 auth_algo=p.auth_algo, auth_key=p.auth_key,
125 tunnel_header=ip_class_by_addr_type[p.addr_type](
128 nat_t_header=p.nat_header,
130 p.vpp_tun_sa = SecurityAssociation(
131 encryption_type, spi=p.scapy_tun_spi,
132 crypt_algo=p.crypt_algo,
134 auth_algo=p.auth_algo, auth_key=p.auth_key,
135 tunnel_header=ip_class_by_addr_type[p.addr_type](
138 nat_t_header=p.nat_header,
142 def config_tra_params(p, encryption_type):
143 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
144 IPSEC_API_SAD_FLAG_USE_ESN))
145 crypt_key = mk_scapy_crypt_key(p)
146 p.scapy_tra_sa = SecurityAssociation(
149 crypt_algo=p.crypt_algo,
151 auth_algo=p.auth_algo,
153 nat_t_header=p.nat_header,
155 p.vpp_tra_sa = SecurityAssociation(
158 crypt_algo=p.crypt_algo,
160 auth_algo=p.auth_algo,
162 nat_t_header=p.nat_header,
166 class TemplateIpsec(VppTestCase):
171 |tra_if| <-------> |VPP|
176 ------ encrypt --- plain ---
177 |tun_if| <------- |VPP| <------ |pg1|
180 ------ decrypt --- plain ---
181 |tun_if| -------> |VPP| ------> |pg1|
def ipsec_select_backend(self):
    """Backend-selection hook; deliberately does nothing here.

    Overload this method when necessary.
    """
193 super(TemplateIpsec, cls).setUpClass()
196 def tearDownClass(cls):
197 super(TemplateIpsec, cls).tearDownClass()
def setup_params(self):
    """Make sure both parameter sets exist and index them by address type.

    ``ipv4_params`` / ``ipv6_params`` already present on the instance are
    left untouched; a missing one is filled in with a freshly constructed
    ``IPsecIPv4Params`` / ``IPsecIPv6Params``.  ``self.params`` then maps
    each set's ``addr_type`` to the set itself.
    """
    if hasattr(self, 'ipv4_params'):
        v4_params = self.ipv4_params
    else:
        v4_params = self.ipv4_params = IPsecIPv4Params()

    if hasattr(self, 'ipv6_params'):
        v6_params = self.ipv6_params
    else:
        v6_params = self.ipv6_params = IPsecIPv6Params()

    # Lookup table keyed by addr_type, IPv4 entry first.
    self.params = {family.addr_type: family
                   for family in (v4_params, v6_params)}
207 def config_interfaces(self):
208 self.create_pg_interfaces(range(3))
209 self.interfaces = list(self.pg_interfaces)
210 for i in self.interfaces:
218 super(TemplateIpsec, self).setUp()
222 self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
224 self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
227 self.config_interfaces()
229 self.ipsec_select_backend()
231 def unconfig_interfaces(self):
232 for i in self.interfaces:
238 super(TemplateIpsec, self).tearDown()
240 self.unconfig_interfaces()
def show_commands_at_teardown(self):
    """Log the output of the "show hardware" CLI command at info level."""
    hardware_dump = self.vapi.cli("show hardware")
    self.logger.info(hardware_dump)
245 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
247 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
248 sa.encrypt(IP(src=src, dst=dst) /
249 ICMP() / Raw(b'X' * payload_size))
250 for i in range(count)]
252 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
254 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
255 sa.encrypt(IPv6(src=src, dst=dst,
256 hlim=p.inner_hop_limit,
257 fl=p.inner_flow_label) /
258 ICMPv6EchoRequest(id=0, seq=1,
259 data='X' * payload_size))
260 for i in range(count)]
def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` plain (unencrypted) Ether/IP/ICMP/Raw frames.

    :param sw_intf: interface supplying the MACs; the Ethernet source is
        ``sw_intf.remote_mac`` and the destination ``sw_intf.local_mac``.
    :param src: IPv4 source address.
    :param dst: IPv4 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``b'X'`` bytes in the Raw payload.
    :returns: list of ``count`` packets.
    """
    frames = []
    for _ in range(count):
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        l3 = IP(src=src, dst=dst)
        frames.append(l2 / l3 / ICMP() / Raw(b'X' * payload_size))
    return frames
def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` plain (unencrypted) Ether/IPv6/ICMPv6 echo requests.

    :param p: parameter set; ``p.inner_hop_limit`` and
        ``p.inner_flow_label`` populate the IPv6 header.
    :param sw_intf: interface supplying the MACs; the Ethernet source is
        ``sw_intf.remote_mac`` and the destination ``sw_intf.local_mac``.
    :param src: IPv6 source address.
    :param dst: IPv6 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``'X'`` characters of echo data.
    :returns: list of ``count`` packets.
    """
    frames = []
    for _ in range(count):
        frame = (Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 IPv6(src=src, dst=dst,
                      hlim=p.inner_hop_limit,
                      fl=p.inner_flow_label))
        echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
        frames.append(frame / echo)
    return frames
275 class IpsecTcp(object):
276 def verify_tcp_checksum(self):
277 self.vapi.cli("test http server")
278 p = self.params[socket.AF_INET]
279 send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
280 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
281 dst=self.tun_if.local_ip4) /
282 TCP(flags='S', dport=80)))
283 self.logger.debug(ppp("Sending packet:", send))
284 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
286 decrypted = p.vpp_tun_sa.decrypt(recv[IP])
287 self.assert_packet_checksums_valid(decrypted)
class IpsecTcpTests(IpsecTcp):
    # Test-method layer over IpsecTcp: the check itself is inherited, this
    # class only exposes it under a ``test_`` name.

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        run_check = self.verify_tcp_checksum
        run_check()
296 class IpsecTra4(object):
297 """ verify methods for Transport v4 """
298 def verify_tra_anti_replay(self):
299 p = self.params[socket.AF_INET]
300 esn_en = p.vpp_tra_sa.esn_en
302 seq_cycle_node_name = ('/err/%s/sequence number cycled' %
303 self.tra4_encrypt_node_name)
304 replay_node_name = ('/err/%s/SA replayed packet' %
305 self.tra4_decrypt_node_name)
306 if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
307 hash_failed_node_name = ('/err/%s/ESP decryption failed' %
308 self.tra4_decrypt_node_name)
310 hash_failed_node_name = ('/err/%s/Integrity check failed' %
311 self.tra4_decrypt_node_name)
312 replay_count = self.statistics.get_err_counter(replay_node_name)
313 hash_failed_count = self.statistics.get_err_counter(
314 hash_failed_node_name)
315 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
317 if ESP == self.encryption_type:
318 undersize_node_name = ('/err/%s/undersized packet' %
319 self.tra4_decrypt_node_name)
320 undersize_count = self.statistics.get_err_counter(
324 # send packets with seq numbers 1->34
325 # this means the window size is still in Case B (see RFC4303
328 # for reasons i haven't investigated Scapy won't create a packet with
331 pkts = [(Ether(src=self.tra_if.remote_mac,
332 dst=self.tra_if.local_mac) /
333 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
334 dst=self.tra_if.local_ip4) /
337 for seq in range(1, 34)]
338 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
340 # replayed packets are dropped
341 self.send_and_assert_no_replies(self.tra_if, pkts)
342 replay_count += len(pkts)
343 self.assert_error_counter_equal(replay_node_name, replay_count)
346 # now send a batch of packets all with the same sequence number
347 # the first packet in the batch is legitimate, the rest bogus
349 pkts = (Ether(src=self.tra_if.remote_mac,
350 dst=self.tra_if.local_mac) /
351 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
352 dst=self.tra_if.local_ip4) /
355 recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
358 self.assert_error_counter_equal(replay_node_name, replay_count)
361 # now move the window over to 257 (more than one byte) and into Case A
363 pkt = (Ether(src=self.tra_if.remote_mac,
364 dst=self.tra_if.local_mac) /
365 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
366 dst=self.tra_if.local_ip4) /
369 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
371 # replayed packets are dropped
372 self.send_and_assert_no_replies(self.tra_if, pkt * 3)
374 self.assert_error_counter_equal(replay_node_name, replay_count)
376 # the window size is 64 packets
377 # in window are still accepted
378 pkt = (Ether(src=self.tra_if.remote_mac,
379 dst=self.tra_if.local_mac) /
380 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
381 dst=self.tra_if.local_ip4) /
384 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
386 # a packet that does not decrypt does not move the window forward
387 bogus_sa = SecurityAssociation(self.encryption_type,
389 crypt_algo=p.crypt_algo,
390 crypt_key=mk_scapy_crypt_key(p)[::-1],
391 auth_algo=p.auth_algo,
392 auth_key=p.auth_key[::-1])
393 pkt = (Ether(src=self.tra_if.remote_mac,
394 dst=self.tra_if.local_mac) /
395 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
396 dst=self.tra_if.local_ip4) /
399 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
401 hash_failed_count += 17
402 self.assert_error_counter_equal(hash_failed_node_name,
405 # a malformed 'runt' packet
406 # created by a mis-constructed SA
407 if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
408 bogus_sa = SecurityAssociation(self.encryption_type,
410 pkt = (Ether(src=self.tra_if.remote_mac,
411 dst=self.tra_if.local_mac) /
412 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
413 dst=self.tra_if.local_ip4) /
416 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
418 undersize_count += 17
419 self.assert_error_counter_equal(undersize_node_name,
422 # which we can determine since this packet is still in the window
423 pkt = (Ether(src=self.tra_if.remote_mac,
424 dst=self.tra_if.local_mac) /
425 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
426 dst=self.tra_if.local_ip4) /
429 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
432 # out of window are dropped
433 # this is Case B. So VPP will consider this to be a high seq num wrap
434 # and so the decrypt attempt will fail
436 pkt = (Ether(src=self.tra_if.remote_mac,
437 dst=self.tra_if.local_mac) /
438 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
439 dst=self.tra_if.local_ip4) /
442 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
445 # an out of window error with ESN looks like a high sequence
446 # wrap. but since it isn't then the verify will fail.
447 hash_failed_count += 17
448 self.assert_error_counter_equal(hash_failed_node_name,
453 self.assert_error_counter_equal(replay_node_name,
456 # valid packet moves the window over to 258
457 pkt = (Ether(src=self.tra_if.remote_mac,
458 dst=self.tra_if.local_mac) /
459 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
460 dst=self.tra_if.local_ip4) /
463 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
464 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
467 # move VPP's SA TX seq-num to just before the seq-number wrap.
468 # then fire in a packet that VPP should drop on TX because it
469 # causes the TX seq number to wrap; unless we're using extended sequence
472 self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
473 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
474 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
476 pkts = [(Ether(src=self.tra_if.remote_mac,
477 dst=self.tra_if.local_mac) /
478 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
479 dst=self.tra_if.local_ip4) /
482 for seq in range(259, 280)]
485 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
488 # in order for scapy to decrypt its SA's high order number needs
491 p.vpp_tra_sa.seq_num = 0x100000000
493 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
496 # wrap scapy's TX high sequence number. VPP is in case B, so it
497 # will consider this a high seq wrap also.
498 # The low seq num we set it to will place VPP's RX window in Case A
500 p.scapy_tra_sa.seq_num = 0x100000005
501 pkt = (Ether(src=self.tra_if.remote_mac,
502 dst=self.tra_if.local_mac) /
503 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
504 dst=self.tra_if.local_ip4) /
506 seq_num=0x100000005))
507 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
508 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
511 # A packet that has seq num between (2^32-64) and 5 is within
514 p.scapy_tra_sa.seq_num = 0xfffffffd
515 pkt = (Ether(src=self.tra_if.remote_mac,
516 dst=self.tra_if.local_mac) /
517 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
518 dst=self.tra_if.local_ip4) /
521 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
522 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
525 # While in case A we cannot wrap the high sequence number again
526 # because VPP will consider this packet to be one that moves the
529 pkt = (Ether(src=self.tra_if.remote_mac,
530 dst=self.tra_if.local_mac) /
531 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
532 dst=self.tra_if.local_ip4) /
534 seq_num=0x200000999))
535 self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
537 hash_failed_count += 1
538 self.assert_error_counter_equal(hash_failed_node_name,
542 # but if we move the window forward to case B, then we can wrap
545 p.scapy_tra_sa.seq_num = 0x100000555
546 pkt = (Ether(src=self.tra_if.remote_mac,
547 dst=self.tra_if.local_mac) /
548 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
549 dst=self.tra_if.local_ip4) /
551 seq_num=0x100000555))
552 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
553 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
555 p.scapy_tra_sa.seq_num = 0x200000444
556 pkt = (Ether(src=self.tra_if.remote_mac,
557 dst=self.tra_if.local_mac) /
558 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
559 dst=self.tra_if.local_ip4) /
561 seq_num=0x200000444))
562 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
563 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
567 # without ESN TX sequence numbers can't wrap and packets are
568 # dropped from here on out.
570 self.send_and_assert_no_replies(self.tra_if, pkts)
571 seq_cycle_count += len(pkts)
572 self.assert_error_counter_equal(seq_cycle_node_name,
575 # move the security-associations seq number on to the last we used
576 self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
577 p.scapy_tra_sa.seq_num = 351
578 p.vpp_tra_sa.seq_num = 351
580 def verify_tra_basic4(self, count=1, payload_size=54):
581 """ ipsec v4 transport basic test """
582 self.vapi.cli("clear errors")
583 self.vapi.cli("clear ipsec sa")
585 p = self.params[socket.AF_INET]
586 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
587 src=self.tra_if.remote_ip4,
588 dst=self.tra_if.local_ip4,
590 payload_size=payload_size)
591 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
594 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
595 self.assert_packet_checksums_valid(rx)
597 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
598 self.assert_packet_checksums_valid(decrypted)
600 self.logger.debug(ppp("Unexpected packet:", rx))
603 self.logger.info(self.vapi.ppcli("show error"))
604 self.logger.info(self.vapi.ppcli("show ipsec all"))
606 pkts = p.tra_sa_in.get_stats()['packets']
607 self.assertEqual(pkts, count,
608 "incorrect SA in counts: expected %d != %d" %
610 pkts = p.tra_sa_out.get_stats()['packets']
611 self.assertEqual(pkts, count,
612 "incorrect SA out counts: expected %d != %d" %
615 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
616 self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """

    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # Forward the caller's count.  It used to be silently replaced by
        # the literal 1, so an explicit count had no effect.  A plain
        # no-argument call still sends one packet.
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # Same verification as the basic test, with 257 packets.
        self.verify_tra_basic4(count=257)
634 class IpsecTra6(object):
635 """ verify methods for Transport v6 """
636 def verify_tra_basic6(self, count=1, payload_size=54):
637 self.vapi.cli("clear errors")
638 self.vapi.cli("clear ipsec sa")
640 p = self.params[socket.AF_INET6]
641 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
642 src=self.tra_if.remote_ip6,
643 dst=self.tra_if.local_ip6,
645 payload_size=payload_size)
646 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
649 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
652 decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
653 self.assert_packet_checksums_valid(decrypted)
655 self.logger.debug(ppp("Unexpected packet:", rx))
658 self.logger.info(self.vapi.ppcli("show error"))
659 self.logger.info(self.vapi.ppcli("show ipsec all"))
661 pkts = p.tra_sa_in.get_stats()['packets']
662 self.assertEqual(pkts, count,
663 "incorrect SA in counts: expected %d != %d" %
665 pkts = p.tra_sa_out.get_stats()['packets']
666 self.assertEqual(pkts, count,
667 "incorrect SA out counts: expected %d != %d" %
669 self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
670 self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
672 def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
674 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
675 sa.encrypt(IPv6(src=src, dst=dst) /
676 ICMPv6EchoRequest(id=0, seq=1,
677 data='X' * payload_size))
678 for i in range(count)]
680 def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
681 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
682 IPv6(src=src, dst=dst) /
683 IPv6ExtHdrHopByHop() /
684 IPv6ExtHdrFragment(id=2, offset=200) /
686 for i in range(count)]
688 def verify_tra_encrypted6(self, p, sa, rxs):
691 self.assert_packet_checksums_valid(rx)
693 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
694 decrypted.append(decrypt_pkt)
695 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
696 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
698 self.logger.debug(ppp("Unexpected packet:", rx))
700 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
706 def verify_tra_66_ext_hdrs(self, p):
710 # check we can decrypt with options
712 tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
713 src=self.tra_if.remote_ip6,
714 dst=self.tra_if.local_ip6,
716 self.send_and_expect(self.tra_if, tx, self.tra_if)
719 # injecting a packet from ourselves to be routed off box is a hack
720 # but it matches an outbound policy, so I regret nothing
723 # one extension before ESP
724 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
725 IPv6(src=self.tra_if.local_ip6,
726 dst=self.tra_if.remote_ip6) /
727 IPv6ExtHdrFragment(id=2, offset=200) /
730 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
731 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
734 # for reasons I'm not going to investigate, scapy does not
735 # create the correct headers after decrypt, but reparsing
736 # the ipv6 packet fixes it
737 dc = IPv6(raw(dc[IPv6]))
738 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
740 # two extensions before ESP
741 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
742 IPv6(src=self.tra_if.local_ip6,
743 dst=self.tra_if.remote_ip6) /
744 IPv6ExtHdrHopByHop() /
745 IPv6ExtHdrFragment(id=2, offset=200) /
748 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
749 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
752 dc = IPv6(raw(dc[IPv6]))
753 self.assertTrue(dc[IPv6ExtHdrHopByHop])
754 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
756 # two extensions before ESP, one after
757 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
758 IPv6(src=self.tra_if.local_ip6,
759 dst=self.tra_if.remote_ip6) /
760 IPv6ExtHdrHopByHop() /
761 IPv6ExtHdrFragment(id=2, offset=200) /
762 IPv6ExtHdrDestOpt() /
765 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
766 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
769 dc = IPv6(raw(dc[IPv6]))
770 self.assertTrue(dc[IPv6ExtHdrDestOpt])
771 self.assertTrue(dc[IPv6ExtHdrHopByHop])
772 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # One packet through the inherited verification.
        single = 1
        self.verify_tra_basic6(count=single)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # Same verification, 257 packets.
        burst = 257
        self.verify_tra_basic6(count=burst)
class IpsecTra6ExtTests(IpsecTra6):
    # Test-method layer over IpsecTra6 for the extension-header check.

    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        # The check runs against the parameter set stored under AF_INET6.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(v6_params)
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """UT test methods for Transport v6 and v4.

    Adds nothing of its own; the test methods come from IpsecTra4Tests
    and IpsecTra6Tests.
    """
797 class IpsecTun4(object):
798 """ verify methods for Tunnel v4 """
799 def verify_counters4(self, p, count, n_frags=None, worker=None):
802 if (hasattr(p, "spd_policy_in_any")):
803 pkts = p.spd_policy_in_any.get_stats(worker)['packets']
804 self.assertEqual(pkts, count,
805 "incorrect SPD any policy: expected %d != %d" %
808 if (hasattr(p, "tun_sa_in")):
809 pkts = p.tun_sa_in.get_stats(worker)['packets']
810 self.assertEqual(pkts, count,
811 "incorrect SA in counts: expected %d != %d" %
813 pkts = p.tun_sa_out.get_stats(worker)['packets']
814 self.assertEqual(pkts, n_frags,
815 "incorrect SA out counts: expected %d != %d" %
818 self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
819 self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
821 def verify_decrypted(self, p, rxs):
823 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
824 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
825 self.assert_packet_checksums_valid(rx)
827 def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
828 align = sa.crypt_algo.block_size
831 exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
832 exp_len += sa.crypt_algo.iv_size
833 exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
834 self.assertEqual(exp_len, len(esp_payload))
836 def verify_encrypted(self, p, sa, rxs):
840 self.assertEqual(rx[UDP].dport, 4500)
841 self.assert_packet_checksums_valid(rx)
842 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
845 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
846 if not decrypt_pkt.haslayer(IP):
847 decrypt_pkt = IP(decrypt_pkt[Raw].load)
848 if rx_ip.proto == socket.IPPROTO_ESP:
849 self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
850 decrypt_pkts.append(decrypt_pkt)
851 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
852 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
854 self.logger.debug(ppp("Unexpected packet:", rx))
856 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
860 pkts = reassemble4(decrypt_pkts)
862 self.assert_packet_checksums_valid(pkt)
864 def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
865 self.vapi.cli("clear errors")
866 self.vapi.cli("clear ipsec counters")
867 self.vapi.cli("clear ipsec sa")
871 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
872 src=p.remote_tun_if_host,
873 dst=self.pg1.remote_ip4,
875 payload_size=payload_size)
876 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
877 self.verify_decrypted(p, recv_pkts)
879 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
880 dst=p.remote_tun_if_host, count=count,
881 payload_size=payload_size)
882 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
884 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
887 self.assertEqual(rx[IP].src, p.tun_src)
888 self.assertEqual(rx[IP].dst, p.tun_dst)
891 self.logger.info(self.vapi.ppcli("show error"))
892 self.logger.info(self.vapi.ppcli("show ipsec all"))
894 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
895 self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
896 self.verify_counters4(p, count, n_rx)
898 def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
899 self.vapi.cli("clear errors")
903 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
904 src=p.remote_tun_if_host,
905 dst=self.pg1.remote_ip4,
907 self.send_and_assert_no_replies(self.tun_if, send_pkts)
909 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
910 dst=p.remote_tun_if_host, count=count,
911 payload_size=payload_size)
912 self.send_and_assert_no_replies(self.pg1, send_pkts)
915 self.logger.info(self.vapi.ppcli("show error"))
916 self.logger.info(self.vapi.ppcli("show ipsec all"))
918 def verify_tun_reass_44(self, p):
919 self.vapi.cli("clear errors")
920 self.vapi.ip_reassembly_enable_disable(
921 sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
924 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
925 src=p.remote_tun_if_host,
926 dst=self.pg1.remote_ip4,
929 send_pkts = fragment_rfc791(send_pkts[0], 1400)
930 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
932 self.verify_decrypted(p, recv_pkts)
934 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
935 dst=p.remote_tun_if_host, count=1)
936 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
938 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
941 self.logger.info(self.vapi.ppcli("show error"))
942 self.logger.info(self.vapi.ppcli("show ipsec all"))
944 self.verify_counters4(p, 1, 1)
945 self.vapi.ip_reassembly_enable_disable(
946 sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
948 def verify_tun_64(self, p, count=1):
949 self.vapi.cli("clear errors")
950 self.vapi.cli("clear ipsec sa")
952 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
953 src=p.remote_tun_if_host6,
954 dst=self.pg1.remote_ip6,
956 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
957 for recv_pkt in recv_pkts:
958 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
959 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
960 self.assert_packet_checksums_valid(recv_pkt)
961 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
962 dst=p.remote_tun_if_host6, count=count)
963 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
964 for recv_pkt in recv_pkts:
966 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
967 if not decrypt_pkt.haslayer(IPv6):
968 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
969 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
970 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
971 self.assert_packet_checksums_valid(decrypt_pkt)
973 self.logger.error(ppp("Unexpected packet:", recv_pkt))
976 ppp("Decrypted packet:", decrypt_pkt))
981 self.logger.info(self.vapi.ppcli("show error"))
982 self.logger.info(self.vapi.ppcli("show ipsec all"))
984 self.verify_counters4(p, count)
986 def verify_keepalive(self, p):
987 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
988 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
989 UDP(sport=333, dport=4500) /
991 self.send_and_assert_no_replies(self.tun_if, pkt*31)
992 self.assert_error_counter_equal(
993 '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
995 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
996 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
997 UDP(sport=333, dport=4500) /
999 self.send_and_assert_no_replies(self.tun_if, pkt*31)
1000 self.assert_error_counter_equal(
1001 '/err/%s/Too Short' % self.tun4_input_node, 31)
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        # First pass with the interface in its initial state.
        self.verify_tun_44(self.params[socket.AF_INET], count=1)

        # Take the tunnel interface down, resolve ARP, bring it back up,
        # then run the very same verification a second time.
        tunnel = self.tun_if
        tunnel.admin_down()
        tunnel.resolve_arp()
        tunnel.admin_up()

        self.verify_tun_44(self.params[socket.AF_INET], count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_reass_44(v4_params)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        # Same verification as the basic test, with 127 packets.
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=127)
1023 class IpsecTun6(object):
1024 """ verify methods for Tunnel v6 """
1025 def verify_counters6(self, p_in, p_out, count, worker=None):
1026 if (hasattr(p_in, "tun_sa_in")):
1027 pkts = p_in.tun_sa_in.get_stats(worker)['packets']
1028 self.assertEqual(pkts, count,
1029 "incorrect SA in counts: expected %d != %d" %
1031 if (hasattr(p_out, "tun_sa_out")):
1032 pkts = p_out.tun_sa_out.get_stats(worker)['packets']
1033 self.assertEqual(pkts, count,
1034 "incorrect SA out counts: expected %d != %d" %
1036 self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1037 self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
1039 def verify_decrypted6(self, p, rxs):
1041 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1042 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1043 self.assert_packet_checksums_valid(rx)
1045 def verify_encrypted6(self, p, sa, rxs):
1047 self.assert_packet_checksums_valid(rx)
1048 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
1050 self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1051 if p.outer_flow_label:
1052 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1054 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1055 if not decrypt_pkt.haslayer(IPv6):
1056 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1057 self.assert_packet_checksums_valid(decrypt_pkt)
1058 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1059 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1060 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1061 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1063 self.logger.debug(ppp("Unexpected packet:", rx))
1065 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1070 def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1071 self.vapi.cli("clear errors")
1072 self.vapi.cli("clear ipsec sa")
1074 send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
1076 src=p_in.remote_tun_if_host,
1077 dst=self.pg1.remote_ip6,
1079 self.send_and_assert_no_replies(self.tun_if, send_pkts)
1080 self.logger.info(self.vapi.cli("sh punt stats"))
def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
    """Send traffic both ways through a 6o6 tunnel and verify it.

    First, encrypted packets are injected on the tunnel interface and
    the packets received on ``pg1`` are checked with
    ``verify_decrypted6``.  Then plain packets are injected on ``pg1``
    and the packets received on the tunnel interface are checked with
    ``verify_encrypted6`` and against ``p_out``'s tunnel endpoints.
    Finally the SA/node counters are checked.

    :param p_in: IPsec parameter object for the inbound direction.
    :param p_out: IPsec parameter object for the outbound direction;
        defaults to ``p_in``.
    :param count: number of packets to send in each direction.
    :param payload_size: payload size passed to the packet generators.
    """
    self.vapi.cli("clear errors")
    self.vapi.cli("clear ipsec sa")

    # NOTE(review): the defaulting of p_out, the try/finally skeleton
    # and the short ``self.tun_if`` / ``count=`` continuation lines are
    # not present in the numbered listing this block was taken from.
    # They are restored from context; confirm against the full file.
    if p_out is None:
        p_out = p_in
    try:
        # Tunnel side -> pg1: VPP receives protected traffic.
        to_decrypt = self.gen_encrypt_pkts6(
            p_in, p_in.scapy_tun_sa, self.tun_if,
            src=p_in.remote_tun_if_host,
            dst=self.pg1.remote_ip6,
            count=count,
            payload_size=payload_size)
        decrypted = self.send_and_expect(self.tun_if, to_decrypt, self.pg1)
        self.verify_decrypted6(p_in, decrypted)

        # pg1 -> tunnel side: VPP receives plain traffic.
        to_encrypt = self.gen_pkts6(
            p_in, self.pg1,
            src=self.pg1.remote_ip6,
            dst=p_out.remote_tun_if_host,
            count=count,
            payload_size=payload_size)
        encrypted = self.send_and_expect(self.pg1, to_encrypt, self.tun_if)
        self.verify_encrypted6(p_out, p_out.vpp_tun_sa, encrypted)

        # The outer header must use the outbound tunnel's endpoints.
        for rx in encrypted:
            self.assertEqual(rx[IPv6].src, p_out.tun_src)
            self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
    finally:
        # Dump state for the log whether or not the checks passed.
        self.logger.info(self.vapi.ppcli("show error"))
        self.logger.info(self.vapi.ppcli("show ipsec all"))
    self.verify_counters6(p_in, p_out, count)
# verify_tun_reass_66: round trip through the 6o6 tunnel with IPv6
# reassembly enabled on the tunnel interface for the duration of the check.
#
# NOTE(review): this is a line-numbered listing with indentation removed.
# Listing lines 1117-1118, 1122-1123, 1126, 1128, 1131-1132, 1134 and 1136
# are not shown.  At least the remaining arguments and closing parentheses
# of four calls are therefore missing, so the packet count, payload sizes
# and any receive-count argument cannot be read from here.  The code lines
# below are left exactly as found.
1113 def verify_tun_reass_66(self, p):
1114 self.vapi.cli("clear errors")
# Enable IPv6 reassembly on the tunnel interface.
1115 self.vapi.ip_reassembly_enable_disable(
1116 sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
# Build encrypted traffic for the tunnel interface (remaining arguments
# are not visible in this listing).
1119 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1120 src=p.remote_tun_if_host,
1121 dst=self.pg1.remote_ip6,
# Only the first generated packet is fragmented and sent.
# presumably 1400 is the fragment size and 1 the fragment identification
# -- TODO confirm against util.fragment_rfc8200.
1124 send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1125 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1127 self.verify_decrypted6(p, recv_pkts)
# Reverse direction: plain traffic from pg1 towards the tunnel host.
1129 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
1130 dst=p.remote_tun_if_host,
1133 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1135 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
# Dump error and IPsec state to the log.
1137 self.logger.info(self.vapi.ppcli("show error"))
1138 self.logger.info(self.vapi.ppcli("show ipsec all"))
# Counters are checked for one packet in each direction.
1139 self.verify_counters6(p, p, 1)
# Disable IPv6 reassembly on the tunnel interface again.
1140 self.vapi.ip_reassembly_enable_disable(
1141 sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
def verify_tun_46(self, p, count=1):
    """Send IPv4 traffic both ways through an IPv6 tunnel and verify it.

    Encrypted packets carrying IPv4 are injected on the tunnel
    interface and the packets received on ``pg1`` are checked for the
    expected IPv4 addresses and valid checksums.  Plain IPv4 packets
    are then injected on ``pg1``; each packet received on the tunnel
    interface is decrypted with ``p.vpp_tun_sa`` from its IPv6 layer
    and the inner IPv4 header is checked.  Finally the counters are
    checked via ``verify_counters6``.

    :param p: IPsec parameter object; ``remote_tun_if_host4``,
        ``scapy_tun_sa`` and ``vpp_tun_sa`` are read.
    :param count: number of packets to send in each direction.
    :raises: re-raises whatever the decrypt/assert step raised, after
        logging the offending packet at debug level.
    """
    self.vapi.cli("clear errors")
    self.vapi.cli("clear ipsec sa")

    # NOTE(review): the try/except/finally lines and the ``count=``
    # continuation lines are not present in the numbered listing this
    # block was taken from.  They are restored from context; confirm
    # against the full file.
    try:
        # Tunnel side -> pg1.
        to_decrypt = self.gen_encrypt_pkts(
            p, p.scapy_tun_sa, self.tun_if,
            src=p.remote_tun_if_host4,
            dst=self.pg1.remote_ip4,
            count=count)
        decrypted = self.send_and_expect(self.tun_if, to_decrypt, self.pg1)
        for recv_pkt in decrypted:
            self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
            self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(recv_pkt)

        # pg1 -> tunnel side.
        to_encrypt = self.gen_pkts(
            self.pg1,
            src=self.pg1.remote_ip4,
            dst=p.remote_tun_if_host4,
            count=count)
        encrypted = self.send_and_expect(self.pg1, to_encrypt, self.tun_if)
        for recv_pkt in encrypted:
            # Pre-bind so the failure path can tell whether decryption
            # produced something worth logging.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                if not decrypt_pkt.haslayer(IP):
                    # The decrypted payload came back undissected;
                    # parse its raw bytes as IPv4.
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                self.assert_packet_checksums_valid(decrypt_pkt)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:",
                                          decrypt_pkt))
                raise
    finally:
        # Dump state for the log whether or not the checks passed.
        self.logger.info(self.vapi.ppcli("show error"))
        self.logger.info(self.vapi.ppcli("show ipsec all"))
    self.verify_counters6(p, p, count)
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    # The docstrings in this class are kept verbatim: a unittest-style
    # runner reports the first docstring line as the test description.

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params6)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        # 257 packets instead of 1.
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=257)
# IpsecTun6HandoffTests: 6o6 tunnel test that alternates the worker used
# for injection and then checks that all counters are on worker 0.
#
# NOTE(review): this is a line-numbered listing with indentation removed.
# Listing lines 1202, 1205, 1207, 1214, 1218, 1221 and 1225 are not shown.
# N_PKTS is used on listing line 1227 but its definition is not visible,
# and the two packet-generator calls are missing their closing arguments.
# The code lines below are left exactly as found.
1199 class IpsecTun6HandoffTests(IpsecTun6):
1200 """ UT test methods for Tunnel v6 with multiple workers """
# presumably read by the test framework to start VPP with two worker
# threads -- TODO confirm in framework.VppTestCase.
1201 worker_config = "workers 2"
1203 def test_tun_handoff_66(self):
1204 """ ipsec 6o6 tunnel worker hand-off test """
1206 p = self.params[socket.AF_INET6]
1208 # inject alternately on worker 0 and 1. all counts on the SA
1209 # should be against worker 0
1210 for worker in [0, 1, 0, 1]:
# Tunnel side -> pg1, injected on the selected worker.
1211 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1212 src=p.remote_tun_if_host,
1213 dst=self.pg1.remote_ip6,
1215 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1216 self.pg1, worker=worker)
1217 self.verify_decrypted6(p, recv_pkts)
# pg1 -> tunnel side, injected on the same worker.
1219 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
1220 dst=p.remote_tun_if_host,
1222 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1223 self.tun_if, worker=worker)
1224 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
# Four loop iterations, hence 4*N_PKTS expected in total.
1226 # all counts against the first worker that was used
1227 self.verify_counters6(p, p, 4*N_PKTS, worker=0)
# IpsecTun4HandoffTests: 4o4 tunnel test that alternates the worker used
# for injection and then checks that all counters are on worker 0.
#
# NOTE(review): this is a line-numbered listing with indentation removed.
# Listing lines 1233, 1236, 1238, 1245, 1249, 1252 and 1256 are not shown.
# N_PKTS is used on listing line 1258 but its definition is not visible,
# and the two packet-generator calls are missing their closing arguments.
# The code lines below are left exactly as found.
1230 class IpsecTun4HandoffTests(IpsecTun4):
1231 """ UT test methods for Tunnel v4 with multiple workers """
# presumably read by the test framework to start VPP with two worker
# threads -- TODO confirm in framework.VppTestCase.
1232 worker_config = "workers 2"
# NOTE(review): "handooff" looks like a typo for "handoff" (compare
# test_tun_handoff_66); renaming would change the test id, so it is only
# flagged here.
1234 def test_tun_handooff_44(self):
1235 """ ipsec 4o4 tunnel worker hand-off test """
1237 p = self.params[socket.AF_INET]
1239 # inject alternately on worker 0 and 1. all counts on the SA
1240 # should be against worker 0
1241 for worker in [0, 1, 0, 1]:
# Tunnel side -> pg1, injected on the selected worker.
1242 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1243 src=p.remote_tun_if_host,
1244 dst=self.pg1.remote_ip4,
1246 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1247 self.pg1, worker=worker)
1248 self.verify_decrypted(p, recv_pkts)
# pg1 -> tunnel side, injected on the same worker.
1250 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1251 dst=p.remote_tun_if_host,
1253 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1254 self.tun_if, worker=worker)
1255 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
# Four loop iterations, hence 4*N_PKTS expected in total.
1257 # all counts against the first worker that was used
1258 self.verify_counters4(p, 4*N_PKTS, worker=0)
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # No members of its own are visible here: the class combines the test
    # methods inherited from IpsecTun4Tests and IpsecTun6Tests.
if __name__ == '__main__':
    # Executed as a script: hand control to unittest with the VPP runner.
    unittest.main(testRunner=VppTestRunner)