crypto crypto-openssl: support hashing operations
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params:
    """Default parameter set for the IPv4 IPsec test cases.

    The class attributes describe the address family; ``__init__`` fills in
    the default host addresses, SA ids, SPIs, hop limits, flow labels,
    algorithms and keys as instance attributes.
    """

    addr_type = socket.AF_INET
    addr_len = 32
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    is_ipv6 = 0

    def __init__(self):
        # enum namespaces, bound once to keep the assignments short
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote tunnel host addresses (v4 and v6)
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # SA id / SPI pairs for tunnel mode
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1000
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 2000

        # SA id / SPI pairs for transport mode
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 3000
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 4000

        # outer/inner header hop limits and flow labels
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity algorithm: VPP enum value, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP enum value, scapy name, key and salt
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0

        # remaining SA / tunnel settings
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
61
62
class IPsecIPv6Params:
    """Default parameter set for the IPv6 IPsec test cases.

    The class attributes describe the address family; ``__init__`` fills in
    the default host addresses, SA ids, SPIs, hop limits, flow labels,
    algorithms and keys as instance attributes.
    """

    addr_type = socket.AF_INET6
    addr_len = 128
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    is_ipv6 = 1

    def __init__(self):
        # enum namespaces, bound once to keep the assignments short
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        encap_decap_flags = VppEnum.vl_api_tunnel_encap_decap_flags_t

        # remote tunnel host addresses (v6 and v4)
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # SA id / SPI pairs for tunnel mode
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # SA id / SPI pairs for transport mode
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # outer/inner header hop limits and flow labels
        self.outer_hop_limit, self.inner_hop_limit = 64, 255
        self.outer_flow_label, self.inner_flow_label = 0, 0x12345

        # integrity algorithm: VPP enum value, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP enum value, scapy name, key and salt
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'
        self.salt = 0

        # remaining SA / tunnel settings
        self.flags = 0
        self.nat_header = None
        self.tun_flags = encap_decap_flags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE
        self.dscp = 0
        self.async_mode = False
106
107
def mk_scapy_crypt_key(p):
    """Return the encryption key of *p* in the form passed to scapy.

    For the "AES-GCM" and "AES-CTR" algorithms ``p.salt`` is packed as a
    32-bit unsigned integer in network byte order and appended to
    ``p.crypt_key``; for any other algorithm the key is returned unchanged.
    """
    salted_algos = ("AES-GCM", "AES-CTR")
    if p.crypt_algo not in salted_algos:
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
113
114
def config_tun_params(p, encryption_type, tun_if):
    """Build the two scapy tunnel-mode SAs for *p* and store them on it.

    ``p.tun_dst`` / ``p.tun_src`` are set from the remote / local address of
    *tun_if* for the address family of *p*.  Two ``SecurityAssociation``
    objects are then created with identical algorithms and keys:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and a tunnel header going
      from ``p.tun_dst`` to ``p.tun_src``;
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and a tunnel header going
      from ``p.tun_src`` to ``p.tun_dst``.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    esn_en = bool(p.flags & esn_flag)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # keyword arguments common to both directions
    shared_kwargs = dict(crypt_algo=p.crypt_algo,
                         crypt_key=mk_scapy_crypt_key(p),
                         auth_algo=p.auth_algo,
                         auth_key=p.auth_key,
                         nat_t_header=p.nat_header,
                         esn_en=esn_en)

    # outer header class matching the address family of the parameters
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **shared_kwargs)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **shared_kwargs)
142
143
def config_tra_params(p, encryption_type):
    """Build the two scapy transport-mode SAs for *p* and store them on it.

    Both ``SecurityAssociation`` objects share algorithms, keys, NAT header
    and ESN setting; ``p.scapy_tra_sa`` is created with ``p.vpp_tra_spi``
    and ``p.vpp_tra_sa`` with ``p.scapy_tra_spi``.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # keyword arguments common to both directions
    shared_kwargs = dict(esn_en=bool(p.flags & esn_flag),
                         crypt_algo=p.crypt_algo,
                         crypt_key=mk_scapy_crypt_key(p),
                         auth_algo=p.auth_algo,
                         auth_key=p.auth_key,
                         nat_t_header=p.nat_header)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared_kwargs)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared_kwargs)
