5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10 IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
18 class IPsecIPv4Params:
20 addr_type = socket.AF_INET
22 addr_bcast = "255.255.255.255"
27 self.remote_tun_if_host = '1.1.1.1'
28 self.remote_tun_if_host6 = '1111::1'
30 self.scapy_tun_sa_id = 100
31 self.scapy_tun_spi = 1000
32 self.vpp_tun_sa_id = 200
33 self.vpp_tun_spi = 2000
35 self.scapy_tra_sa_id = 300
36 self.scapy_tra_spi = 3000
37 self.vpp_tra_sa_id = 400
38 self.vpp_tra_spi = 4000
40 self.outer_hop_limit = 64
41 self.inner_hop_limit = 255
42 self.outer_flow_label = 0
43 self.inner_flow_label = 0x12345
45 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
46 IPSEC_API_INTEG_ALG_SHA1_96)
47 self.auth_algo = 'HMAC-SHA1-96' # scapy name
48 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
50 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
51 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
52 self.crypt_algo = 'AES-CBC' # scapy name
53 self.crypt_key = b'JPjyOWBeVEQiMe7h'
56 self.nat_header = None
57 self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
58 TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
60 self.async_mode = False
63 class IPsecIPv6Params:
65 addr_type = socket.AF_INET6
67 addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
72 self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
73 self.remote_tun_if_host4 = '1.1.1.1'
75 self.scapy_tun_sa_id = 500
76 self.scapy_tun_spi = 3001
77 self.vpp_tun_sa_id = 600
78 self.vpp_tun_spi = 3000
80 self.scapy_tra_sa_id = 700
81 self.scapy_tra_spi = 4001
82 self.vpp_tra_sa_id = 800
83 self.vpp_tra_spi = 4000
85 self.outer_hop_limit = 64
86 self.inner_hop_limit = 255
87 self.outer_flow_label = 0
88 self.inner_flow_label = 0x12345
90 self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
91 IPSEC_API_INTEG_ALG_SHA1_96)
92 self.auth_algo = 'HMAC-SHA1-96' # scapy name
93 self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
95 self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
96 IPSEC_API_CRYPTO_ALG_AES_CBC_128)
97 self.crypt_algo = 'AES-CBC' # scapy name
98 self.crypt_key = b'JPjyOWBeVEQiMe7h'
101 self.nat_header = None
102 self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
103 TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
105 self.async_mode = False
108 def mk_scapy_crypt_key(p):
109 if p.crypt_algo in ("AES-GCM", "AES-CTR"):
110 return p.crypt_key + struct.pack("!I", p.salt)
115 def config_tun_params(p, encryption_type, tun_if):
116 ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
117 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
118 IPSEC_API_SAD_FLAG_USE_ESN))
119 p.tun_dst = tun_if.remote_addr[p.addr_type]
120 p.tun_src = tun_if.local_addr[p.addr_type]
121 crypt_key = mk_scapy_crypt_key(p)
122 p.scapy_tun_sa = SecurityAssociation(
123 encryption_type, spi=p.vpp_tun_spi,
124 crypt_algo=p.crypt_algo,
126 auth_algo=p.auth_algo, auth_key=p.auth_key,
127 tunnel_header=ip_class_by_addr_type[p.addr_type](
130 nat_t_header=p.nat_header,
132 p.vpp_tun_sa = SecurityAssociation(
133 encryption_type, spi=p.scapy_tun_spi,
134 crypt_algo=p.crypt_algo,
136 auth_algo=p.auth_algo, auth_key=p.auth_key,
137 tunnel_header=ip_class_by_addr_type[p.addr_type](
140 nat_t_header=p.nat_header,
144 def config_tra_params(p, encryption_type):
145 esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
146 IPSEC_API_SAD_FLAG_USE_ESN))
147 crypt_key = mk_scapy_crypt_key(p)
148 p.scapy_tra_sa = SecurityAssociation(
151 crypt_algo=p.crypt_algo,
153 auth_algo=p.auth_algo,
155 nat_t_header=p.nat_header,
157 p.vpp_tra_sa = SecurityAssociation(
160 crypt_algo=p.crypt_algo,
162 auth_algo=p.auth_algo,
164 nat_t_header=p.nat_header,
168 class TemplateIpsec(VppTestCase):
173 |tra_if| <-------> |VPP|
178 ------ encrypt --- plain ---
179 |tun_if| <------- |VPP| <------ |pg1|
182 ------ decrypt --- plain ---
183 |tun_if| -------> |VPP| ------> |pg1|
189 def ipsec_select_backend(self):
190 """ empty method to be overloaded when necessary """
195 super(TemplateIpsec, cls).setUpClass()
198 def tearDownClass(cls):
199 super(TemplateIpsec, cls).tearDownClass()
201 def setup_params(self):
202 if not hasattr(self, 'ipv4_params'):
203 self.ipv4_params = IPsecIPv4Params()
204 if not hasattr(self, 'ipv6_params'):
205 self.ipv6_params = IPsecIPv6Params()
206 self.params = {self.ipv4_params.addr_type: self.ipv4_params,
207 self.ipv6_params.addr_type: self.ipv6_params}
209 def config_interfaces(self):
210 self.create_pg_interfaces(range(3))
211 self.interfaces = list(self.pg_interfaces)
212 for i in self.interfaces:
220 super(TemplateIpsec, self).setUp()
224 self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
226 self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
229 self.config_interfaces()
231 self.ipsec_select_backend()
233 def unconfig_interfaces(self):
234 for i in self.interfaces:
240 super(TemplateIpsec, self).tearDown()
242 self.unconfig_interfaces()
244 def show_commands_at_teardown(self):
245 self.logger.info(self.vapi.cli("show hardware"))
247 def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
249 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
250 sa.encrypt(IP(src=src, dst=dst) /
251 ICMP() / Raw(b'X' * payload_size))
252 for i in range(count)]
254 def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
256 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
257 sa.encrypt(IPv6(src=src, dst=dst,
258 hlim=p.inner_hop_limit,
259 fl=p.inner_flow_label) /
260 ICMPv6EchoRequest(id=0, seq=1,
261 data='X' * payload_size))
262 for i in range(count)]
264 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
265 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
266 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
267 for i in range(count)]
269 def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
270 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
271 IPv6(src=src, dst=dst,
272 hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
273 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
274 for i in range(count)]
277 class IpsecTcp(object):
278 def verify_tcp_checksum(self):
279 self.vapi.cli("test http server")
280 p = self.params[socket.AF_INET]
281 send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
282 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
283 dst=self.tun_if.local_ip4) /
284 TCP(flags='S', dport=80)))
285 self.logger.debug(ppp("Sending packet:", send))
286 recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
288 decrypted = p.vpp_tun_sa.decrypt(recv[IP])
289 self.assert_packet_checksums_valid(decrypted)
292 class IpsecTcpTests(IpsecTcp):
293 def test_tcp_checksum(self):
294 """ verify checksum correctness for vpp generated packets """
295 self.verify_tcp_checksum()
298 class IpsecTra4(object):
299 """ verify methods for Transport v4 """
300 def get_replay_counts(self, p):
301 replay_node_name = ('/err/%s/SA replayed packet' %
302 self.tra4_decrypt_node_name[0])
303 count = self.statistics.get_err_counter(replay_node_name)
306 replay_post_node_name = ('/err/%s/SA replayed packet' %
307 self.tra4_decrypt_node_name[p.async_mode])
308 count += self.statistics.get_err_counter(replay_post_node_name)
312 def get_hash_failed_counts(self, p):
313 if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
314 hash_failed_node_name = ('/err/%s/ESP decryption failed' %
315 self.tra4_decrypt_node_name[p.async_mode])
317 hash_failed_node_name = ('/err/%s/Integrity check failed' %
318 self.tra4_decrypt_node_name[p.async_mode])
319 count = self.statistics.get_err_counter(hash_failed_node_name)
322 count += self.statistics.get_err_counter(
323 '/err/crypto-dispatch/bad-hmac')
327 def verify_tra_anti_replay(self):
328 p = self.params[socket.AF_INET]
329 esn_en = p.vpp_tra_sa.esn_en
331 seq_cycle_node_name = \
332 ('/err/%s/sequence number cycled (packet dropped)' %
333 self.tra4_encrypt_node_name)
334 replay_count = self.get_replay_counts(p)
335 hash_failed_count = self.get_hash_failed_counts(p)
336 seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
338 if ESP == self.encryption_type:
339 undersize_node_name = ('/err/%s/undersized packet' %
340 self.tra4_decrypt_node_name[0])
341 undersize_count = self.statistics.get_err_counter(
345 # send packets with seq numbers 1->34
346 # this means the window size is still in Case B (see RFC4303
349 # for reasons i haven't investigated Scapy won't create a packet with
352 pkts = [(Ether(src=self.tra_if.remote_mac,
353 dst=self.tra_if.local_mac) /
354 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
355 dst=self.tra_if.local_ip4) /
358 for seq in range(1, 34)]
359 recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
361 # replayed packets are dropped
362 self.send_and_assert_no_replies(self.tra_if, pkts)
363 replay_count += len(pkts)
364 self.assertEqual(self.get_replay_counts(p), replay_count)
367 # now send a batch of packets all with the same sequence number
368 # the first packet in the batch is legitimate, the rest bogus
370 self.vapi.cli("clear error")
371 self.vapi.cli("clear node counters")
372 pkts = (Ether(src=self.tra_if.remote_mac,
373 dst=self.tra_if.local_mac) /
374 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
375 dst=self.tra_if.local_ip4) /
378 recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
381 self.assertEqual(self.get_replay_counts(p), replay_count)
384 # now move the window over to 257 (more than one byte) and into Case A
386 self.vapi.cli("clear error")
387 pkt = (Ether(src=self.tra_if.remote_mac,
388 dst=self.tra_if.local_mac) /
389 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
390 dst=self.tra_if.local_ip4) /
393 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
395 # replayed packets are dropped
396 self.send_and_assert_no_replies(self.tra_if, pkt * 3)
398 self.assertEqual(self.get_replay_counts(p), replay_count)
400 # the window size is 64 packets
401 # in window are still accepted
402 pkt = (Ether(src=self.tra_if.remote_mac,
403 dst=self.tra_if.local_mac) /
404 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
405 dst=self.tra_if.local_ip4) /
408 recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
410 # a packet that does not decrypt does not move the window forward
411 bogus_sa = SecurityAssociation(self.encryption_type,
413 crypt_algo=p.crypt_algo,
414 crypt_key=mk_scapy_crypt_key(p)[::-1],
415 auth_algo=p.auth_algo,
416 auth_key=p.auth_key[::-1])
417 pkt = (Ether(src=self.tra_if.remote_mac,
418 dst=self.tra_if.local_mac) /
419 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
420 dst=self.tra_if.local_ip4) /
423 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
425 hash_failed_count += 17
426 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
428 # a malformed 'runt' packet
429 # created by a mis-constructed SA
430 if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
431 bogus_sa = SecurityAssociation(self.encryption_type,
433 pkt = (Ether(src=self.tra_if.remote_mac,
434 dst=self.tra_if.local_mac) /
435 bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
436 dst=self.tra_if.local_ip4) /
439 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
441 undersize_count += 17
442 self.assert_error_counter_equal(undersize_node_name,
445 # which we can determine since this packet is still in the window
446 pkt = (Ether(src=self.tra_if.remote_mac,
447 dst=self.tra_if.local_mac) /
448 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
449 dst=self.tra_if.local_ip4) /
452 self.send_and_expect(self.tra_if, [pkt], self.tra_if)
455 # out of window are dropped
456 # this is Case B. So VPP will consider this to be a high seq num wrap
457 # and so the decrypt attempt will fail
459 pkt = (Ether(src=self.tra_if.remote_mac,
460 dst=self.tra_if.local_mac) /
461 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
462 dst=self.tra_if.local_ip4) /
465 self.send_and_assert_no_replies(self.tra_if, pkt * 17)
468 # an out of window error with ESN looks like a high sequence
469 # wrap. but since it isn't then the verify will fail.
470 hash_failed_count += 17
471 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
475 self.assertEqual(self.get_replay_counts(p), replay_count)
477 # valid packet moves the window over to 258
478 pkt = (Ether(src=self.tra_if.remote_mac,
479 dst=self.tra_if.local_mac) /
480 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
481 dst=self.tra_if.local_ip4) /
484 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
485 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
488 # move VPP's SA TX seq-num to just before the seq-number wrap.
489 # then fire in a packet that VPP should drop on TX because it
490 # causes the TX seq number to wrap; unless we're using extened sequence
493 self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
494 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
495 self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
497 pkts = [(Ether(src=self.tra_if.remote_mac,
498 dst=self.tra_if.local_mac) /
499 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
500 dst=self.tra_if.local_ip4) /
503 for seq in range(259, 280)]
506 rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
509 # in order for scapy to decrypt its SA's high order number needs
512 p.vpp_tra_sa.seq_num = 0x100000000
514 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
517 # wrap scapy's TX high sequence number. VPP is in case B, so it
518 # will consider this a high seq wrap also.
519 # The low seq num we set it to will place VPP's RX window in Case A
521 p.scapy_tra_sa.seq_num = 0x100000005
522 pkt = (Ether(src=self.tra_if.remote_mac,
523 dst=self.tra_if.local_mac) /
524 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
525 dst=self.tra_if.local_ip4) /
527 seq_num=0x100000005))
528 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
529 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
532 # A packet that has seq num between (2^32-64) and 5 is within
535 p.scapy_tra_sa.seq_num = 0xfffffffd
536 pkt = (Ether(src=self.tra_if.remote_mac,
537 dst=self.tra_if.local_mac) /
538 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
539 dst=self.tra_if.local_ip4) /
542 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
543 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
546 # While in case A we cannot wrap the high sequence number again
547 # becuase VPP will consider this packet to be one that moves the
550 pkt = (Ether(src=self.tra_if.remote_mac,
551 dst=self.tra_if.local_mac) /
552 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
553 dst=self.tra_if.local_ip4) /
555 seq_num=0x200000999))
556 self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
558 hash_failed_count += 1
559 self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
562 # but if we move the wondow forward to case B, then we can wrap
565 p.scapy_tra_sa.seq_num = 0x100000555
566 pkt = (Ether(src=self.tra_if.remote_mac,
567 dst=self.tra_if.local_mac) /
568 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
569 dst=self.tra_if.local_ip4) /
571 seq_num=0x100000555))
572 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
573 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
575 p.scapy_tra_sa.seq_num = 0x200000444
576 pkt = (Ether(src=self.tra_if.remote_mac,
577 dst=self.tra_if.local_mac) /
578 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
579 dst=self.tra_if.local_ip4) /
581 seq_num=0x200000444))
582 rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
583 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
587 # without ESN TX sequence numbers can't wrap and packets are
588 # dropped from here on out.
590 self.send_and_assert_no_replies(self.tra_if, pkts)
591 seq_cycle_count += len(pkts)
592 self.assert_error_counter_equal(seq_cycle_node_name,
595 # move the security-associations seq number on to the last we used
596 self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
597 p.scapy_tra_sa.seq_num = 351
598 p.vpp_tra_sa.seq_num = 351
600 def verify_tra_basic4(self, count=1, payload_size=54):
601 """ ipsec v4 transport basic test """
602 self.vapi.cli("clear errors")
603 self.vapi.cli("clear ipsec sa")
605 p = self.params[socket.AF_INET]
606 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
607 src=self.tra_if.remote_ip4,
608 dst=self.tra_if.local_ip4,
610 payload_size=payload_size)
611 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
614 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
615 self.assert_packet_checksums_valid(rx)
617 decrypted = p.vpp_tra_sa.decrypt(rx[IP])
618 self.assert_packet_checksums_valid(decrypted)
620 self.logger.debug(ppp("Unexpected packet:", rx))
623 self.logger.info(self.vapi.ppcli("show error"))
624 self.logger.info(self.vapi.ppcli("show ipsec all"))
626 pkts = p.tra_sa_in.get_stats()['packets']
627 self.assertEqual(pkts, count,
628 "incorrect SA in counts: expected %d != %d" %
630 pkts = p.tra_sa_out.get_stats()['packets']
631 self.assertEqual(pkts, count,
632 "incorrect SA out counts: expected %d != %d" %
635 self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
636 self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
639 class IpsecTra4Tests(IpsecTra4):
640 """ UT test methods for Transport v4 """
641 def test_tra_anti_replay(self):
642 """ ipsec v4 transport anti-replay test """
643 self.verify_tra_anti_replay()
645 def test_tra_basic(self, count=1):
646 """ ipsec v4 transport basic test """
647 self.verify_tra_basic4(count=1)
649 def test_tra_burst(self):
650 """ ipsec v4 transport burst test """
651 self.verify_tra_basic4(count=257)
654 class IpsecTra6(object):
655 """ verify methods for Transport v6 """
656 def verify_tra_basic6(self, count=1, payload_size=54):
657 self.vapi.cli("clear errors")
658 self.vapi.cli("clear ipsec sa")
660 p = self.params[socket.AF_INET6]
661 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
662 src=self.tra_if.remote_ip6,
663 dst=self.tra_if.local_ip6,
665 payload_size=payload_size)
666 recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
669 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
672 decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
673 self.assert_packet_checksums_valid(decrypted)
675 self.logger.debug(ppp("Unexpected packet:", rx))
678 self.logger.info(self.vapi.ppcli("show error"))
679 self.logger.info(self.vapi.ppcli("show ipsec all"))
681 pkts = p.tra_sa_in.get_stats()['packets']
682 self.assertEqual(pkts, count,
683 "incorrect SA in counts: expected %d != %d" %
685 pkts = p.tra_sa_out.get_stats()['packets']
686 self.assertEqual(pkts, count,
687 "incorrect SA out counts: expected %d != %d" %
689 self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
690 self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
692 def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
694 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
695 sa.encrypt(IPv6(src=src, dst=dst) /
696 ICMPv6EchoRequest(id=0, seq=1,
697 data='X' * payload_size))
698 for i in range(count)]
700 def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
701 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
702 IPv6(src=src, dst=dst) /
703 IPv6ExtHdrHopByHop() /
704 IPv6ExtHdrFragment(id=2, offset=200) /
706 for i in range(count)]
708 def verify_tra_encrypted6(self, p, sa, rxs):
711 self.assert_packet_checksums_valid(rx)
713 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
714 decrypted.append(decrypt_pkt)
715 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
716 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
718 self.logger.debug(ppp("Unexpected packet:", rx))
720 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
726 def verify_tra_66_ext_hdrs(self, p):
730 # check we can decrypt with options
732 tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
733 src=self.tra_if.remote_ip6,
734 dst=self.tra_if.local_ip6,
736 self.send_and_expect(self.tra_if, tx, self.tra_if)
739 # injecting a packet from ourselves to be routed of box is a hack
740 # but it matches an outbout policy, alors je ne regrette rien
743 # one extension before ESP
744 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
745 IPv6(src=self.tra_if.local_ip6,
746 dst=self.tra_if.remote_ip6) /
747 IPv6ExtHdrFragment(id=2, offset=200) /
750 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
751 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
754 # for reasons i'm not going to investigate scapy does not
755 # created the correct headers after decrypt. but reparsing
756 # the ipv6 packet fixes it
757 dc = IPv6(raw(dc[IPv6]))
758 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
760 # two extensions before ESP
761 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
762 IPv6(src=self.tra_if.local_ip6,
763 dst=self.tra_if.remote_ip6) /
764 IPv6ExtHdrHopByHop() /
765 IPv6ExtHdrFragment(id=2, offset=200) /
768 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
769 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
772 dc = IPv6(raw(dc[IPv6]))
773 self.assertTrue(dc[IPv6ExtHdrHopByHop])
774 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
776 # two extensions before ESP, one after
777 tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
778 IPv6(src=self.tra_if.local_ip6,
779 dst=self.tra_if.remote_ip6) /
780 IPv6ExtHdrHopByHop() /
781 IPv6ExtHdrFragment(id=2, offset=200) /
782 IPv6ExtHdrDestOpt() /
785 rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
786 dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
789 dc = IPv6(raw(dc[IPv6]))
790 self.assertTrue(dc[IPv6ExtHdrDestOpt])
791 self.assertTrue(dc[IPv6ExtHdrHopByHop])
792 self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
795 class IpsecTra6Tests(IpsecTra6):
796 """ UT test methods for Transport v6 """
797 def test_tra_basic6(self):
798 """ ipsec v6 transport basic test """
799 self.verify_tra_basic6(count=1)
801 def test_tra_burst6(self):
802 """ ipsec v6 transport burst test """
803 self.verify_tra_basic6(count=257)
806 class IpsecTra6ExtTests(IpsecTra6):
807 def test_tra_ext_hdrs_66(self):
808 """ ipsec 6o6 tra extension headers test """
809 self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
812 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
813 """ UT test methods for Transport v6 and v4"""
817 class IpsecTun4(object):
818 """ verify methods for Tunnel v4 """
819 def verify_counters4(self, p, count, n_frags=None, worker=None):
822 if (hasattr(p, "spd_policy_in_any")):
823 pkts = p.spd_policy_in_any.get_stats(worker)['packets']
824 self.assertEqual(pkts, count,
825 "incorrect SPD any policy: expected %d != %d" %
828 if (hasattr(p, "tun_sa_in")):
829 pkts = p.tun_sa_in.get_stats(worker)['packets']
830 self.assertEqual(pkts, count,
831 "incorrect SA in counts: expected %d != %d" %
833 pkts = p.tun_sa_out.get_stats(worker)['packets']
834 self.assertEqual(pkts, n_frags,
835 "incorrect SA out counts: expected %d != %d" %
838 self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
839 self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)
841 def verify_decrypted(self, p, rxs):
843 self.assert_equal(rx[IP].src, p.remote_tun_if_host)
844 self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
845 self.assert_packet_checksums_valid(rx)
847 def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
848 align = sa.crypt_algo.block_size
851 exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
852 exp_len += sa.crypt_algo.iv_size
853 exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
854 self.assertEqual(exp_len, len(esp_payload))
856 def verify_encrypted(self, p, sa, rxs):
860 self.assertEqual(rx[UDP].dport, 4500)
861 self.assert_packet_checksums_valid(rx)
862 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
865 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
866 if not decrypt_pkt.haslayer(IP):
867 decrypt_pkt = IP(decrypt_pkt[Raw].load)
868 if rx_ip.proto == socket.IPPROTO_ESP:
869 self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
870 decrypt_pkts.append(decrypt_pkt)
871 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
872 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
874 self.logger.debug(ppp("Unexpected packet:", rx))
876 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
880 pkts = reassemble4(decrypt_pkts)
882 self.assert_packet_checksums_valid(pkt)
884 def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
885 self.vapi.cli("clear errors")
886 self.vapi.cli("clear ipsec counters")
887 self.vapi.cli("clear ipsec sa")
891 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
892 src=p.remote_tun_if_host,
893 dst=self.pg1.remote_ip4,
895 payload_size=payload_size)
896 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
897 self.verify_decrypted(p, recv_pkts)
899 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
900 dst=p.remote_tun_if_host, count=count,
901 payload_size=payload_size)
902 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
904 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
907 self.assertEqual(rx[IP].src, p.tun_src)
908 self.assertEqual(rx[IP].dst, p.tun_dst)
911 self.logger.info(self.vapi.ppcli("show error"))
912 self.logger.info(self.vapi.ppcli("show ipsec all"))
914 self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
915 self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
916 self.verify_counters4(p, count, n_rx)
918 def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
919 self.vapi.cli("clear errors")
923 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
924 src=p.remote_tun_if_host,
925 dst=self.pg1.remote_ip4,
927 self.send_and_assert_no_replies(self.tun_if, send_pkts)
929 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
930 dst=p.remote_tun_if_host, count=count,
931 payload_size=payload_size)
932 self.send_and_assert_no_replies(self.pg1, send_pkts)
935 self.logger.info(self.vapi.ppcli("show error"))
936 self.logger.info(self.vapi.ppcli("show ipsec all"))
938 def verify_tun_reass_44(self, p):
939 self.vapi.cli("clear errors")
940 self.vapi.ip_reassembly_enable_disable(
941 sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
944 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
945 src=p.remote_tun_if_host,
946 dst=self.pg1.remote_ip4,
949 send_pkts = fragment_rfc791(send_pkts[0], 1400)
950 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
952 self.verify_decrypted(p, recv_pkts)
954 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
955 dst=p.remote_tun_if_host, count=1)
956 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
958 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
961 self.logger.info(self.vapi.ppcli("show error"))
962 self.logger.info(self.vapi.ppcli("show ipsec all"))
964 self.verify_counters4(p, 1, 1)
965 self.vapi.ip_reassembly_enable_disable(
966 sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
968 def verify_tun_64(self, p, count=1):
969 self.vapi.cli("clear errors")
970 self.vapi.cli("clear ipsec sa")
972 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
973 src=p.remote_tun_if_host6,
974 dst=self.pg1.remote_ip6,
976 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
977 for recv_pkt in recv_pkts:
978 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
979 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
980 self.assert_packet_checksums_valid(recv_pkt)
981 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
982 dst=p.remote_tun_if_host6, count=count)
983 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
984 for recv_pkt in recv_pkts:
986 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
987 if not decrypt_pkt.haslayer(IPv6):
988 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
989 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
990 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
991 self.assert_packet_checksums_valid(decrypt_pkt)
993 self.logger.error(ppp("Unexpected packet:", recv_pkt))
996 ppp("Decrypted packet:", decrypt_pkt))
1001 self.logger.info(self.vapi.ppcli("show error"))
1002 self.logger.info(self.vapi.ppcli("show ipsec all"))
1004 self.verify_counters4(p, count)
1006 def verify_keepalive(self, p):
1007 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
1008 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
1009 UDP(sport=333, dport=4500) /
1011 self.send_and_assert_no_replies(self.tun_if, pkt*31)
1012 self.assert_error_counter_equal(
1013 '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
1015 pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
1016 IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
1017 UDP(sport=333, dport=4500) /
1019 self.send_and_assert_no_replies(self.tun_if, pkt*31)
1020 self.assert_error_counter_equal(
1021 '/err/%s/Too Short' % self.tun4_input_node, 31)
1024 class IpsecTun4Tests(IpsecTun4):
1025 """ UT test methods for Tunnel v4 """
1026 def test_tun_basic44(self):
1027 """ ipsec 4o4 tunnel basic test """
1028 self.verify_tun_44(self.params[socket.AF_INET], count=1)
1029 self.tun_if.admin_down()
1030 self.tun_if.resolve_arp()
1031 self.tun_if.admin_up()
1032 self.verify_tun_44(self.params[socket.AF_INET], count=1)
1034 def test_tun_reass_basic44(self):
1035 """ ipsec 4o4 tunnel basic reassembly test """
1036 self.verify_tun_reass_44(self.params[socket.AF_INET])
1038 def test_tun_burst44(self):
1039 """ ipsec 4o4 tunnel burst test """
1040 self.verify_tun_44(self.params[socket.AF_INET], count=127)
1043 class IpsecTun6(object):
1044 """ verify methods for Tunnel v6 """
1045 def verify_counters6(self, p_in, p_out, count, worker=None):
1046 if (hasattr(p_in, "tun_sa_in")):
1047 pkts = p_in.tun_sa_in.get_stats(worker)['packets']
1048 self.assertEqual(pkts, count,
1049 "incorrect SA in counts: expected %d != %d" %
1051 if (hasattr(p_out, "tun_sa_out")):
1052 pkts = p_out.tun_sa_out.get_stats(worker)['packets']
1053 self.assertEqual(pkts, count,
1054 "incorrect SA out counts: expected %d != %d" %
1056 self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1057 self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)
1059 def verify_decrypted6(self, p, rxs):
1061 self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1062 self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1063 self.assert_packet_checksums_valid(rx)
1065 def verify_encrypted6(self, p, sa, rxs):
1067 self.assert_packet_checksums_valid(rx)
1068 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
1070 self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
1071 if p.outer_flow_label:
1072 self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
1074 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1075 if not decrypt_pkt.haslayer(IPv6):
1076 decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1077 self.assert_packet_checksums_valid(decrypt_pkt)
1078 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1079 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1080 self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
1081 self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
1083 self.logger.debug(ppp("Unexpected packet:", rx))
1085 self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1090 def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1091 self.vapi.cli("clear errors")
1092 self.vapi.cli("clear ipsec sa")
1094 send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
1096 src=p_in.remote_tun_if_host,
1097 dst=self.pg1.remote_ip6,
1099 self.send_and_assert_no_replies(self.tun_if, send_pkts)
1100 self.logger.info(self.vapi.cli("sh punt stats"))
1102 def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1103 self.vapi.cli("clear errors")
1104 self.vapi.cli("clear ipsec sa")
1108 send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
1110 src=p_in.remote_tun_if_host,
1111 dst=self.pg1.remote_ip6,
1113 payload_size=payload_size)
1114 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1115 self.verify_decrypted6(p_in, recv_pkts)
1117 send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
1118 dst=p_out.remote_tun_if_host,
1120 payload_size=payload_size)
1121 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1122 self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1124 for rx in recv_pkts:
1125 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1126 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1129 self.logger.info(self.vapi.ppcli("show error"))
1130 self.logger.info(self.vapi.ppcli("show ipsec all"))
1131 self.verify_counters6(p_in, p_out, count)
1133 def verify_tun_reass_66(self, p):
1134 self.vapi.cli("clear errors")
1135 self.vapi.ip_reassembly_enable_disable(
1136 sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
1139 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1140 src=p.remote_tun_if_host,
1141 dst=self.pg1.remote_ip6,
1144 send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1145 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1147 self.verify_decrypted6(p, recv_pkts)
1149 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
1150 dst=p.remote_tun_if_host,
1153 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1155 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1157 self.logger.info(self.vapi.ppcli("show error"))
1158 self.logger.info(self.vapi.ppcli("show ipsec all"))
1159 self.verify_counters6(p, p, 1)
1160 self.vapi.ip_reassembly_enable_disable(
1161 sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
1163 def verify_tun_46(self, p, count=1):
1164 """ ipsec 4o6 tunnel basic test """
1165 self.vapi.cli("clear errors")
1166 self.vapi.cli("clear ipsec sa")
1168 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1169 src=p.remote_tun_if_host4,
1170 dst=self.pg1.remote_ip4,
1172 recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1173 for recv_pkt in recv_pkts:
1174 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1175 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1176 self.assert_packet_checksums_valid(recv_pkt)
1177 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1178 dst=p.remote_tun_if_host4,
1180 recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1181 for recv_pkt in recv_pkts:
1183 decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1184 if not decrypt_pkt.haslayer(IP):
1185 decrypt_pkt = IP(decrypt_pkt[Raw].load)
1186 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1187 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1188 self.assert_packet_checksums_valid(decrypt_pkt)
1190 self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1192 self.logger.debug(ppp("Decrypted packet:",
1198 self.logger.info(self.vapi.ppcli("show error"))
1199 self.logger.info(self.vapi.ppcli("show ipsec all"))
1200 self.verify_counters6(p, p, count)
1203 class IpsecTun6Tests(IpsecTun6):
1204 """ UT test methods for Tunnel v6 """
1206 def test_tun_basic66(self):
1207 """ ipsec 6o6 tunnel basic test """
1208 self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1210 def test_tun_reass_basic66(self):
1211 """ ipsec 6o6 tunnel basic reassembly test """
1212 self.verify_tun_reass_66(self.params[socket.AF_INET6])
1214 def test_tun_burst66(self):
1215 """ ipsec 6o6 tunnel burst test """
1216 self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1219 class IpsecTun6HandoffTests(IpsecTun6):
1220 """ UT test methods for Tunnel v6 with multiple workers """
1221 vpp_worker_count = 2
1223 def test_tun_handoff_66(self):
1224 """ ipsec 6o6 tunnel worker hand-off test """
1225 self.vapi.cli("clear errors")
1226 self.vapi.cli("clear ipsec sa")
1229 p = self.params[socket.AF_INET6]
1231 # inject alternately on worker 0 and 1. all counts on the SA
1232 # should be against worker 0
1233 for worker in [0, 1, 0, 1]:
1234 send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1235 src=p.remote_tun_if_host,
1236 dst=self.pg1.remote_ip6,
1238 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1239 self.pg1, worker=worker)
1240 self.verify_decrypted6(p, recv_pkts)
1242 send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
1243 dst=p.remote_tun_if_host,
1245 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1246 self.tun_if, worker=worker)
1247 self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1249 # all counts against the first worker that was used
1250 self.verify_counters6(p, p, 4*N_PKTS, worker=0)
1253 class IpsecTun4HandoffTests(IpsecTun4):
1254 """ UT test methods for Tunnel v4 with multiple workers """
1255 vpp_worker_count = 2
1257 def test_tun_handooff_44(self):
1258 """ ipsec 4o4 tunnel worker hand-off test """
1259 self.vapi.cli("clear errors")
1260 self.vapi.cli("clear ipsec sa")
1263 p = self.params[socket.AF_INET]
1265 # inject alternately on worker 0 and 1. all counts on the SA
1266 # should be against worker 0
1267 for worker in [0, 1, 0, 1]:
1268 send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1269 src=p.remote_tun_if_host,
1270 dst=self.pg1.remote_ip4,
1272 recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1273 self.pg1, worker=worker)
1274 self.verify_decrypted(p, recv_pkts)
1276 send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1277 dst=p.remote_tun_if_host,
1279 recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1280 self.tun_if, worker=worker)
1281 self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1283 # all counts against the first worker that was used
1284 self.verify_counters4(p, 4*N_PKTS, worker=0)
1287 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1288 """ UT test methods for Tunnel v6 & v4 """
1292 if __name__ == '__main__':
1293 unittest.main(testRunner=VppTestRunner)