ikev2: support responder hostname
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params:
    """Default IPsec test parameters for the IPv4 address family.

    The class attributes describe the address family itself; the instance
    attributes assigned in ``__init__`` carry the SA ids, SPIs, algorithms
    and keys used for both tunnel and transport mode.
    """

    # address-family constants
    is_ipv6 = 0
    addr_len = 32
    addr_bcast = "255.255.255.255"
    addr_any = "0.0.0.0"
    addr_type = socket.AF_INET

    def __init__(self):
        # hosts reachable through the tunnel
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        # outer/inner header fields
        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity algorithm: VPP API id, scapy name and key
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # crypto algorithm: VPP API id, scapy name and key
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0

        # SA flags, NAT-T header and tunnel encap/decap behaviour
        self.flags = 0
        self.nat_header = None
        tun_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        self.tun_flags = tun_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
61
62
class IPsecIPv6Params:
    """Default IPsec test parameters for the IPv6 address family.

    The class attributes describe the address family itself; the instance
    attributes assigned in ``__init__`` carry the SA ids, SPIs, algorithms
    and keys used for both tunnel and transport mode.
    """

    # address-family constants
    is_ipv6 = 1
    addr_len = 128
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_any = "0::0"
    addr_type = socket.AF_INET6

    def __init__(self):
        # hosts reachable through the tunnel
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # outer/inner header fields
        self.outer_hop_limit = 64
        self.inner_hop_limit = 255
        self.outer_flow_label = 0
        self.inner_flow_label = 0x12345

        # integrity algorithm: VPP API id, scapy name and key
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # crypto algorithm: VPP API id, scapy name and key
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0

        # SA flags, NAT-T header and tunnel encap/decap behaviour
        self.flags = 0
        self.nat_header = None
        tun_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t
        self.tun_flags = tun_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
106
107
def mk_scapy_crypt_key(p):
    """Return the crypto key to hand to scapy for ``p.crypt_algo``.

    For "AES-GCM" and "AES-CTR" the salt, packed as a 32-bit unsigned
    integer in network byte order, is appended to ``p.crypt_key``; every
    other algorithm gets ``p.crypt_key`` unchanged.
    """
    if p.crypt_algo not in ("AES-GCM", "AES-CTR"):
        return p.crypt_key
    salt_bytes = struct.pack("!I", p.salt)
    return p.crypt_key + salt_bytes
113
114
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy tunnel-mode security associations to ``p``.

    Records the tunnel endpoints on ``p`` (``tun_dst`` is the interface's
    remote address and ``tun_src`` its local address, both looked up by
    ``p.addr_type``) and then creates:

    * ``p.scapy_tun_sa`` - uses ``p.vpp_tun_spi`` and an outer header going
      from ``tun_dst`` to ``tun_src``
    * ``p.vpp_tun_sa`` - uses ``p.scapy_tun_spi`` and an outer header going
      from ``tun_src`` to ``tun_dst``
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # everything that is the same for both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    # scapy layer used for the outer (tunnel) header
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=p.tun_dst, src=p.tun_src),
        **shared)