166
167
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        # no class-level work of our own; defer to the parent class
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # no class-level work of our own; defer to the parent class
        super().tearDownClass()

    def setup_params(self):
        """Make sure both parameter objects exist and index them.

        ``ipv4_params`` / ``ipv6_params`` attributes that are already set
        are kept; missing ones are created from ``IPsecIPv4Params`` /
        ``IPsecIPv6Params``.  ``self.params`` maps each object's
        ``addr_type`` to the object itself.
        """
        defaults = (('ipv4_params', IPsecIPv4Params),
                    ('ipv6_params', IPsecIPv6Params))
        for attr_name, factory in defaults:
            if not hasattr(self, attr_name):
                setattr(self, attr_name, factory())
        self.params = {prm.addr_type: prm
                       for prm in (self.ipv4_params, self.ipv6_params)}

    def config_interfaces(self):
        """Create three pg interfaces, bring each one up and configure and
        resolve both IPv4 and IPv6 on it."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        """Per-test setup: parameters, protocol ids, interfaces, backend."""
        super().setUp()

        self.setup_params()

        ipsec_proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_proto.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Bring every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        """Per-test teardown: parent teardown first, then the interfaces."""
        super().tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        """Log the output of the "show hardware" CLI command."""
        hw_output = self.vapi.cli("show hardware")
        self.logger.info(hw_output)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Return *count* Ethernet frames carrying an IPv4 ICMP packet with
        *payload_size* bytes of b'X', each encrypted with *sa*.

        *p* is accepted but not used by this method.
        """
        frames = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
            frames.append(eth / sa.encrypt(inner))
        return frames

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """Return *count* Ethernet frames carrying an IPv6 ICMPv6 echo
        request encrypted with *sa*; hop limit and flow label of the inner
        header come from *p*."""
        frames = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IPv6(src=src, dst=dst,
                          hlim=p.inner_hop_limit,
                          fl=p.inner_flow_label) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            frames.append(eth / sa.encrypt(inner))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* unencrypted Ethernet/IPv4/ICMP frames with
        *payload_size* bytes of b'X'."""
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) /
                ICMP() /
                Raw(b'X' * payload_size))
        return frames

    def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* unencrypted Ethernet/IPv6/ICMPv6 echo request
        frames; hop limit and flow label come from *p*."""
        frames = []
        for _ in range(count):
            frames.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst,
                     hlim=p.inner_hop_limit, fl=p.inner_flow_label) /
                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size))
        return frames
