5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10 IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
18 class IPsecIPv4Params(object):
20 addr_type = socket.AF_INET
22 addr_bcast = "255.255.255.255"
27 self.remote_tun_if_host = '1.1.1.1'
28 self.remote_tun_if_host6 = '1111::1'
30 self.scapy_tun_sa_id = 10
31 self.scapy_tun_spi = 1001
32 self.vpp_tun_sa_id = 20
33 self.vpp_tun_spi = 1000
35 self.scapy_tra_sa_id = 30
36 self.scapy_tra_spi = 2001
37 self.vpp_tra_sa_id = 40
38 self.vpp_tra_spi = 2000
40 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
41 IPSEC_API_INTEG_ALG_SHA1_96)
42 self.auth_algo = 'HMAC-SHA1-96' # scapy name
43 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
45 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
46 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
47 self.crypt_algo = 'AES-CBC' # scapy name
48 self.crypt_key = b'JPjyOWBeVEQiMe7h'
51 self.nat_header = None
54 class IPsecIPv6Params(object):
56 addr_type = socket.AF_INET6
58 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
63 self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
64 self.remote_tun_if_host4 = '1.1.1.1'
66 self.scapy_tun_sa_id = 50
67 self.scapy_tun_spi = 3001
68 self.vpp_tun_sa_id = 60
69 self.vpp_tun_spi = 3000
71 self.scapy_tra_sa_id = 70
72 self.scapy_tra_spi = 4001
73 self.vpp_tra_sa_id = 80
74 self.vpp_tra_spi = 4000
76 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
77 IPSEC_API_INTEG_ALG_SHA1_96)
78 self.auth_algo = 'HMAC-SHA1-96' # scapy name
79 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
81 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
82 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
83 self.crypt_algo = 'AES-CBC' # scapy name
84 self.crypt_key = b'JPjyOWBeVEQiMe7h'
87 self.nat_header = None
90 def mk_scapy_crypt_key(p):
91 if p.crypt_algo == "AES-GCM":
92 return p.crypt_key + struct.pack("!I", p.salt)
97 def config_tun_params(p, encryption_type, tun_if):
98 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
99 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
100 IPSEC_API_SAD_FLAG_USE_ESN))
101 crypt_key = mk_scapy_crypt_key(p)
102 p.scapy_tun_sa = SecurityAssociation(
103 encryption_type, spi=p.vpp_tun_spi,
104 crypt_algo=p.crypt_algo,
106 auth_algo=p.auth_algo, auth_key=p.auth_key,
107 tunnel_header=ip_class_by_addr_type[p.addr_type](
108 src=tun_if.remote_addr[p.addr_type],
109 dst=tun_if.local_addr[p.addr_type]),
110 nat_t_header=p.nat_header,
112 p.vpp_tun_sa = SecurityAssociation(
113 encryption_type, spi=p.scapy_tun_spi,
114 crypt_algo=p.crypt_algo,
116 auth_algo=p.auth_algo, auth_key=p.auth_key,
117 tunnel_header=ip_class_by_addr_type[p.addr_type](
118 dst=tun_if.remote_addr[p.addr_type],
119 src=tun_if.local_addr[p.addr_type]),
120 nat_t_header=p.nat_header,
124 def config_tra_params(p, encryption_type):
125 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
126 IPSEC_API_SAD_FLAG_USE_ESN))
127 crypt_key = mk_scapy_crypt_key(p)
128 p.scapy_tra_sa = SecurityAssociation(
131 crypt_algo=p.crypt_algo,
133 auth_algo=p.auth_algo,
135 nat_t_header=p.nat_header,
137 p.vpp_tra_sa = SecurityAssociation(
140 crypt_algo=p.crypt_algo,
142 auth_algo=p.auth_algo,
144 nat_t_header=p.nat_header,
148 class TemplateIpsec(VppTestCase):
153 |tra_if| <-------> |VPP|
158 ------ encrypt --- plain ---
159 |tun_if| <------- |VPP| <------ |pg1|
162 ------ decrypt --- plain ---
163 |tun_if| -------> |VPP| ------> |pg1|
169 def ipsec_select_backend(self):
170 """ empty method to be overloaded when necessary """
175 super(TemplateIpsec, cls).setUpClass()
178 def tearDownClass(cls):
179 super(TemplateIpsec, cls).tearDownClass()
181 def setup_params(self):
182 self.ipv4_params = IPsecIPv4Params()
183 self.ipv6_params = IPsecIPv6Params()
184 self.params = {self.ipv4_params.addr_type: self.ipv4_params,
185 self.ipv6_params.addr_type: self.ipv6_params}
187 def config_interfaces(self):
188 self.create_pg_interfaces(range(3))
189 self.interfaces = list(self.pg_interfaces)
190 for i in self.interfaces:
198 super(TemplateIpsec, self).setUp()
202 self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
204 self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
207 self.config_interfaces()
209 self.ipsec_select_backend()
211 def unconfig_interfaces(self):
212 for i in self.interfaces:
218 super(TemplateIpsec, self).tearDown()
220 self.unconfig_interfaces()
222 def show_commands_at_teardown(self):
223 self.logger.info(self.vapi.cli("show hardware"))
225 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
227 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
228 sa.encrypt(IP(src=src, dst=dst) /
229 ICMP() / Raw(b'X' * payload_size))
230 for i in range(count)]
232 def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
234 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
235 sa.encrypt(IPv6(src=src, dst=dst) /
236 ICMPv6EchoRequest(id=0, seq=1,
237 data='X' * payload_size))
238 for i in range(count)]
240 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
241 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
242 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
243 for i in range(count)]
245 def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
246 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
247 IPv6(src=src, dst=dst) /
248 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
249 for i in range(count)]
252 class IpsecTcp(object):
253 def verify_tcp_checksum(self):
254 self.vapi.cli("test http server")
255 p = self.params[socket.AF_INET]
256 send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
257 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
258 dst=self.tun_if.local_ip4) /
259 TCP(flags='S', dport=80)))
260 self.logger.debug(ppp("Sending packet:", send))
261 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
263 decrypted = p.vpp_tun_sa.decrypt(recv[IP])
264 self.assert_packet_checksums_valid(decrypted)
267 class IpsecTcpTests(IpsecTcp):
268 def test_tcp_checksum(self):
269 """ verify checksum correctness for vpp generated packets """
270 self.verify_tcp_checksum()
273 class IpsecTra4(object):
274 """ verify methods for Transport v4 """
275 def verify_tra_anti_replay(self):
276 p = self.params[socket.AF_INET]
277 esn_en = p.vpp_tra_sa.esn_en
279 seq_cycle_node_name = ('/err/%s/sequence number cycled' %
280 self.tra4_encrypt_node_name)
281 replay_node_name = ('/err/%s/SA replayed packet' %
282 self.tra4_decrypt_node_name)
283 if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
284 hash_failed_node_name = ('/err/%s/ESP decryption failed' %
285 self.tra4_decrypt_node_name)
287 hash_failed_node_name = ('/err/%s/Integrity check failed' %
288 self.tra4_decrypt_node_name)
289 replay_count = self.statistics.get_err_counter(replay_node_name)
290 hash_failed_count = self.statistics.get_err_counter(
291 hash_failed_node_name)
292 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
294 if ESP == self.encryption_type:
295 undersize_node_name = ('/err/%s/undersized packet' %
296 self.tra4_decrypt_node_name)
297 undersize_count = self.statistics.get_err_counter(
301 # send packets with seq numbers 1->34
302 # this means the window size is still in Case B (see RFC4303
305 # for reasons i haven't investigated Scapy won't create a packet with
308 pkts = [(Ether(src=self.tra_if.remote_mac,
309 dst=self.tra_if.local_mac) /
310 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
311 dst=self.tra_if.local_ip4) /
314 for seq in range(1, 34)]
315 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
317 # replayed packets are dropped
318 self.send_and_assert_no_replies(self.tra_if, pkts)
319 replay_count += len(pkts)
320 self.assert_error_counter_equal(replay_node_name, replay_count)
323 # now send a batch of packets all with the same sequence number
324 # the first packet in the batch is legitimate, the rest bogus
326 pkts = (Ether(src=self.tra_if.remote_mac,
327 dst=self.tra_if.local_mac) /
328 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
329 dst=self.tra_if.local_ip4) /
332 recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
335 self.assert_error_counter_equal(replay_node_name, replay_count)
338 # now move the window over to 257 (more than one byte) and into Case A
340 pkt = (Ether(src=self.tra_if.remote_mac,
341 dst=self.tra_if.local_mac) /
342 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
343 dst=self.tra_if.local_ip4) /
346 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
348 # replayed packets are dropped
349 self.send_and_assert_no_replies(self.tra_if, pkt * 3)
351 self.assert_error_counter_equal(replay_node_name, replay_count)
353 # the window size is 64 packets
354 # in window are still accepted
355 pkt = (Ether(src=self.tra_if.remote_mac,
356 dst=self.tra_if.local_mac) /
357 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
358 dst=self.tra_if.local_ip4) /
361 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
363 # a packet that does not decrypt does not move the window forward
364 bogus_sa = SecurityAssociation(self.encryption_type,
366 crypt_algo=p.crypt_algo,
367 crypt_key=mk_scapy_crypt_key(p)[::-1],
368 auth_algo=p.auth_algo,
369 auth_key=p.auth_key[::-1])
370 pkt = (Ether(src=self.tra_if.remote_mac,
371 dst=self.tra_if.local_mac) /
372 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
373 dst=self.tra_if.local_ip4) /
376 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
378 hash_failed_count += 17
379 self.assert_error_counter_equal(hash_failed_node_name,
382 # a malformed 'runt' packet
383 # created by a mis-constructed SA
384 if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
385 bogus_sa = SecurityAssociation(self.encryption_type,
387 pkt = (Ether(src=self.tra_if.remote_mac,
388 dst=self.tra_if.local_mac) /
389 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
390 dst=self.tra_if.local_ip4) /
393 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
395 undersize_count += 17
396 self.assert_error_counter_equal(undersize_node_name,
399 # which we can determine since this packet is still in the window
400 pkt = (Ether(src=self.tra_if.remote_mac,
401 dst=self.tra_if.local_mac) /
402 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
403 dst=self.tra_if.local_ip4) /
406 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
409 # out of window are dropped
410 # this is Case B. So VPP will consider this to be a high seq num wrap
411 # and so the decrypt attempt will fail
413 pkt = (Ether(src=self.tra_if.remote_mac,
414 dst=self.tra_if.local_mac) /
415 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
416 dst=self.tra_if.local_ip4) /
419 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
422 # an out of window error with ESN looks like a high sequence
423 # wrap. but since it isn't then the verify will fail.
424 hash_failed_count += 17
425 self.assert_error_counter_equal(hash_failed_node_name,
430 self.assert_error_counter_equal(replay_node_name,
433 # valid packet moves the window over to 258
434 pkt = (Ether(src=self.tra_if.remote_mac,
435 dst=self.tra_if.local_mac) /
436 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
437 dst=self.tra_if.local_ip4) /
440 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
441 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
444 # move VPP's SA TX seq-num to just before the seq-number wrap.
445 # then fire in a packet that VPP should drop on TX because it
446 # causes the TX seq number to wrap; unless we're using extened sequence
449 self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
450 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
451 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
453 pkts = [(Ether(src=self.tra_if.remote_mac,
454 dst=self.tra_if.local_mac) /
455 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
456 dst=self.tra_if.local_ip4) /
459 for seq in range(259, 280)]
462 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
465 # in order for scapy to decrypt its SA's high order number needs
468 p.vpp_tra_sa.seq_num = 0x100000000
470 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
473 # wrap scapy's TX high sequence number. VPP is in case B, so it
474 # will consider this a high seq wrap also.
475 # The low seq num we set it to will place VPP's RX window in Case A
477 p.scapy_tra_sa.seq_num = 0x100000005
478 pkt = (Ether(src=self.tra_if.remote_mac,
479 dst=self.tra_if.local_mac) /
480 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
481 dst=self.tra_if.local_ip4) /
483 seq_num=0x100000005))
484 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
485 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
488 # A packet that has seq num between (2^32-64) and 5 is within
491 p.scapy_tra_sa.seq_num = 0xfffffffd
492 pkt = (Ether(src=self.tra_if.remote_mac,
493 dst=self.tra_if.local_mac) /
494 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
495 dst=self.tra_if.local_ip4) /
498 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
499 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
502 # While in case A we cannot wrap the high sequence number again
503 # becuase VPP will consider this packet to be one that moves the
506 pkt = (Ether(src=self.tra_if.remote_mac,
507 dst=self.tra_if.local_mac) /
508 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
509 dst=self.tra_if.local_ip4) /
511 seq_num=0x200000999))
512 self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
514 hash_failed_count += 1
515 self.assert_error_counter_equal(hash_failed_node_name,
519 # but if we move the wondow forward to case B, then we can wrap
522 p.scapy_tra_sa.seq_num = 0x100000555
523 pkt = (Ether(src=self.tra_if.remote_mac,
524 dst=self.tra_if.local_mac) /
525 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
526 dst=self.tra_if.local_ip4) /
528 seq_num=0x100000555))
529 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
530 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
532 p.scapy_tra_sa.seq_num = 0x200000444
533 pkt = (Ether(src=self.tra_if.remote_mac,
534 dst=self.tra_if.local_mac) /
535 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
536 dst=self.tra_if.local_ip4) /
538 seq_num=0x200000444))
539 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
540 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
544 # without ESN TX sequence numbers can't wrap and packets are
545 # dropped from here on out.
547 self.send_and_assert_no_replies(self.tra_if, pkts)
548 seq_cycle_count += len(pkts)
549 self.assert_error_counter_equal(seq_cycle_node_name,
552 # move the security-associations seq number on to the last we used
553 self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
554 p.scapy_tra_sa.seq_num = 351
555 p.vpp_tra_sa.seq_num = 351
557 def verify_tra_basic4(self, count=1):
558 """ ipsec v4 transport basic test """
559 self.vapi.cli("clear errors")
560 self.vapi.cli("clear ipsec sa")
562 p = self.params[socket.AF_INET]
563 send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
564 src=self.tra_if.remote_ip4,
565 dst=self.tra_if.local_ip4,
567 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
570 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
571 self.assert_packet_checksums_valid(rx)
573 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
574 self.assert_packet_checksums_valid(decrypted)
576 self.logger.debug(ppp("Unexpected packet:", rx))
579 self.logger.info(self.vapi.ppcli("show error"))
580 self.logger.info(self.vapi.ppcli("show ipsec all"))
582 pkts = p.tra_sa_in.get_stats()['packets']
583 self.assertEqual(pkts, count,
584 "incorrect SA in counts: expected %d != %d" %
586 pkts = p.tra_sa_out.get_stats()['packets']
587 self.assertEqual(pkts, count,
588 "incorrect SA out counts: expected %d != %d" %
591 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
592 self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
595 class IpsecTra4Tests(IpsecTra4):
596 """ UT test methods for Transport v4 """
597 def test_tra_anti_replay(self):
598 """ ipsec v4 transport anti-reply test """
599 self.verify_tra_anti_replay()
601 def test_tra_basic(self, count=1):
602 """ ipsec v4 transport basic test """
603 self.verify_tra_basic4(count=1)
605 def test_tra_burst(self):
606 """ ipsec v4 transport burst test """
607 self.verify_tra_basic4(count=257)
610 class IpsecTra6(object):
611 """ verify methods for Transport v6 """
612 def verify_tra_basic6(self, count=1):
613 self.vapi.cli("clear errors")
615 p = self.params[socket.AF_INET6]
616 send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
617 src=self.tra_if.remote_ip6,
618 dst=self.tra_if.local_ip6,
620 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
623 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
626 decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
627 self.assert_packet_checksums_valid(decrypted)
629 self.logger.debug(ppp("Unexpected packet:", rx))
632 self.logger.info(self.vapi.ppcli("show error"))
633 self.logger.info(self.vapi.ppcli("show ipsec all"))
635 pkts = p.tra_sa_in.get_stats()['packets']
636 self.assertEqual(pkts, count,
637 "incorrect SA in counts: expected %d != %d" %
639 pkts = p.tra_sa_out.get_stats()['packets']
640 self.assertEqual(pkts, count,
641 "incorrect SA out counts: expected %d != %d" %
643 self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
644 self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
646 def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
648 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
649 sa.encrypt(IPv6(src=src, dst=dst) /
650 ICMPv6EchoRequest(id=0, seq=1,
651 data='X' * payload_size))
652 for i in range(count)]
654 def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
655 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
656 IPv6(src=src, dst=dst) /
657 IPv6ExtHdrHopByHop() /
658 IPv6ExtHdrFragment(id=2, offset=200) /
660 for i in range(count)]
662 def verify_tra_encrypted6(self, p, sa, rxs):
665 self.assert_packet_checksums_valid(rx)
667 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
668 decrypted.append(decrypt_pkt)
669 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
670 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
672 self.logger.debug(ppp("Unexpected packet:", rx))
674 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
680 def verify_tra_66_ext_hdrs(self, p):
684 # check we can decrypt with options
686 tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
687 src=self.tra_if.remote_ip6,
688 dst=self.tra_if.local_ip6,
690 self.send_and_expect(self.tra_if, tx, self.tra_if)
693 # injecting a packet from ourselves to be routed of box is a hack
694 # but it matches an outbout policy, alors je ne regrette rien
697 # one extension before ESP
698 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
699 IPv6(src=self.tra_if.local_ip6,
700 dst=self.tra_if.remote_ip6) /
701 IPv6ExtHdrFragment(id=2, offset=200) /
704 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
705 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
708 # for reasons i'm not going to investigate scapy does not
709 # created the correct headers after decrypt. but reparsing
710 # the ipv6 packet fixes it
711 dc = IPv6(raw(dc[IPv6]))
712 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
714 # two extensions before ESP
715 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
716 IPv6(src=self.tra_if.local_ip6,
717 dst=self.tra_if.remote_ip6) /
718 IPv6ExtHdrHopByHop() /
719 IPv6ExtHdrFragment(id=2, offset=200) /
722 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
723 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
726 dc = IPv6(raw(dc[IPv6]))
727 self.assertTrue(dc[IPv6ExtHdrHopByHop])
728 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
730 # two extensions before ESP, one after
731 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
732 IPv6(src=self.tra_if.local_ip6,
733 dst=self.tra_if.remote_ip6) /
734 IPv6ExtHdrHopByHop() /
735 IPv6ExtHdrFragment(id=2, offset=200) /
736 IPv6ExtHdrDestOpt() /
739 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
740 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
743 dc = IPv6(raw(dc[IPv6]))
744 self.assertTrue(dc[IPv6ExtHdrDestOpt])
745 self.assertTrue(dc[IPv6ExtHdrHopByHop])
746 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
749 class IpsecTra6Tests(IpsecTra6):
750 """ UT test methods for Transport v6 """
751 def test_tra_basic6(self):
752 """ ipsec v6 transport basic test """
753 self.verify_tra_basic6(count=1)
755 def test_tra_burst6(self):
756 """ ipsec v6 transport burst test """
757 self.verify_tra_basic6(count=257)
760 class IpsecTra6ExtTests(IpsecTra6):
761 def test_tra_ext_hdrs_66(self):
762 """ ipsec 6o6 tra extension headers test """
763 self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
766 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
767 """ UT test methods for Transport v6 and v4"""
771 class IpsecTun4(object):
772 """ verify methods for Tunnel v4 """
773 def verify_counters4(self, p, count, n_frags=None, worker=None):
776 if (hasattr(p, "spd_policy_in_any")):
777 pkts = p.spd_policy_in_any.get_stats(worker)['packets']
778 self.assertEqual(pkts, count,
779 "incorrect SPD any policy: expected %d != %d" %
782 if (hasattr(p, "tun_sa_in")):
783 pkts = p.tun_sa_in.get_stats(worker)['packets']
784 self.assertEqual(pkts, count,
785 "incorrect SA in counts: expected %d != %d" %
787 pkts = p.tun_sa_out.get_stats(worker)['packets']
788 self.assertEqual(pkts, count,
789 "incorrect SA out counts: expected %d != %d" %
792 self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
793 self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
795 def verify_decrypted(self, p, rxs):
797 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
798 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
799 self.assert_packet_checksums_valid(rx)
801 def verify_encrypted(self, p, sa, rxs):
805 self.assertEqual(rx[UDP].dport, 4500)
806 self.assert_packet_checksums_valid(rx)
807 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
809 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
810 if not decrypt_pkt.haslayer(IP):
811 decrypt_pkt = IP(decrypt_pkt[Raw].load)
812 decrypt_pkts.append(decrypt_pkt)
813 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
814 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
816 self.logger.debug(ppp("Unexpected packet:", rx))
818 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
822 pkts = reassemble4(decrypt_pkts)
824 self.assert_packet_checksums_valid(pkt)
826 def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
827 self.vapi.cli("clear errors")
828 self.vapi.cli("clear ipsec counters")
832 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
833 src=p.remote_tun_if_host,
834 dst=self.pg1.remote_ip4,
836 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
837 self.verify_decrypted(p, recv_pkts)
839 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
840 dst=p.remote_tun_if_host, count=count,
841 payload_size=payload_size)
842 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
844 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
847 self.logger.info(self.vapi.ppcli("show error"))
848 self.logger.info(self.vapi.ppcli("show ipsec all"))
850 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
851 self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
852 self.verify_counters4(p, count, n_rx)
854 """ verify methods for Transport v4 """
855 def verify_tun_44_bad_packet_sizes(self, p):
856 # with a buffer size of 2048, 1989 bytes of payload
857 # means there isn't space to insert the ESP header
859 for p_siz in [1989, 8500]:
860 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
861 src=p.remote_tun_if_host,
862 dst=self.pg1.remote_ip4,
865 self.send_and_assert_no_replies(self.tun_if, send_pkts)
866 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
867 dst=p.remote_tun_if_host, count=N_PKTS,
869 self.send_and_assert_no_replies(self.pg1, send_pkts,
872 # both large packets on decrpyt count against chained buffers
873 # the 9000 bytes one does on encrypt
874 self.assertEqual(2 * N_PKTS,
875 self.statistics.get_err_counter(
876 '/err/%s/chained buffers (packet dropped)' %
877 self.tun4_decrypt_node_name))
878 self.assertEqual(N_PKTS,
879 self.statistics.get_err_counter(
880 '/err/%s/chained buffers (packet dropped)' %
881 self.tun4_encrypt_node_name))
883 # on encrypt the 1989 size is no trailer space
884 self.assertEqual(N_PKTS,
885 self.statistics.get_err_counter(
886 '/err/%s/no trailer space (packet dropped)' %
887 self.tun4_encrypt_node_name))
889 def verify_tun_reass_44(self, p):
890 self.vapi.cli("clear errors")
891 self.vapi.ip_reassembly_enable_disable(
892 sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
895 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
896 src=p.remote_tun_if_host,
897 dst=self.pg1.remote_ip4,
900 send_pkts = fragment_rfc791(send_pkts[0], 1400)
901 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
903 self.verify_decrypted(p, recv_pkts)
905 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
906 dst=p.remote_tun_if_host, count=1)
907 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
909 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
912 self.logger.info(self.vapi.ppcli("show error"))
913 self.logger.info(self.vapi.ppcli("show ipsec all"))
915 self.verify_counters4(p, 1, 1)
916 self.vapi.ip_reassembly_enable_disable(
917 sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
919 def verify_tun_64(self, p, count=1):
920 self.vapi.cli("clear errors")
922 send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
923 src=p.remote_tun_if_host6,
924 dst=self.pg1.remote_ip6,
926 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
927 for recv_pkt in recv_pkts:
928 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
929 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
930 self.assert_packet_checksums_valid(recv_pkt)
931 send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
932 dst=p.remote_tun_if_host6, count=count)
933 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
934 for recv_pkt in recv_pkts:
936 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
937 if not decrypt_pkt.haslayer(IPv6):
938 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
939 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
940 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
941 self.assert_packet_checksums_valid(decrypt_pkt)
943 self.logger.error(ppp("Unexpected packet:", recv_pkt))
946 ppp("Decrypted packet:", decrypt_pkt))
951 self.logger.info(self.vapi.ppcli("show error"))
952 self.logger.info(self.vapi.ppcli("show ipsec all"))
954 self.verify_counters4(p, count)
956 def verify_keepalive(self, p):
957 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
958 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
959 UDP(sport=333, dport=4500) /
961 self.send_and_assert_no_replies(self.tun_if, pkt*31)
962 self.assert_error_counter_equal(
963 '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
965 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
966 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
967 UDP(sport=333, dport=4500) /
969 self.send_and_assert_no_replies(self.tun_if, pkt*31)
970 self.assert_error_counter_equal(
971 '/err/%s/Too Short' % self.tun4_input_node, 31)
974 class IpsecTun4Tests(IpsecTun4):
975 """ UT test methods for Tunnel v4 """
976 def test_tun_basic44(self):
977 """ ipsec 4o4 tunnel basic test """
978 self.verify_tun_44(self.params[socket.AF_INET], count=1)
979 self.tun_if.admin_down()
980 self.tun_if.resolve_arp()
981 self.tun_if.admin_up()
982 self.verify_tun_44(self.params[socket.AF_INET], count=1)
984 def test_tun_reass_basic44(self):
985 """ ipsec 4o4 tunnel basic reassembly test """
986 self.verify_tun_reass_44(self.params[socket.AF_INET])
988 def test_tun_burst44(self):
989 """ ipsec 4o4 tunnel burst test """
990 self.verify_tun_44(self.params[socket.AF_INET], count=127)
993 class IpsecTunEsp4Tests(IpsecTun4):
994 def test_tun_bad_packet_sizes(self):
995 """ ipsec v4 tunnel bad packet size """
996 self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
999 class IpsecTun6(object):
1000 """ verify methods for Tunnel v6 """
1001 def verify_counters6(self, p_in, p_out, count, worker=None):
1002 if (hasattr(p_in, "tun_sa_in")):
1003 pkts = p_in.tun_sa_in.get_stats(worker)['packets']
1004 self.assertEqual(pkts, count,
1005 "incorrect SA in counts: expected %d != %d" %
1007 if (hasattr(p_out, "tun_sa_out")):
1008 pkts = p_out.tun_sa_out.get_stats(worker)['packets']
1009 self.assertEqual(pkts, count,
1010 "incorrect SA out counts: expected %d != %d" %
1012 self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1013 self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
1015 def verify_decrypted6(self, p, rxs):
1017 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1018 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1019 self.assert_packet_checksums_valid(rx)
1021 def verify_encrypted6(self, p, sa, rxs):
1023 self.assert_packet_checksums_valid(rx)
1024 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
1027 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1028 if not decrypt_pkt.haslayer(IPv6):
1029 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1030 self.assert_packet_checksums_valid(decrypt_pkt)
1031 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1032 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1034 self.logger.debug(ppp("Unexpected packet:", rx))
1036 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1041 def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1042 self.vapi.cli("clear errors")
1043 self.vapi.cli("clear ipsec sa")
1045 send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
1046 src=p_in.remote_tun_if_host,
1047 dst=self.pg1.remote_ip6,
1049 self.send_and_assert_no_replies(self.tun_if, send_pkts)
1050 self.logger.info(self.vapi.cli("sh punt stats"))
1052 def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1053 self.vapi.cli("clear errors")
1054 self.vapi.cli("clear ipsec sa")
1058 send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
1059 src=p_in.remote_tun_if_host,
1060 dst=self.pg1.remote_ip6,
1062 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1063 self.verify_decrypted6(p_in, recv_pkts)
1065 send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1066 dst=p_out.remote_tun_if_host,
1068 payload_size=payload_size)
1069 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1070 self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1073 self.logger.info(self.vapi.ppcli("show error"))
1074 self.logger.info(self.vapi.ppcli("show ipsec all"))
1075 self.verify_counters6(p_in, p_out, count)
1077 def verify_tun_reass_66(self, p):
1078 self.vapi.cli("clear errors")
1079 self.vapi.ip_reassembly_enable_disable(
1080 sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
1083 send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1084 src=p.remote_tun_if_host,
1085 dst=self.pg1.remote_ip6,
1088 send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1089 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1091 self.verify_decrypted6(p, recv_pkts)
1093 send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1094 dst=p.remote_tun_if_host,
1097 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1099 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1101 self.logger.info(self.vapi.ppcli("show error"))
1102 self.logger.info(self.vapi.ppcli("show ipsec all"))
1103 self.verify_counters6(p, p, 1)
1104 self.vapi.ip_reassembly_enable_disable(
1105 sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
1107 def verify_tun_46(self, p, count=1):
1108 """ ipsec 4o6 tunnel basic test """
1109 self.vapi.cli("clear errors")
1111 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1112 src=p.remote_tun_if_host4,
1113 dst=self.pg1.remote_ip4,
1115 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1116 for recv_pkt in recv_pkts:
1117 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1118 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1119 self.assert_packet_checksums_valid(recv_pkt)
1120 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1121 dst=p.remote_tun_if_host4,
1123 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1124 for recv_pkt in recv_pkts:
1126 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1127 if not decrypt_pkt.haslayer(IP):
1128 decrypt_pkt = IP(decrypt_pkt[Raw].load)
1129 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1130 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1131 self.assert_packet_checksums_valid(decrypt_pkt)
1133 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1135 self.logger.debug(ppp("Decrypted packet:",
1141 self.logger.info(self.vapi.ppcli("show error"))
1142 self.logger.info(self.vapi.ppcli("show ipsec all"))
1143 self.verify_counters6(p, p, count)
1146 class IpsecTun6Tests(IpsecTun6):
1147 """ UT test methods for Tunnel v6 """
1149 def test_tun_basic66(self):
1150 """ ipsec 6o6 tunnel basic test """
1151 self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1153 def test_tun_reass_basic66(self):
1154 """ ipsec 6o6 tunnel basic reassembly test """
1155 self.verify_tun_reass_66(self.params[socket.AF_INET6])
1157 def test_tun_burst66(self):
1158 """ ipsec 6o6 tunnel burst test """
1159 self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1162 class IpsecTun6HandoffTests(IpsecTun6):
1163 """ UT test methods for Tunnel v6 with multiple workers """
1164 worker_config = "workers 2"
1166 def test_tun_handoff_66(self):
1167 """ ipsec 6o6 tunnel worker hand-off test """
1169 p = self.params[socket.AF_INET6]
1171 # inject alternately on worker 0 and 1. all counts on the SA
1172 # should be against worker 0
1173 for worker in [0, 1, 0, 1]:
1174 send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1175 src=p.remote_tun_if_host,
1176 dst=self.pg1.remote_ip6,
1178 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1179 self.pg1, worker=worker)
1180 self.verify_decrypted6(p, recv_pkts)
1182 send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1183 dst=p.remote_tun_if_host,
1185 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1186 self.tun_if, worker=worker)
1187 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1189 # all counts against the first worker that was used
1190 self.verify_counters6(p, p, 4*N_PKTS, worker=0)
1193 class IpsecTun4HandoffTests(IpsecTun4):
1194 """ UT test methods for Tunnel v4 with multiple workers """
1195 worker_config = "workers 2"
1197 def test_tun_handooff_44(self):
1198 """ ipsec 4o4 tunnel worker hand-off test """
1200 p = self.params[socket.AF_INET]
1202 # inject alternately on worker 0 and 1. all counts on the SA
1203 # should be against worker 0
1204 for worker in [0, 1, 0, 1]:
1205 send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1206 src=p.remote_tun_if_host,
1207 dst=self.pg1.remote_ip4,
1209 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1210 self.pg1, worker=worker)
1211 self.verify_decrypted(p, recv_pkts)
1213 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1214 dst=p.remote_tun_if_host,
1216 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1217 self.tun_if, worker=worker)
1218 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1220 # all counts against the first worker that was used
1221 self.verify_counters4(p, 4*N_PKTS, worker=0)
1224 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1225 """ UT test methods for Tunnel v6 & v4 """
1229 if __name__ == '__main__':
1230 unittest.main(testRunner=VppTestRunner)