142
143
def config_tra_params(p, encryption_type):
    """Attach the scapy transport-mode security associations to ``p``.

    Creates ``p.scapy_tra_sa`` with ``p.vpp_tra_spi`` and ``p.vpp_tra_sa``
    with ``p.scapy_tra_spi``; both share the algorithms, keys, NAT-T header
    and ESN setting taken from ``p``.
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    # everything that is the same for both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
166
167
class TemplateIpsec(VppTestCase):
    """
    Common set-up and packet generators for the IPsec test cases.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create default v4/v6 parameters unless already provided and
        index them by address family in self.params """
        if not hasattr(self, 'ipv4_params'):
            self.ipv4_params = IPsecIPv4Params()
        if not hasattr(self, 'ipv6_params'):
            self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        self.params[self.ipv4_params.addr_type] = self.ipv4_params
        self.params[self.ipv6_params.addr_type] = self.ipv6_params

    def config_interfaces(self):
        """ create three pg interfaces, bring them up and give each an
        IPv4 and an IPv6 address with its neighbour resolved """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        # VPP API ids of the two IPsec protocols
        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ bring the interfaces down and remove their addresses """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ build `count` IPv4 ICMP packets encrypted with `sa`, addressed
        from sw_intf's remote MAC to its local MAC """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IP(src=src, dst=dst) /
                     ICMP() / Raw(b'X' * payload_size))
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ build `count` ICMPv6 echo requests encrypted with `sa`; the
        inner hop limit and flow label come from `p` """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IPv6(src=src, dst=dst,
                          hlim=p.inner_hop_limit,
                          fl=p.inner_flow_label) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build `count` plain IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth /
                        IP(src=src, dst=dst) /
                        ICMP() /
                        Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """ build `count` plain ICMPv6 echo requests; hop limit and flow
        label come from `p` """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth /
                        IPv6(src=src, dst=dst,
                             hlim=p.inner_hop_limit,
                             fl=p.inner_flow_label) /
                        ICMPv6EchoRequest(id=0, seq=1,
                                          data='X' * payload_size))
        return pkts
275
276
class IpsecTcp(object):
    """ verify methods for TCP packets generated by VPP """
    def verify_tcp_checksum(self):
        """ send an encrypted TCP SYN to port 80 of the tunnel interface
        and check the checksums of the decrypted reply """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        eth = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host,
                  dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = eth / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(reply[IP])
        self.assert_packet_checksums_valid(decrypted)
290
291
class IpsecTcpTests(IpsecTcp):
    # Test methods built on the IpsecTcp verify helpers.
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # the actual check lives in IpsecTcp.verify_tcp_checksum
        self.verify_tcp_checksum()
296
297
class IpsecTra4(object):
    """ verify methods for Transport v4 """
    def get_replay_counts(self, p):
        """ return the 'SA replayed packet' error count of the decrypt
        node, plus that of the async node when p.async_mode is set """
        counter_fmt = '/err/%s/SA replayed packet'
        total = self.statistics.get_err_counter(
            counter_fmt % self.tra4_decrypt_node_name[0])

        if p.async_mode:
            total += self.statistics.get_err_counter(
                counter_fmt % self.tra4_decrypt_node_name[p.async_mode])

        return total

    def get_hash_failed_counts(self, p):
        """ return the number of packets that failed authentication; in
        async mode the crypto-dispatch bad-hmac count is included """
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            counter_fmt = '/err/%s/ESP decryption failed'
        else:
            counter_fmt = '/err/%s/Integrity check failed'
        total = self.statistics.get_err_counter(
            counter_fmt % self.tra4_decrypt_node_name[p.async_mode])

        if p.async_mode:
            total += self.statistics.get_err_counter(
                '/err/crypto-dispatch/bad-hmac')

        return total

    def verify_tra_anti_replay(self):
        """ exercise the anti-replay window and the sequence number wrap
        handling of the v4 transport mode SAs """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en

        def tra_pkt(sa, seq):
            # an ICMP packet from tra_if's remote to its local address,
            # protected by 'sa' and carrying sequence number 'seq'
            return (Ether(src=self.tra_if.remote_mac,
                          dst=self.tra_if.local_mac) /
                    sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                  dst=self.tra_if.local_ip4) /
                               ICMP(),
                               seq_num=seq))

        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_count = self.get_replay_counts(p)
        hash_failed_count = self.get_hash_failed_counts(p)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name[0])
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33 (i.e. range(1, 34))
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [tra_pkt(p.scapy_tra_sa, seq) for seq in range(1, 34)]
        self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assertEqual(self.get_replay_counts(p), replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        self.vapi.cli("clear error")
        self.vapi.cli("clear node counters")
        pkts = tra_pkt(p.scapy_tra_sa, 35)
        self.send_and_expect(self.tra_if, pkts * 8,
                             self.tra_if, n_rx=1)
        replay_count += 7
        self.assertEqual(self.get_replay_counts(p), replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        self.vapi.cli("clear error")
        pkt = tra_pkt(p.scapy_tra_sa, 257)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assertEqual(self.get_replay_counts(p), replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = tra_pkt(p.scapy_tra_sa, 200)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = tra_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = tra_pkt(bogus_sa, 350)
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # which we can determine since this packet is still in the window
        pkt = tra_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = tra_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

        else:
            replay_count += 17
            self.assertEqual(self.get_replay_counts(p), replay_count)

        # valid packet moves the window over to 258
        pkt = tra_pkt(p.scapy_tra_sa, 258)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [tra_pkt(p.scapy_tra_sa, seq) for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = tra_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = tra_pkt(p.scapy_tra_sa, 0xfffffffd)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = tra_pkt(p.scapy_tra_sa, 0x200000999)
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = tra_pkt(p.scapy_tra_sa, 0x100000555)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = tra_pkt(p.scapy_tra_sa, 0x200000444)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1, payload_size=54):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            tx_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
                                            src=self.tra_if.remote_ip4,
                                            dst=self.tra_if.local_ip4,
                                            count=count,
                                            payload_size=payload_size)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for rx in rx_pkts:
                # the IP length field must cover everything after Ethernet
                l3_len = len(rx) - len(Ether())
                self.assertEqual(l3_len, rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    plain = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(plain)
                except:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        # both SAs must have seen exactly 'count' packets
        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))

        # and so must the encrypt and decrypt graph nodes
        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
636
637
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # forward the requested packet count instead of a hard-coded 1;
        # the test runner calls this without arguments, so the default
        # keeps the single-packet behaviour
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # 257 packets: one more than a 256-packet batch
        self.verify_tra_basic4(count=257)
651
652
653 class IpsecTra6(object):
654     """ verify methods for Transport v6 """
655     def verify_tra_basic6(self, count=1, payload_size=54):
656         self.vapi.cli("clear errors")
657         self.vapi.cli("clear ipsec sa")
658         try:
659             p = self.params[socket.AF_INET6]
660             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
661                                                src=self.tra_if.remote_ip6,
662                                                dst=self.tra_if.local_ip6,
663                                                count=count,
664                                                payload_size=payload_size)
665             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
666                                              self.tra_if)
667             for rx in recv_pkts:
668                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
669                                  rx[IPv6].plen)
670                 try:
671                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
672                     self.assert_packet_checksums_valid(decrypted)
673                 except:
674                     self.logger.debug(ppp("Unexpected packet:", rx))
675                     raise
676         finally:
677             self.logger.info(self.vapi.ppcli("show error"))
678             self.logger.info(self.vapi.ppcli("show ipsec all"))
679
680         pkts = p.tra_sa_in.get_stats()['packets']
681         self.assertEqual(pkts, count,
682                          "incorrect SA in counts: expected %d != %d" %
683                          (count, pkts))
684         pkts = p.tra_sa_out.get_stats()['packets']
685         self.assertEqual(pkts, count,
686                          "incorrect SA out counts: expected %d != %d" %
687                          (count, pkts))
688         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
689         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
690
691     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
692                                    payload_size=54):
693         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
694                 sa.encrypt(IPv6(src=src, dst=dst) /
695                            ICMPv6EchoRequest(id=0, seq=1,
696                                              data='X' * payload_size))
697                 for i in range(count)]
698
699     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
700         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
701                 IPv6(src=src, dst=dst) /
702                 IPv6ExtHdrHopByHop() /
703                 IPv6ExtHdrFragment(id=2, offset=200) /
704                 Raw(b'\xff' * 200)
705                 for i in range(count)]
706
707     def verify_tra_encrypted6(self, p, sa, rxs):
708         decrypted = []
709         for rx in rxs:
710             self.assert_packet_checksums_valid(rx)
711             try:
712                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
713                 decrypted.append(decrypt_pkt)
714                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
715                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
716             except:
717                 self.logger.debug(ppp("Unexpected packet:", rx))
718                 try:
719                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
720                 except:
721                     pass
722                 raise
723         return decrypted
724
725     def verify_tra_66_ext_hdrs(self, p):
726         count = 63
727
728         #
729         # check we can decrypt with options
730         #
731         tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
732                                              src=self.tra_if.remote_ip6,
733                                              dst=self.tra_if.local_ip6,
734                                              count=count)
735         self.send_and_expect(self.tra_if, tx, self.tra_if)
736
737         #
738         # injecting a packet from ourselves to be routed of box is a hack
739         # but it matches an outbout policy, alors je ne regrette rien
740         #
741
742         # one extension before ESP
743         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
744               IPv6(src=self.tra_if.local_ip6,
745                    dst=self.tra_if.remote_ip6) /
746               IPv6ExtHdrFragment(id=2, offset=200) /
747               Raw(b'\xff' * 200))
748
749         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
750         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
751
752         for dc in dcs:
753             # for reasons i'm not going to investigate scapy does not
754             # created the correct headers after decrypt. but reparsing
755             # the ipv6 packet fixes it
756             dc = IPv6(raw(dc[IPv6]))
757             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
758
759         # two extensions before ESP
760         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
761               IPv6(src=self.tra_if.local_ip6,
762                    dst=self.tra_if.remote_ip6) /
763               IPv6ExtHdrHopByHop() /
764               IPv6ExtHdrFragment(id=2, offset=200) /
765               Raw(b'\xff' * 200))
766
767         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
768         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
769
770         for dc in dcs:
771             dc = IPv6(raw(dc[IPv6]))
772             self.assertTrue(dc[IPv6ExtHdrHopByHop])
773             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
774
775         # two extensions before ESP, one after
776         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
777               IPv6(src=self.tra_if.local_ip6,
778                    dst=self.tra_if.remote_ip6) /
779               IPv6ExtHdrHopByHop() /
780               IPv6ExtHdrFragment(id=2, offset=200) /
781               IPv6ExtHdrDestOpt() /
782               Raw(b'\xff' * 200))
783
784         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
785         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
786
787         for dc in dcs:
788             dc = IPv6(raw(dc[IPv6]))
789             self.assertTrue(dc[IPv6ExtHdrDestOpt])
790             self.assertTrue(dc[IPv6ExtHdrHopByHop])
791             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
792
793
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # Both tests delegate to verify_tra_basic6() inherited from IpsecTra6
    # and differ only in the number of packets requested.
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # a single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # 257 packets in one call - presumably chosen to exceed 256,
        # TODO confirm the intent
        self.verify_tra_basic6(count=257)
803
804
class IpsecTra6ExtTests(IpsecTra6):
    """ UT test methods for Transport v6 with extension headers """
    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        # run the extension header checks with the IPv6 parameter set
        self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
809
810
# Adds nothing of its own: it only combines the v4 and v6 transport test
# methods into a single base class.
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    pass
814
815
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """
    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """ Check SPD/SA statistics and the tun4 node packet counters

        :param p: parameter object; spd_policy_in_any and
                  tun_sa_in/tun_sa_out are only checked when p has them
        :param count: packets expected on the inbound SPD policy, the
                      inbound SA and the decrypt node
        :param n_frags: packets expected on the outbound SA and the
                        encrypt node; falls back to count when not given
        :param worker: passed through to get_stats()
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            # the value compared against is n_frags, so that is also the
            # value the failure message has to report as expected
            self.assertEqual(pkts, n_frags,
                             "incorrect SA out counts: expected %d != %d" %
                             (n_frags, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """ Check addresses and checksums of packets VPP has decrypted

        Every packet must go from p.remote_tun_if_host to pg1's remote
        IPv4 address.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """ Check the ESP payload length against the decrypted packet

        The expected length is the decrypted packet length plus two bytes
        (presumably the ESP pad-length and next-header fields - confirm),
        rounded up to the cipher block size (at least 4), plus the IV and
        the ICV of the crypto algorithm or, when that is zero, of the
        auth algorithm.
        """
        align = max(sa.crypt_algo.block_size, 4)
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt and check packets VPP has encrypted

        :param p: parameter object; p.vpp_tun_sa does the decryption and
                  p.nat_header selects the UDP destination port check
        :param sa: SA whose algorithms are used for the padding check
        :param rxs: packets captured from VPP

        After the per-packet checks the decrypted packets are passed to
        reassemble4() and the checksums of the result are validated.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so that a failure to decrypt this packet
            # never logs the plaintext of a previous one
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """ Send traffic both ways through a 4o4 tunnel and verify it

        :param p: parameter object for the tunnel under test
        :param count: number of packets to send in each direction
        :param payload_size: payload size handed to the packet generators
        :param n_rx: number of encrypted packets expected back from VPP;
                     falls back to count when not given
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            # encrypted in on the tunnel interface, plain out of pg1
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # plain in on pg1, encrypted out of the tunnel interface
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            # dump state whether or not the checks above passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """ Send traffic both ways and verify that nothing comes back

        payload_size is only handed to the plain-packet generator; the
        encrypted packets are generated without it. n_rx is defaulted to
        count but not otherwise used.
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """ Verify a fragmented encrypted packet is accepted by the tunnel

        IPv4 reassembly is enabled on the tunnel interface for the
        duration of the check. One encrypted packet with a 1900 byte
        payload is split with fragment_rfc791(pkt, 1400) and a single
        packet is expected out of pg1.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              payload_size=1900,
                                              count=1)
            send_pkts = fragment_rfc791(send_pkts[0], 1400)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                             self.pg1, n_rx=1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=1)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, 1, 1)
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """ Send IPv6 traffic both ways through an IPv4 tunnel (6o4)

        :param p: parameter object for the tunnel under test
        :param count: number of packets to send in each direction
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so that a failure to decrypt this packet
                # never logs the plaintext of a previous one
                decrypt_pkt = None
                try:
                    # the outer header is IPv4, the inner packet is IPv6
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """ Check how one-byte UDP port 4500 packets are accounted for

        31 packets with payload 0xff must bump the 'NAT Keepalive' error
        counter of the tun4 input node by 31 and produce no replies;
        31 packets with payload 0xfe must do the same for 'Too Short'.
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
1021
1022
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        self.verify_tun_44(self.params[socket.AF_INET], count=1)
        # bounce the tunnel interface (down, ARP resolve, up) and check
        # that traffic still passes afterwards
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(self.params[socket.AF_INET], count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        self.verify_tun_reass_44(self.params[socket.AF_INET])

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        # same checks as the basic test, with 127 packets per direction
        self.verify_tun_44(self.params[socket.AF_INET], count=127)
1040
1041
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """
    def verify_counters6(self, p_in, p_out, count, worker=None):
        """ Check SA statistics and the tun6 node packet counters

        :param p_in: parameter object whose tun_sa_in is checked, if present
        :param p_out: parameter object whose tun_sa_out is checked,
                      if present
        :param count: packets expected on each SA and on each node
        :param worker: passed through to get_stats()
        """
        if (hasattr(p_in, "tun_sa_in")):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if (hasattr(p_out, "tun_sa_out")):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)

    def verify_decrypted6(self, p, rxs):
        """ Check addresses and checksums of packets VPP has decrypted

        Every packet must go from p.remote_tun_if_host to pg1's remote
        IPv6 address.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """ Decrypt and check packets VPP has encrypted

        Outer header: payload length, hop limit and (when
        p.outer_flow_label is non-zero) flow label. Inner packet:
        checksums, addresses, hop limit of p.inner_hop_limit - 1 and
        flow label.

        :param p: parameter object; p.vpp_tun_sa does the decryption
        :param sa: not used; decryption always goes through p.vpp_tun_sa
        :param rxs: packets captured from VPP
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
            if p.outer_flow_label:
                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
            # reset per packet so that a failure to decrypt this packet
            # never logs the plaintext of a previous one
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """ Send encrypted packets and verify that nothing comes back

        payload_size is accepted but not handed to the packet generator.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                           self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """ Send traffic both ways through a 6o6 tunnel and verify it

        :param p_in: parameter object for the inbound (decrypt) direction
        :param p_out: parameter object for the outbound (encrypt)
                      direction; falls back to p_in when not given
        :param count: number of packets to send in each direction
        :param payload_size: payload size handed to the packet generators
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            # encrypted in on the tunnel interface, plain out of pg1
            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                               self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            # plain in on pg1, encrypted out of the tunnel interface
            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            # dump state whether or not the checks above passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """ Verify a fragmented encrypted packet is accepted by the tunnel

        IPv6 reassembly is enabled on the tunnel interface for the
        duration of the check. One encrypted packet with an 1850 byte
        payload is split with fragment_rfc8200(pkt, 1, 1400, logger) and
        a single packet is expected out of pg1.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=1,
                                               payload_size=1850)
            send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                             self.pg1, n_rx=1)
            self.verify_decrypted6(p, recv_pkts)

            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=1,
                                       payload_size=64)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if)
            self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, 1)
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Sends IPv4 traffic both ways through an IPv6 tunnel.

        :param p: parameter object for the tunnel under test
        :param count: number of packets to send in each direction
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so that a failure to decrypt this packet
                # never logs the plaintext of a previous one
                decrypt_pkt = None
                try:
                    # the outer header is IPv6, the inner packet is IPv4
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1200
1201
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    # Every test runs one of the IpsecTun6 verify methods with the IPv6
    # parameter set.
    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        self.verify_tun_66(self.params[socket.AF_INET6], count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        self.verify_tun_reass_66(self.params[socket.AF_INET6])

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        # same checks as the basic test, with 257 packets per direction
        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1216
1217
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        burst = 15
        params = self.params[socket.AF_INET6]

        # Traffic is injected alternately on worker 0 and worker 1; the
        # SA counts are nevertheless all expected against worker 0.
        injection_order = (0, 1) * 2

        for worker in injection_order:
            # encrypted in on the tunnel interface, plain out of pg1
            encrypted = self.gen_encrypt_pkts6(
                params, params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=burst)
            decrypted = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=worker)
            self.verify_decrypted6(params, decrypted)

            # plain in on pg1, encrypted out of the tunnel interface
            plain = self.gen_pkts6(
                params, self.pg1,
                src=self.pg1.remote_ip6,
                dst=params.remote_tun_if_host,
                count=burst)
            from_vpp = self.send_and_expect(
                self.pg1, plain, self.tun_if, worker=worker)
            self.verify_encrypted6(params, params.vpp_tun_sa, from_vpp)

        # all counts against the first worker that was used
        self.verify_counters6(params, params,
                              len(injection_order) * burst, worker=0)
1250
1251
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        burst = 15
        params = self.params[socket.AF_INET]

        # Traffic is injected alternately on worker 0 and worker 1; the
        # SA counts are nevertheless all expected against worker 0.
        injection_order = (0, 1) * 2

        for worker in injection_order:
            # encrypted in on the tunnel interface, plain out of pg1
            encrypted = self.gen_encrypt_pkts(
                params, params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=burst)
            decrypted = self.send_and_expect(
                self.tun_if, encrypted, self.pg1, worker=worker)
            self.verify_decrypted(params, decrypted)

            # plain in on pg1, encrypted out of the tunnel interface
            plain = self.gen_pkts(
                self.pg1,
                src=self.pg1.remote_ip4,
                dst=params.remote_tun_if_host,
                count=burst)
            from_vpp = self.send_and_expect(
                self.pg1, plain, self.tun_if, worker=worker)
            self.verify_encrypted(params, params.vpp_tun_sa, from_vpp)

        # all counts against the first worker that was used
        self.verify_counters4(params, len(injection_order) * burst, worker=0)
1284
1285
# Adds nothing of its own: it only combines the v4 and v6 tunnel test
# methods into a single base class.
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    pass
1289
1290
# When executed as a script, run unittest with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)