275
276
class IpsecTcp(object):
    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 of the tunnel interface's
        local IPv4 address and check the checksums of the decrypted reply.

        The "test http server" CLI command is issued first.
        """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        eth = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = eth / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first received packet is inspected
        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        decrypted = p.vpp_tun_sa.decrypt(reply[IP])
        self.assert_packet_checksums_valid(decrypted)
290
291
class IpsecTcpTests(IpsecTcp):
    # Exposes IpsecTcp.verify_tcp_checksum under a test_* method name.
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.verify_tcp_checksum()
296
297
298 class IpsecTra4(object):
299     """ verify methods for Transport v4 """
300     def get_replay_counts(self, p):
301         replay_node_name = ('/err/%s/SA replayed packet' %
302                             self.tra4_decrypt_node_name[0])
303         count = self.statistics.get_err_counter(replay_node_name)
304
305         if p.async_mode:
306             replay_post_node_name = ('/err/%s/SA replayed packet' %
307                                      self.tra4_decrypt_node_name[p.async_mode])
308             count += self.statistics.get_err_counter(replay_post_node_name)
309
310         return count
311
312     def get_hash_failed_counts(self, p):
313         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
314             hash_failed_node_name = ('/err/%s/ESP decryption failed' %
315                                      self.tra4_decrypt_node_name[p.async_mode])
316         else:
317             hash_failed_node_name = ('/err/%s/Integrity check failed' %
318                                      self.tra4_decrypt_node_name[p.async_mode])
319         count = self.statistics.get_err_counter(hash_failed_node_name)
320
321         if p.async_mode:
322             count += self.statistics.get_err_counter(
323                 '/err/crypto-dispatch/bad-hmac')
324
325         return count
326
    def verify_tra_anti_replay(self):
        """Exercise the receive anti-replay window of the v4 transport SA.

        In order, the test sends: in-sequence packets and their replays, a
        batch sharing one sequence number, packets that move the window
        forward, packets built with a wrong key, 'runt' packets (ESP with a
        non-NULL cipher only), in-window and out-of-window packets, and
        finally packets that make VPP's TX sequence number wrap.  The error
        counters are checked after each step; what is expected for the
        out-of-window and wrap steps depends on whether ESN is enabled on
        the SA.
        """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en

        # snapshot the error counters; expected values are tracked locally
        # from here on and compared against the live counters
        seq_cycle_node_name = \
            ('/err/%s/sequence number cycled (packet dropped)' %
             self.tra4_encrypt_node_name)
        replay_count = self.get_replay_counts(p)
        hash_failed_count = self.get_hash_failed_counts(p)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        # the undersize counter is only read (and later checked) for ESP
        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name[0])
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(1, 34)]
        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assertEqual(self.get_replay_counts(p), replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        self.vapi.cli("clear error")
        self.vapi.cli("clear node counters")
        pkts = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                          dst=self.tra_if.local_ip4) /
                                       ICMP(),
                                       seq_num=35))
        # eight copies are sent, one reply is expected, seven count as replays
        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                         self.tra_if, n_rx=1)
        replay_count += 7
        self.assertEqual(self.get_replay_counts(p), replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        self.vapi.cli("clear error")
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=257))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assertEqual(self.get_replay_counts(p), replay_count)

        # the window size is 64 packets
        # packets that are still inside the window are accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=200))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the same SPI but byte-reversed keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA (no algorithms or keys given)
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # that the window did not move can be determined because this
        # packet (seq 234) is still accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

        else:
            replay_count += 17
            self.assertEqual(self.get_replay_counts(p), replay_count)

        # valid packet moves the window over to 258
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=258))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0xfffffffd))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000999))
            # NOTE(review): every other send_and_assert_no_replies call in
            # this method passes two arguments; the extra self.tra_if here
            # looks unintended - confirm against the framework signature.
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000555))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000444))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351, matching the scapy SAs below)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351
599
600     def verify_tra_basic4(self, count=1, payload_size=54):
601         """ ipsec v4 transport basic test """
602         self.vapi.cli("clear errors")
603         self.vapi.cli("clear ipsec sa")
604         try:
605             p = self.params[socket.AF_INET]
606             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
607                                               src=self.tra_if.remote_ip4,
608                                               dst=self.tra_if.local_ip4,
609                                               count=count,
610                                               payload_size=payload_size)
611             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
612                                              self.tra_if)
613             for rx in recv_pkts:
614                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
615                 self.assert_packet_checksums_valid(rx)
616                 try:
617                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
618                     self.assert_packet_checksums_valid(decrypted)
619                 except:
620                     self.logger.debug(ppp("Unexpected packet:", rx))
621                     raise
622         finally:
623             self.logger.info(self.vapi.ppcli("show error"))
624             self.logger.info(self.vapi.ppcli("show ipsec all"))
625
626         pkts = p.tra_sa_in.get_stats()['packets']
627         self.assertEqual(pkts, count,
628                          "incorrect SA in counts: expected %d != %d" %
629                          (count, pkts))
630         pkts = p.tra_sa_out.get_stats()['packets']
631         self.assertEqual(pkts, count,
632                          "incorrect SA out counts: expected %d != %d" %
633                          (count, pkts))
634
635         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
636         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
637
638
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # Forward the caller's count instead of a hard-coded 1, which
        # silently ignored the parameter.  The default is still 1, so a
        # plain call without arguments behaves as before.
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # 257 packets: a burst of more than 256
        self.verify_tra_basic4(count=257)
652
653
654 class IpsecTra6(object):
655     """ verify methods for Transport v6 """
656     def verify_tra_basic6(self, count=1, payload_size=54):
657         self.vapi.cli("clear errors")
658         self.vapi.cli("clear ipsec sa")
659         try:
660             p = self.params[socket.AF_INET6]
661             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
662                                                src=self.tra_if.remote_ip6,
663                                                dst=self.tra_if.local_ip6,
664                                                count=count,
665                                                payload_size=payload_size)
666             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
667                                              self.tra_if)
668             for rx in recv_pkts:
669                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
670                                  rx[IPv6].plen)
671                 try:
672                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
673                     self.assert_packet_checksums_valid(decrypted)
674                 except:
675                     self.logger.debug(ppp("Unexpected packet:", rx))
676                     raise
677         finally:
678             self.logger.info(self.vapi.ppcli("show error"))
679             self.logger.info(self.vapi.ppcli("show ipsec all"))
680
681         pkts = p.tra_sa_in.get_stats()['packets']
682         self.assertEqual(pkts, count,
683                          "incorrect SA in counts: expected %d != %d" %
684                          (count, pkts))
685         pkts = p.tra_sa_out.get_stats()['packets']
686         self.assertEqual(pkts, count,
687                          "incorrect SA out counts: expected %d != %d" %
688                          (count, pkts))
689         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
690         self.assert_packet_counter_equal(self.tra6_decrypt_node_name[0], count)
691
692     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
693                                    payload_size=54):
694         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
695                 sa.encrypt(IPv6(src=src, dst=dst) /
696                            ICMPv6EchoRequest(id=0, seq=1,
697                                              data='X' * payload_size))
698                 for i in range(count)]
699
700     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
701         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
702                 IPv6(src=src, dst=dst) /
703                 IPv6ExtHdrHopByHop() /
704                 IPv6ExtHdrFragment(id=2, offset=200) /
705                 Raw(b'\xff' * 200)
706                 for i in range(count)]
707
708     def verify_tra_encrypted6(self, p, sa, rxs):
709         decrypted = []
710         for rx in rxs:
711             self.assert_packet_checksums_valid(rx)
712             try:
713                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
714                 decrypted.append(decrypt_pkt)
715                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
716                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
717             except:
718                 self.logger.debug(ppp("Unexpected packet:", rx))
719                 try:
720                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
721                 except:
722                     pass
723                 raise
724         return decrypted
725
726     def verify_tra_66_ext_hdrs(self, p):
727         count = 63
728
729         #
730         # check we can decrypt with options
731         #
732         tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
733                                              src=self.tra_if.remote_ip6,
734                                              dst=self.tra_if.local_ip6,
735                                              count=count)
736         self.send_and_expect(self.tra_if, tx, self.tra_if)
737
738         #
739         # injecting a packet from ourselves to be routed of box is a hack
740         # but it matches an outbout policy, alors je ne regrette rien
741         #
742
743         # one extension before ESP
744         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
745               IPv6(src=self.tra_if.local_ip6,
746                    dst=self.tra_if.remote_ip6) /
747               IPv6ExtHdrFragment(id=2, offset=200) /
748               Raw(b'\xff' * 200))
749
750         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
751         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
752
753         for dc in dcs:
754             # for reasons i'm not going to investigate scapy does not
755             # created the correct headers after decrypt. but reparsing
756             # the ipv6 packet fixes it
757             dc = IPv6(raw(dc[IPv6]))
758             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
759
760         # two extensions before ESP
761         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
762               IPv6(src=self.tra_if.local_ip6,
763                    dst=self.tra_if.remote_ip6) /
764               IPv6ExtHdrHopByHop() /
765               IPv6ExtHdrFragment(id=2, offset=200) /
766               Raw(b'\xff' * 200))
767
768         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
769         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
770
771         for dc in dcs:
772             dc = IPv6(raw(dc[IPv6]))
773             self.assertTrue(dc[IPv6ExtHdrHopByHop])
774             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
775
776         # two extensions before ESP, one after
777         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
778               IPv6(src=self.tra_if.local_ip6,
779                    dst=self.tra_if.remote_ip6) /
780               IPv6ExtHdrHopByHop() /
781               IPv6ExtHdrFragment(id=2, offset=200) /
782               IPv6ExtHdrDestOpt() /
783               Raw(b'\xff' * 200))
784
785         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
786         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
787
788         for dc in dcs:
789             dc = IPv6(raw(dc[IPv6]))
790             self.assertTrue(dc[IPv6ExtHdrDestOpt])
791             self.assertTrue(dc[IPv6ExtHdrHopByHop])
792             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
793
794
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # The test docstrings are deliberately left as they are: unittest
    # reports the first docstring line as the description of the test.

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        single_pkt = 1
        self.verify_tra_basic6(count=single_pkt)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # presumably chosen to be just above 256 packets - TODO confirm
        burst_size = 257
        self.verify_tra_basic6(count=burst_size)
804
805
class IpsecTra6ExtTests(IpsecTra6):
    # Transport v6 test methods for packets with IPv6 extension headers.
    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(v6_params)
810
811
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # Nothing to add here: every test comes from the two base classes.
815
816
class IpsecTun4(object):
    """ verify methods for Tunnel v4

    Mixin: the methods rely on attributes supplied by the concrete test
    case, e.g. the interfaces (tun_if, pg1), vapi, logger, the packet
    generators (gen_pkts, gen_encrypt_pkts, ...), send_and_expect and the
    assert helpers.
    """
    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """ Check the SPD, SA and node counters after a tunnel exchange.

        :param p: IPsec parameters; spd_policy_in_any and
                  tun_sa_in/tun_sa_out are only checked when `p` has them
        :param count: expected policy, inbound SA and decrypt node count
        :param n_frags: expected outbound SA and encrypt node count; falls
                        back to `count` when None (or 0)
        :param worker: forwarded to get_stats()
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            # the outbound SA is compared against n_frags, so that is the
            # value the failure message has to report as expected
            self.assertEqual(pkts, n_frags,
                             "incorrect SA out counts: expected %d != %d" %
                             (n_frags, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name[0], count)

    def verify_decrypted(self, p, rxs):
        """ Check packets VPP decrypted and forwarded out of pg1.

        Each packet must go from p.remote_tun_if_host to pg1's remote IPv4
        address and have valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
        """ Check that the ESP payload has exactly the expected length.

        Expected length: the decrypted packet plus 2 bytes (presumably the
        pad-length and next-header bytes of the ESP trailer - TODO
        confirm), rounded up to the cipher block size (at least 4), plus
        the IV and the ICV.

        :param sa: SA whose crypt_algo/auth_algo describe the sizes
        :param esp_payload: the ESP data as received
        :param decrypt_pkt: the packet recovered from that payload
        """
        align = sa.crypt_algo.block_size
        if align < 4:
            align = 4
        exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
        exp_len += sa.crypt_algo.iv_size
        # the cipher's own ICV size wins; the integrity algorithm's size
        # is used when the cipher reports none
        exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
        self.assertEqual(exp_len, len(esp_payload))

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt and check packets VPP encrypted into the tunnel.

        :param p: IPsec parameters (nat_header, vpp_tun_sa,
                  remote_tun_if_host)
        :param sa: SA handed to verify_esp_padding() for ESP packets
        :param rxs: packets received on the tunnel interface
        :raises: whatever the decryption or an assertion raised, after the
                 offending packet has been logged
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset for every packet so that a failure can never log the
            # result of an earlier iteration as if it belonged to this one
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        # the inner packets may be fragments; checksums are validated on
        # the reassembled packets
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """ Exercise a 4o4 tunnel in both directions and check counters.

        :param p: IPsec parameters
        :param count: number of packets sent in each direction
        :param payload_size: payload size handed to the packet generators
        :param n_rx: number of packets expected on the tunnel interface
                     for the plain to encrypted direction; falls back to
                     `count` when None (or 0)
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            # dump the state whether or not the checks above passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """ Check that traffic is dropped in both tunnel directions.

        :param p: IPsec parameters
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets; the
                             encrypted packets use the generator's default
        :param n_rx: not used; kept so existing callers keep working
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """ Check a 4o4 tunnel with fragmented encrypted input.

        IPv4 reassembly is enabled on the tunnel interface for the
        duration of the check and disabled again afterwards, also when a
        verification step fails.

        :param p: IPsec parameters
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
                                                  self.tun_if,
                                                  src=p.remote_tun_if_host,
                                                  dst=self.pg1.remote_ip4,
                                                  payload_size=1900,
                                                  count=1)
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                # the fragments must come out as one decrypted packet
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                          dst=p.remote_tun_if_host, count=1)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # do not leave reassembly enabled behind a failed check
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """ Exercise IPv6 traffic over the v4 tunnel in both directions.

        :param p: IPsec parameters
        :param count: number of packets sent in each direction
        :raises: whatever the decryption or an assertion raised, after the
                 offending packet has been logged
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure can never log
                # the result of an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """ Check the handling of one byte UDP packets sent to port 4500.

        31 packets with a 0xff payload must be counted as 'NAT Keepalive'
        and 31 packets with a 0xfe payload as 'Too Short' by the tunnel
        input node; neither kind may produce a reply.

        :param p: IPsec parameters; only remote_tun_if_host is used
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
1022
1023
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    # The test docstrings are deliberately left as they are: unittest
    # reports the first docstring line as the description of the test.

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=1)
        # bounce the tunnel interface and check the tunnel still works
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(p, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        p = self.params[socket.AF_INET]
        self.verify_tun_reass_44(p)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=127)
1041
1042
class IpsecTun6(object):
    """ verify methods for Tunnel v6

    Mixin: the methods rely on attributes supplied by the concrete test
    case, e.g. the interfaces (tun_if, pg1), vapi, logger, the packet
    generators (gen_pkts, gen_pkts6, gen_encrypt_pkts, gen_encrypt_pkts6),
    send_and_expect and the assert helpers.
    """
    def verify_counters6(self, p_in, p_out, count, worker=None):
        """ Check the SA and node counters after a tunnel exchange.

        :param p_in: parameters of the inbound side; tun_sa_in is only
                     checked when present
        :param p_out: parameters of the outbound side; tun_sa_out is only
                      checked when present
        :param count: expected packet count for every counter
        :param worker: forwarded to get_stats()
        """
        if (hasattr(p_in, "tun_sa_in")):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if (hasattr(p_out, "tun_sa_out")):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name[0], count)

    def verify_decrypted6(self, p, rxs):
        """ Check packets VPP decrypted and forwarded out of pg1.

        Each packet must go from p.remote_tun_if_host to pg1's remote IPv6
        address and have valid checksums.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """ Decrypt and check packets VPP encrypted into the tunnel.

        The outer header must carry p.outer_hop_limit (and
        p.outer_flow_label when that is non-zero); the decrypted packet
        must carry the inner hop limit decremented by one and the inner
        flow label.

        :param p: IPsec parameters
        :param sa: not used by this method (decryption always goes through
                   p.vpp_tun_sa); kept so existing callers keep working
        :param rxs: packets received on the tunnel interface
        :raises: whatever the decryption or an assertion raised, after the
                 offending packet has been logged
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            self.assert_equal(rx[IPv6].hlim, p.outer_hop_limit)
            if p.outer_flow_label:
                self.assert_equal(rx[IPv6].fl, p.outer_flow_label)
            # reset for every packet so that a failure can never log the
            # result of an earlier iteration as if it belonged to this one
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                self.assert_equal(decrypt_pkt.hlim, p.inner_hop_limit - 1)
                self.assert_equal(decrypt_pkt.fl, p.inner_flow_label)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """ Check that encrypted packets sent into the tunnel are dropped.

        :param p_in: IPsec parameters of the inbound side
        :param count: number of packets to send
        :param payload_size: not used by this method; the packets are
                             built with the generator's default size
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                           self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """ Exercise a 6o6 tunnel in both directions and check counters.

        :param p_in: IPsec parameters of the inbound (decrypt) side
        :param p_out: IPsec parameters of the outbound (encrypt) side;
                      `p_in` is used when not given
        :param count: number of packets sent in each direction
        :param payload_size: payload size handed to the packet generators
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                               self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(p_in, self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            # dump the state whether or not the checks above passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """ Check a 6o6 tunnel with fragmented encrypted input.

        IPv6 reassembly is enabled on the tunnel interface for the
        duration of the check and disabled again afterwards, also when a
        verification step fails.

        :param p: IPsec parameters
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
                                                   self.tun_if,
                                                   src=p.remote_tun_if_host,
                                                   dst=self.pg1.remote_ip6,
                                                   count=1,
                                                   payload_size=1850)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                # the fragments must come out as one decrypted packet
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(p, self.pg1,
                                           src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # do not leave reassembly enabled behind a failed check
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ Exercise IPv4 traffic over the v6 tunnel in both directions.

        :param p: IPsec parameters
        :param count: number of packets sent in each direction
        :raises: whatever the decryption or an assertion raised, after the
                 offending packet has been logged
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure can never log
                # the result of an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1201
1202
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """
    # The test docstrings are deliberately left as they are: unittest
    # reports the first docstring line as the description of the test.

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(p)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=257)
1217
1218
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    vpp_worker_count = 2

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        n_pkts = 15
        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        workers = (0, 1, 0, 1)
        p = self.params[socket.AF_INET6]

        for worker in workers:
            # encrypted traffic in, plain traffic expected on pg1
            encrypted = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
                                               self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=n_pkts)
            plain_rx = self.send_and_expect(self.tun_if, encrypted,
                                            self.pg1, worker=worker)
            self.verify_decrypted6(p, plain_rx)

            # plain traffic in, encrypted traffic expected on the tunnel
            plain = self.gen_pkts6(p, self.pg1, src=self.pg1.remote_ip6,
                                   dst=p.remote_tun_if_host,
                                   count=n_pkts)
            encrypted_rx = self.send_and_expect(self.pg1, plain,
                                                self.tun_if, worker=worker)
            self.verify_encrypted6(p, p.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        self.verify_counters6(p, p, len(workers) * n_pkts, worker=0)
1251
1252
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    vpp_worker_count = 2

    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        n_pkts = 15
        # inject alternately on worker 0 and 1. all counts on the SA
        # should be against worker 0
        workers = (0, 1, 0, 1)
        p = self.params[socket.AF_INET]

        for worker in workers:
            # encrypted traffic in, plain traffic expected on pg1
            encrypted = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
                                              self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=n_pkts)
            plain_rx = self.send_and_expect(self.tun_if, encrypted,
                                            self.pg1, worker=worker)
            self.verify_decrypted(p, plain_rx)

            # plain traffic in, encrypted traffic expected on the tunnel
            plain = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                  dst=p.remote_tun_if_host,
                                  count=n_pkts)
            encrypted_rx = self.send_and_expect(self.pg1, plain,
                                                self.tun_if, worker=worker)
            self.verify_encrypted(p, p.vpp_tun_sa, encrypted_rx)

        # all counts against the first worker that was used
        self.verify_counters4(p, len(workers) * n_pkts, worker=0)
1285
1286
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # Nothing to add here: every test comes from the two base classes.
1290
1291
if __name__ == "__main__":
    # executed directly: hand the tests in this module to the VPP runner
    unittest.main(testRunner=VppTestRunner)