ipsec: AH copy destination and source address from template
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
18 class IPsecIPv4Params(object):
19
20     addr_type = socket.AF_INET
21     addr_any = "0.0.0.0"
22     addr_bcast = "255.255.255.255"
23     addr_len = 32
24     is_ipv6 = 0
25
26     def __init__(self):
27         self.remote_tun_if_host = '1.1.1.1'
28         self.remote_tun_if_host6 = '1111::1'
29
30         self.scapy_tun_sa_id = 10
31         self.scapy_tun_spi = 1001
32         self.vpp_tun_sa_id = 20
33         self.vpp_tun_spi = 1000
34
35         self.scapy_tra_sa_id = 30
36         self.scapy_tra_spi = 2001
37         self.vpp_tra_sa_id = 40
38         self.vpp_tra_spi = 2000
39
40         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
41                                  IPSEC_API_INTEG_ALG_SHA1_96)
42         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
43         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
44
45         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
46                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
47         self.crypt_algo = 'AES-CBC'  # scapy name
48         self.crypt_key = b'JPjyOWBeVEQiMe7h'
49         self.salt = 0
50         self.flags = 0
51         self.nat_header = None
52
53
54 class IPsecIPv6Params(object):
55
56     addr_type = socket.AF_INET6
57     addr_any = "0::0"
58     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
59     addr_len = 128
60     is_ipv6 = 1
61
62     def __init__(self):
63         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
64         self.remote_tun_if_host4 = '1.1.1.1'
65
66         self.scapy_tun_sa_id = 50
67         self.scapy_tun_spi = 3001
68         self.vpp_tun_sa_id = 60
69         self.vpp_tun_spi = 3000
70
71         self.scapy_tra_sa_id = 70
72         self.scapy_tra_spi = 4001
73         self.vpp_tra_sa_id = 80
74         self.vpp_tra_spi = 4000
75
76         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
77                                  IPSEC_API_INTEG_ALG_SHA1_96)
78         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
79         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
80
81         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
82                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
83         self.crypt_algo = 'AES-CBC'  # scapy name
84         self.crypt_key = b'JPjyOWBeVEQiMe7h'
85         self.salt = 0
86         self.flags = 0
87         self.nat_header = None
88
89
90 def mk_scapy_crypt_key(p):
91     if p.crypt_algo == "AES-GCM":
92         return p.crypt_key + struct.pack("!I", p.salt)
93     else:
94         return p.crypt_key
95
96
97 def config_tun_params(p, encryption_type, tun_if):
98     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
99     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
100                              IPSEC_API_SAD_FLAG_USE_ESN))
101     p.tun_dst = tun_if.remote_addr[p.addr_type]
102     p.tun_src = tun_if.local_addr[p.addr_type]
103     crypt_key = mk_scapy_crypt_key(p)
104     p.scapy_tun_sa = SecurityAssociation(
105         encryption_type, spi=p.vpp_tun_spi,
106         crypt_algo=p.crypt_algo,
107         crypt_key=crypt_key,
108         auth_algo=p.auth_algo, auth_key=p.auth_key,
109         tunnel_header=ip_class_by_addr_type[p.addr_type](
110             src=p.tun_dst,
111             dst=p.tun_src),
112         nat_t_header=p.nat_header,
113         esn_en=esn_en)
114     p.vpp_tun_sa = SecurityAssociation(
115         encryption_type, spi=p.scapy_tun_spi,
116         crypt_algo=p.crypt_algo,
117         crypt_key=crypt_key,
118         auth_algo=p.auth_algo, auth_key=p.auth_key,
119         tunnel_header=ip_class_by_addr_type[p.addr_type](
120             dst=p.tun_dst,
121             src=p.tun_src),
122         nat_t_header=p.nat_header,
123         esn_en=esn_en)
124
125
126 def config_tra_params(p, encryption_type):
127     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
128                              IPSEC_API_SAD_FLAG_USE_ESN))
129     crypt_key = mk_scapy_crypt_key(p)
130     p.scapy_tra_sa = SecurityAssociation(
131         encryption_type,
132         spi=p.vpp_tra_spi,
133         crypt_algo=p.crypt_algo,
134         crypt_key=crypt_key,
135         auth_algo=p.auth_algo,
136         auth_key=p.auth_key,
137         nat_t_header=p.nat_header,
138         esn_en=esn_en)
139     p.vpp_tra_sa = SecurityAssociation(
140         encryption_type,
141         spi=p.scapy_tra_spi,
142         crypt_algo=p.crypt_algo,
143         crypt_key=crypt_key,
144         auth_algo=p.auth_algo,
145         auth_key=p.auth_key,
146         nat_t_header=p.nat_header,
147         esn_en=esn_en)
148
149
150 class TemplateIpsec(VppTestCase):
151     """
152     TRANSPORT MODE:
153
154      ------   encrypt   ---
155     |tra_if| <-------> |VPP|
156      ------   decrypt   ---
157
158     TUNNEL MODE:
159
160      ------   encrypt   ---   plain   ---
161     |tun_if| <-------  |VPP| <------ |pg1|
162      ------             ---           ---
163
164      ------   decrypt   ---   plain   ---
165     |tun_if| ------->  |VPP| ------> |pg1|
166      ------             ---           ---
167     """
168     tun_spd_id = 1
169     tra_spd_id = 2
170
171     def ipsec_select_backend(self):
172         """ empty method to be overloaded when necessary """
173         pass
174
175     @classmethod
176     def setUpClass(cls):
177         super(TemplateIpsec, cls).setUpClass()
178
179     @classmethod
180     def tearDownClass(cls):
181         super(TemplateIpsec, cls).tearDownClass()
182
183     def setup_params(self):
184         self.ipv4_params = IPsecIPv4Params()
185         self.ipv6_params = IPsecIPv6Params()
186         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
187                        self.ipv6_params.addr_type: self.ipv6_params}
188
189     def config_interfaces(self):
190         self.create_pg_interfaces(range(3))
191         self.interfaces = list(self.pg_interfaces)
192         for i in self.interfaces:
193             i.admin_up()
194             i.config_ip4()
195             i.resolve_arp()
196             i.config_ip6()
197             i.resolve_ndp()
198
199     def setUp(self):
200         super(TemplateIpsec, self).setUp()
201
202         self.setup_params()
203
204         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
205                                  IPSEC_API_PROTO_ESP)
206         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
207                                 IPSEC_API_PROTO_AH)
208
209         self.config_interfaces()
210
211         self.ipsec_select_backend()
212
213     def unconfig_interfaces(self):
214         for i in self.interfaces:
215             i.admin_down()
216             i.unconfig_ip4()
217             i.unconfig_ip6()
218
219     def tearDown(self):
220         super(TemplateIpsec, self).tearDown()
221
222         self.unconfig_interfaces()
223
224     def show_commands_at_teardown(self):
225         self.logger.info(self.vapi.cli("show hardware"))
226
227     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
228                          payload_size=54):
229         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
230                 sa.encrypt(IP(src=src, dst=dst) /
231                            ICMP() / Raw(b'X' * payload_size))
232                 for i in range(count)]
233
234     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
235                           payload_size=54):
236         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
237                 sa.encrypt(IPv6(src=src, dst=dst) /
238                            ICMPv6EchoRequest(id=0, seq=1,
239                                              data='X' * payload_size))
240                 for i in range(count)]
241
242     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
243         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
244                 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
245                 for i in range(count)]
246
247     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
248         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
249                 IPv6(src=src, dst=dst) /
250                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
251                 for i in range(count)]
252
253
254 class IpsecTcp(object):
255     def verify_tcp_checksum(self):
256         self.vapi.cli("test http server")
257         p = self.params[socket.AF_INET]
258         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
259                 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
260                                           dst=self.tun_if.local_ip4) /
261                                        TCP(flags='S', dport=80)))
262         self.logger.debug(ppp("Sending packet:", send))
263         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
264         recv = recv[0]
265         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
266         self.assert_packet_checksums_valid(decrypted)
267
268
269 class IpsecTcpTests(IpsecTcp):
270     def test_tcp_checksum(self):
271         """ verify checksum correctness for vpp generated packets """
272         self.verify_tcp_checksum()
273
274
275 class IpsecTra4(object):
276     """ verify methods for Transport v4 """
277     def verify_tra_anti_replay(self):
278         p = self.params[socket.AF_INET]
279         esn_en = p.vpp_tra_sa.esn_en
280
281         seq_cycle_node_name = ('/err/%s/sequence number cycled' %
282                                self.tra4_encrypt_node_name)
283         replay_node_name = ('/err/%s/SA replayed packet' %
284                             self.tra4_decrypt_node_name)
285         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
286             hash_failed_node_name = ('/err/%s/ESP decryption failed' %
287                                      self.tra4_decrypt_node_name)
288         else:
289             hash_failed_node_name = ('/err/%s/Integrity check failed' %
290                                      self.tra4_decrypt_node_name)
291         replay_count = self.statistics.get_err_counter(replay_node_name)
292         hash_failed_count = self.statistics.get_err_counter(
293             hash_failed_node_name)
294         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
295
296         if ESP == self.encryption_type:
297             undersize_node_name = ('/err/%s/undersized packet' %
298                                    self.tra4_decrypt_node_name)
299             undersize_count = self.statistics.get_err_counter(
300                 undersize_node_name)
301
302         #
303         # send packets with seq numbers 1->34
304         # this means the window size is still in Case B (see RFC4303
305         # Appendix A)
306         #
307         # for reasons i haven't investigated Scapy won't create a packet with
308         # seq_num=0
309         #
310         pkts = [(Ether(src=self.tra_if.remote_mac,
311                        dst=self.tra_if.local_mac) /
312                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
313                                            dst=self.tra_if.local_ip4) /
314                                         ICMP(),
315                                         seq_num=seq))
316                 for seq in range(1, 34)]
317         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
318
319         # replayed packets are dropped
320         self.send_and_assert_no_replies(self.tra_if, pkts)
321         replay_count += len(pkts)
322         self.assert_error_counter_equal(replay_node_name, replay_count)
323
324         #
325         # now send a batch of packets all with the same sequence number
326         # the first packet in the batch is legitimate, the rest bogus
327         #
328         pkts = (Ether(src=self.tra_if.remote_mac,
329                       dst=self.tra_if.local_mac) /
330                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
331                                           dst=self.tra_if.local_ip4) /
332                                        ICMP(),
333                                        seq_num=35))
334         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
335                                          self.tra_if, n_rx=1)
336         replay_count += 7
337         self.assert_error_counter_equal(replay_node_name, replay_count)
338
339         #
340         # now move the window over to 257 (more than one byte) and into Case A
341         #
342         pkt = (Ether(src=self.tra_if.remote_mac,
343                      dst=self.tra_if.local_mac) /
344                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
345                                          dst=self.tra_if.local_ip4) /
346                                       ICMP(),
347                                       seq_num=257))
348         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
349
350         # replayed packets are dropped
351         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
352         replay_count += 3
353         self.assert_error_counter_equal(replay_node_name, replay_count)
354
355         # the window size is 64 packets
356         # in window are still accepted
357         pkt = (Ether(src=self.tra_if.remote_mac,
358                      dst=self.tra_if.local_mac) /
359                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
360                                          dst=self.tra_if.local_ip4) /
361                                       ICMP(),
362                                       seq_num=200))
363         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
364
365         # a packet that does not decrypt does not move the window forward
366         bogus_sa = SecurityAssociation(self.encryption_type,
367                                        p.vpp_tra_spi,
368                                        crypt_algo=p.crypt_algo,
369                                        crypt_key=mk_scapy_crypt_key(p)[::-1],
370                                        auth_algo=p.auth_algo,
371                                        auth_key=p.auth_key[::-1])
372         pkt = (Ether(src=self.tra_if.remote_mac,
373                      dst=self.tra_if.local_mac) /
374                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
375                                    dst=self.tra_if.local_ip4) /
376                                 ICMP(),
377                                 seq_num=350))
378         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
379
380         hash_failed_count += 17
381         self.assert_error_counter_equal(hash_failed_node_name,
382                                         hash_failed_count)
383
384         # a malformed 'runt' packet
385         #  created by a mis-constructed SA
386         if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
387             bogus_sa = SecurityAssociation(self.encryption_type,
388                                            p.vpp_tra_spi)
389             pkt = (Ether(src=self.tra_if.remote_mac,
390                          dst=self.tra_if.local_mac) /
391                    bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
392                                        dst=self.tra_if.local_ip4) /
393                                     ICMP(),
394                                     seq_num=350))
395             self.send_and_assert_no_replies(self.tra_if, pkt * 17)
396
397             undersize_count += 17
398             self.assert_error_counter_equal(undersize_node_name,
399                                             undersize_count)
400
401         # which we can determine since this packet is still in the window
402         pkt = (Ether(src=self.tra_if.remote_mac,
403                      dst=self.tra_if.local_mac) /
404                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
405                                          dst=self.tra_if.local_ip4) /
406                                       ICMP(),
407                                       seq_num=234))
408         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
409
410         #
411         # out of window are dropped
412         #  this is Case B. So VPP will consider this to be a high seq num wrap
413         #  and so the decrypt attempt will fail
414         #
415         pkt = (Ether(src=self.tra_if.remote_mac,
416                      dst=self.tra_if.local_mac) /
417                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
418                                          dst=self.tra_if.local_ip4) /
419                                       ICMP(),
420                                       seq_num=17))
421         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
422
423         if esn_en:
424             # an out of window error with ESN looks like a high sequence
425             # wrap. but since it isn't then the verify will fail.
426             hash_failed_count += 17
427             self.assert_error_counter_equal(hash_failed_node_name,
428                                             hash_failed_count)
429
430         else:
431             replay_count += 17
432             self.assert_error_counter_equal(replay_node_name,
433                                             replay_count)
434
435         # valid packet moves the window over to 258
436         pkt = (Ether(src=self.tra_if.remote_mac,
437                      dst=self.tra_if.local_mac) /
438                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
439                                          dst=self.tra_if.local_ip4) /
440                                       ICMP(),
441                                       seq_num=258))
442         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
443         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
444
445         #
446         # move VPP's SA TX seq-num to just before the seq-number wrap.
447         # then fire in a packet that VPP should drop on TX because it
448         # causes the TX seq number to wrap; unless we're using extened sequence
449         # numbers.
450         #
451         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
452         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
453         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
454
455         pkts = [(Ether(src=self.tra_if.remote_mac,
456                        dst=self.tra_if.local_mac) /
457                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
458                                            dst=self.tra_if.local_ip4) /
459                                         ICMP(),
460                                         seq_num=seq))
461                 for seq in range(259, 280)]
462
463         if esn_en:
464             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
465
466             #
467             # in order for scapy to decrypt its SA's high order number needs
468             # to wrap
469             #
470             p.vpp_tra_sa.seq_num = 0x100000000
471             for rx in rxs:
472                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
473
474             #
475             # wrap scapy's TX high sequence number. VPP is in case B, so it
476             # will consider this a high seq wrap also.
477             # The low seq num we set it to will place VPP's RX window in Case A
478             #
479             p.scapy_tra_sa.seq_num = 0x100000005
480             pkt = (Ether(src=self.tra_if.remote_mac,
481                          dst=self.tra_if.local_mac) /
482                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
483                                              dst=self.tra_if.local_ip4) /
484                                           ICMP(),
485                                           seq_num=0x100000005))
486             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
487             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
488
489             #
490             # A packet that has seq num between (2^32-64) and 5 is within
491             # the window
492             #
493             p.scapy_tra_sa.seq_num = 0xfffffffd
494             pkt = (Ether(src=self.tra_if.remote_mac,
495                          dst=self.tra_if.local_mac) /
496                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
497                                              dst=self.tra_if.local_ip4) /
498                                           ICMP(),
499                                           seq_num=0xfffffffd))
500             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
501             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
502
503             #
504             # While in case A we cannot wrap the high sequence number again
505             # becuase VPP will consider this packet to be one that moves the
506             # window forward
507             #
508             pkt = (Ether(src=self.tra_if.remote_mac,
509                          dst=self.tra_if.local_mac) /
510                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
511                                              dst=self.tra_if.local_ip4) /
512                                           ICMP(),
513                                           seq_num=0x200000999))
514             self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
515
516             hash_failed_count += 1
517             self.assert_error_counter_equal(hash_failed_node_name,
518                                             hash_failed_count)
519
520             #
521             # but if we move the wondow forward to case B, then we can wrap
522             # again
523             #
524             p.scapy_tra_sa.seq_num = 0x100000555
525             pkt = (Ether(src=self.tra_if.remote_mac,
526                          dst=self.tra_if.local_mac) /
527                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
528                                              dst=self.tra_if.local_ip4) /
529                                           ICMP(),
530                                           seq_num=0x100000555))
531             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
532             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
533
534             p.scapy_tra_sa.seq_num = 0x200000444
535             pkt = (Ether(src=self.tra_if.remote_mac,
536                          dst=self.tra_if.local_mac) /
537                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
538                                              dst=self.tra_if.local_ip4) /
539                                           ICMP(),
540                                           seq_num=0x200000444))
541             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
542             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
543
544         else:
545             #
546             # without ESN TX sequence numbers can't wrap and packets are
547             # dropped from here on out.
548             #
549             self.send_and_assert_no_replies(self.tra_if, pkts)
550             seq_cycle_count += len(pkts)
551             self.assert_error_counter_equal(seq_cycle_node_name,
552                                             seq_cycle_count)
553
554         # move the security-associations seq number on to the last we used
555         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
556         p.scapy_tra_sa.seq_num = 351
557         p.vpp_tra_sa.seq_num = 351
558
559     def verify_tra_basic4(self, count=1):
560         """ ipsec v4 transport basic test """
561         self.vapi.cli("clear errors")
562         self.vapi.cli("clear ipsec sa")
563         try:
564             p = self.params[socket.AF_INET]
565             send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
566                                               src=self.tra_if.remote_ip4,
567                                               dst=self.tra_if.local_ip4,
568                                               count=count)
569             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
570                                              self.tra_if)
571             for rx in recv_pkts:
572                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
573                 self.assert_packet_checksums_valid(rx)
574                 try:
575                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
576                     self.assert_packet_checksums_valid(decrypted)
577                 except:
578                     self.logger.debug(ppp("Unexpected packet:", rx))
579                     raise
580         finally:
581             self.logger.info(self.vapi.ppcli("show error"))
582             self.logger.info(self.vapi.ppcli("show ipsec all"))
583
584         pkts = p.tra_sa_in.get_stats()['packets']
585         self.assertEqual(pkts, count,
586                          "incorrect SA in counts: expected %d != %d" %
587                          (count, pkts))
588         pkts = p.tra_sa_out.get_stats()['packets']
589         self.assertEqual(pkts, count,
590                          "incorrect SA out counts: expected %d != %d" %
591                          (count, pkts))
592
593         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
594         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
595
596
597 class IpsecTra4Tests(IpsecTra4):
598     """ UT test methods for Transport v4 """
599     def test_tra_anti_replay(self):
600         """ ipsec v4 transport anti-reply test """
601         self.verify_tra_anti_replay()
602
603     def test_tra_basic(self, count=1):
604         """ ipsec v4 transport basic test """
605         self.verify_tra_basic4(count=1)
606
607     def test_tra_burst(self):
608         """ ipsec v4 transport burst test """
609         self.verify_tra_basic4(count=257)
610
611
612 class IpsecTra6(object):
613     """ verify methods for Transport v6 """
614     def verify_tra_basic6(self, count=1):
615         self.vapi.cli("clear errors")
616         try:
617             p = self.params[socket.AF_INET6]
618             send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
619                                                src=self.tra_if.remote_ip6,
620                                                dst=self.tra_if.local_ip6,
621                                                count=count)
622             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
623                                              self.tra_if)
624             for rx in recv_pkts:
625                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
626                                  rx[IPv6].plen)
627                 try:
628                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
629                     self.assert_packet_checksums_valid(decrypted)
630                 except:
631                     self.logger.debug(ppp("Unexpected packet:", rx))
632                     raise
633         finally:
634             self.logger.info(self.vapi.ppcli("show error"))
635             self.logger.info(self.vapi.ppcli("show ipsec all"))
636
637         pkts = p.tra_sa_in.get_stats()['packets']
638         self.assertEqual(pkts, count,
639                          "incorrect SA in counts: expected %d != %d" %
640                          (count, pkts))
641         pkts = p.tra_sa_out.get_stats()['packets']
642         self.assertEqual(pkts, count,
643                          "incorrect SA out counts: expected %d != %d" %
644                          (count, pkts))
645         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
646         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
647
648     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
649                                    payload_size=54):
650         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
651                 sa.encrypt(IPv6(src=src, dst=dst) /
652                            ICMPv6EchoRequest(id=0, seq=1,
653                                              data='X' * payload_size))
654                 for i in range(count)]
655
656     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
657         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
658                 IPv6(src=src, dst=dst) /
659                 IPv6ExtHdrHopByHop() /
660                 IPv6ExtHdrFragment(id=2, offset=200) /
661                 Raw(b'\xff' * 200)
662                 for i in range(count)]
663
664     def verify_tra_encrypted6(self, p, sa, rxs):
665         decrypted = []
666         for rx in rxs:
667             self.assert_packet_checksums_valid(rx)
668             try:
669                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
670                 decrypted.append(decrypt_pkt)
671                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
672                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
673             except:
674                 self.logger.debug(ppp("Unexpected packet:", rx))
675                 try:
676                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
677                 except:
678                     pass
679                 raise
680         return decrypted
681
682     def verify_tra_66_ext_hdrs(self, p):
683         count = 63
684
685         #
686         # check we can decrypt with options
687         #
688         tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
689                                              src=self.tra_if.remote_ip6,
690                                              dst=self.tra_if.local_ip6,
691                                              count=count)
692         self.send_and_expect(self.tra_if, tx, self.tra_if)
693
694         #
695         # injecting a packet from ourselves to be routed of box is a hack
696         # but it matches an outbout policy, alors je ne regrette rien
697         #
698
699         # one extension before ESP
700         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
701               IPv6(src=self.tra_if.local_ip6,
702                    dst=self.tra_if.remote_ip6) /
703               IPv6ExtHdrFragment(id=2, offset=200) /
704               Raw(b'\xff' * 200))
705
706         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
707         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
708
709         for dc in dcs:
710             # for reasons i'm not going to investigate scapy does not
711             # created the correct headers after decrypt. but reparsing
712             # the ipv6 packet fixes it
713             dc = IPv6(raw(dc[IPv6]))
714             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
715
716         # two extensions before ESP
717         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
718               IPv6(src=self.tra_if.local_ip6,
719                    dst=self.tra_if.remote_ip6) /
720               IPv6ExtHdrHopByHop() /
721               IPv6ExtHdrFragment(id=2, offset=200) /
722               Raw(b'\xff' * 200))
723
724         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
725         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
726
727         for dc in dcs:
728             dc = IPv6(raw(dc[IPv6]))
729             self.assertTrue(dc[IPv6ExtHdrHopByHop])
730             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
731
732         # two extensions before ESP, one after
733         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
734               IPv6(src=self.tra_if.local_ip6,
735                    dst=self.tra_if.remote_ip6) /
736               IPv6ExtHdrHopByHop() /
737               IPv6ExtHdrFragment(id=2, offset=200) /
738               IPv6ExtHdrDestOpt() /
739               Raw(b'\xff' * 200))
740
741         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
742         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
743
744         for dc in dcs:
745             dc = IPv6(raw(dc[IPv6]))
746             self.assertTrue(dc[IPv6ExtHdrDestOpt])
747             self.assertTrue(dc[IPv6ExtHdrHopByHop])
748             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
749
750
751 class IpsecTra6Tests(IpsecTra6):
752     """ UT test methods for Transport v6 """
753     def test_tra_basic6(self):
754         """ ipsec v6 transport basic test """
755         self.verify_tra_basic6(count=1)
756
757     def test_tra_burst6(self):
758         """ ipsec v6 transport burst test """
759         self.verify_tra_basic6(count=257)
760
761
762 class IpsecTra6ExtTests(IpsecTra6):
763     def test_tra_ext_hdrs_66(self):
764         """ ipsec 6o6 tra extension headers test """
765         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
766
767
768 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
769     """ UT test methods for Transport v6 and v4"""
770     pass
771
772
773 class IpsecTun4(object):
774     """ verify methods for Tunnel v4 """
775     def verify_counters4(self, p, count, n_frags=None, worker=None):
776         if not n_frags:
777             n_frags = count
778         if (hasattr(p, "spd_policy_in_any")):
779             pkts = p.spd_policy_in_any.get_stats(worker)['packets']
780             self.assertEqual(pkts, count,
781                              "incorrect SPD any policy: expected %d != %d" %
782                              (count, pkts))
783
784         if (hasattr(p, "tun_sa_in")):
785             pkts = p.tun_sa_in.get_stats(worker)['packets']
786             self.assertEqual(pkts, count,
787                              "incorrect SA in counts: expected %d != %d" %
788                              (count, pkts))
789             pkts = p.tun_sa_out.get_stats(worker)['packets']
790             self.assertEqual(pkts, count,
791                              "incorrect SA out counts: expected %d != %d" %
792                              (count, pkts))
793
794         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
795         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
796
797     def verify_decrypted(self, p, rxs):
798         for rx in rxs:
799             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
800             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
801             self.assert_packet_checksums_valid(rx)
802
803     def verify_encrypted(self, p, sa, rxs):
804         decrypt_pkts = []
805         for rx in rxs:
806             if p.nat_header:
807                 self.assertEqual(rx[UDP].dport, 4500)
808             self.assert_packet_checksums_valid(rx)
809             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
810             try:
811                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
812                 if not decrypt_pkt.haslayer(IP):
813                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
814                 decrypt_pkts.append(decrypt_pkt)
815                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
816                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
817             except:
818                 self.logger.debug(ppp("Unexpected packet:", rx))
819                 try:
820                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
821                 except:
822                     pass
823                 raise
824         pkts = reassemble4(decrypt_pkts)
825         for pkt in pkts:
826             self.assert_packet_checksums_valid(pkt)
827
828     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
829         self.vapi.cli("clear errors")
830         self.vapi.cli("clear ipsec counters")
831         if not n_rx:
832             n_rx = count
833         try:
834             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
835                                               src=p.remote_tun_if_host,
836                                               dst=self.pg1.remote_ip4,
837                                               count=count)
838             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
839             self.verify_decrypted(p, recv_pkts)
840
841             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
842                                       dst=p.remote_tun_if_host, count=count,
843                                       payload_size=payload_size)
844             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
845                                              self.tun_if, n_rx)
846             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
847
848             for rx in recv_pkts:
849                 self.assertEqual(rx[IP].src, p.tun_src)
850                 self.assertEqual(rx[IP].dst, p.tun_dst)
851
852         finally:
853             self.logger.info(self.vapi.ppcli("show error"))
854             self.logger.info(self.vapi.ppcli("show ipsec all"))
855
856         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
857         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
858         self.verify_counters4(p, count, n_rx)
859
860     """ verify methods for Transport v4 """
861     def verify_tun_44_bad_packet_sizes(self, p):
862         # with a buffer size of 2048, 1989 bytes of payload
863         # means there isn't space to insert the ESP header
864         N_PKTS = 63
865         for p_siz in [1989, 8500]:
866             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
867                                               src=p.remote_tun_if_host,
868                                               dst=self.pg1.remote_ip4,
869                                               count=N_PKTS,
870                                               payload_size=p_siz)
871             self.send_and_assert_no_replies(self.tun_if, send_pkts)
872             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
873                                       dst=p.remote_tun_if_host, count=N_PKTS,
874                                       payload_size=p_siz)
875             self.send_and_assert_no_replies(self.pg1, send_pkts,
876                                             self.tun_if)
877
878         # both large packets on decrpyt count against chained buffers
879         # the 9000 bytes one does on encrypt
880         self.assertEqual(2 * N_PKTS,
881                          self.statistics.get_err_counter(
882                              '/err/%s/chained buffers (packet dropped)' %
883                              self.tun4_decrypt_node_name))
884         self.assertEqual(N_PKTS,
885                          self.statistics.get_err_counter(
886                              '/err/%s/chained buffers (packet dropped)' %
887                              self.tun4_encrypt_node_name))
888
889         # on encrypt the 1989 size is no trailer space
890         self.assertEqual(N_PKTS,
891                          self.statistics.get_err_counter(
892                              '/err/%s/no trailer space (packet dropped)' %
893                              self.tun4_encrypt_node_name))
894
895     def verify_tun_reass_44(self, p):
896         self.vapi.cli("clear errors")
897         self.vapi.ip_reassembly_enable_disable(
898             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
899
900         try:
901             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
902                                               src=p.remote_tun_if_host,
903                                               dst=self.pg1.remote_ip4,
904                                               payload_size=1900,
905                                               count=1)
906             send_pkts = fragment_rfc791(send_pkts[0], 1400)
907             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
908                                              self.pg1, n_rx=1)
909             self.verify_decrypted(p, recv_pkts)
910
911             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
912                                       dst=p.remote_tun_if_host, count=1)
913             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
914                                              self.tun_if)
915             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
916
917         finally:
918             self.logger.info(self.vapi.ppcli("show error"))
919             self.logger.info(self.vapi.ppcli("show ipsec all"))
920
921         self.verify_counters4(p, 1, 1)
922         self.vapi.ip_reassembly_enable_disable(
923             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
924
925     def verify_tun_64(self, p, count=1):
926         self.vapi.cli("clear errors")
927         try:
928             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
929                                                src=p.remote_tun_if_host6,
930                                                dst=self.pg1.remote_ip6,
931                                                count=count)
932             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
933             for recv_pkt in recv_pkts:
934                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
935                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
936                 self.assert_packet_checksums_valid(recv_pkt)
937             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
938                                        dst=p.remote_tun_if_host6, count=count)
939             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
940             for recv_pkt in recv_pkts:
941                 try:
942                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
943                     if not decrypt_pkt.haslayer(IPv6):
944                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
945                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
946                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
947                     self.assert_packet_checksums_valid(decrypt_pkt)
948                 except:
949                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
950                     try:
951                         self.logger.debug(
952                             ppp("Decrypted packet:", decrypt_pkt))
953                     except:
954                         pass
955                     raise
956         finally:
957             self.logger.info(self.vapi.ppcli("show error"))
958             self.logger.info(self.vapi.ppcli("show ipsec all"))
959
960         self.verify_counters4(p, count)
961
962     def verify_keepalive(self, p):
963         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
964                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
965                UDP(sport=333, dport=4500) /
966                Raw(b'\xff'))
967         self.send_and_assert_no_replies(self.tun_if, pkt*31)
968         self.assert_error_counter_equal(
969             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
970
971         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
972                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
973                UDP(sport=333, dport=4500) /
974                Raw(b'\xfe'))
975         self.send_and_assert_no_replies(self.tun_if, pkt*31)
976         self.assert_error_counter_equal(
977             '/err/%s/Too Short' % self.tun4_input_node, 31)
978
979
980 class IpsecTun4Tests(IpsecTun4):
981     """ UT test methods for Tunnel v4 """
982     def test_tun_basic44(self):
983         """ ipsec 4o4 tunnel basic test """
984         self.verify_tun_44(self.params[socket.AF_INET], count=1)
985         self.tun_if.admin_down()
986         self.tun_if.resolve_arp()
987         self.tun_if.admin_up()
988         self.verify_tun_44(self.params[socket.AF_INET], count=1)
989
990     def test_tun_reass_basic44(self):
991         """ ipsec 4o4 tunnel basic reassembly test """
992         self.verify_tun_reass_44(self.params[socket.AF_INET])
993
994     def test_tun_burst44(self):
995         """ ipsec 4o4 tunnel burst test """
996         self.verify_tun_44(self.params[socket.AF_INET], count=127)
997
998
999 class IpsecTunEsp4Tests(IpsecTun4):
1000     def test_tun_bad_packet_sizes(self):
1001         """ ipsec v4 tunnel bad packet size """
1002         self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
1003
1004
1005 class IpsecTun6(object):
1006     """ verify methods for Tunnel v6 """
1007     def verify_counters6(self, p_in, p_out, count, worker=None):
1008         if (hasattr(p_in, "tun_sa_in")):
1009             pkts = p_in.tun_sa_in.get_stats(worker)['packets']
1010             self.assertEqual(pkts, count,
1011                              "incorrect SA in counts: expected %d != %d" %
1012                              (count, pkts))
1013         if (hasattr(p_out, "tun_sa_out")):
1014             pkts = p_out.tun_sa_out.get_stats(worker)['packets']
1015             self.assertEqual(pkts, count,
1016                              "incorrect SA out counts: expected %d != %d" %
1017                              (count, pkts))
1018         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1019         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
1020
1021     def verify_decrypted6(self, p, rxs):
1022         for rx in rxs:
1023             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1024             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1025             self.assert_packet_checksums_valid(rx)
1026
1027     def verify_encrypted6(self, p, sa, rxs):
1028         for rx in rxs:
1029             self.assert_packet_checksums_valid(rx)
1030             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
1031                              rx[IPv6].plen)
1032             try:
1033                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1034                 if not decrypt_pkt.haslayer(IPv6):
1035                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1036                 self.assert_packet_checksums_valid(decrypt_pkt)
1037                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1038                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1039             except:
1040                 self.logger.debug(ppp("Unexpected packet:", rx))
1041                 try:
1042                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1043                 except:
1044                     pass
1045                 raise
1046
1047     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1048         self.vapi.cli("clear errors")
1049         self.vapi.cli("clear ipsec sa")
1050
1051         send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
1052                                            src=p_in.remote_tun_if_host,
1053                                            dst=self.pg1.remote_ip6,
1054                                            count=count)
1055         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1056         self.logger.info(self.vapi.cli("sh punt stats"))
1057
1058     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1059         self.vapi.cli("clear errors")
1060         self.vapi.cli("clear ipsec sa")
1061         if not p_out:
1062             p_out = p_in
1063         try:
1064             send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
1065                                                src=p_in.remote_tun_if_host,
1066                                                dst=self.pg1.remote_ip6,
1067                                                count=count)
1068             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1069             self.verify_decrypted6(p_in, recv_pkts)
1070
1071             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1072                                        dst=p_out.remote_tun_if_host,
1073                                        count=count,
1074                                        payload_size=payload_size)
1075             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1076             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1077
1078             for rx in recv_pkts:
1079                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1080                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1081
1082         finally:
1083             self.logger.info(self.vapi.ppcli("show error"))
1084             self.logger.info(self.vapi.ppcli("show ipsec all"))
1085         self.verify_counters6(p_in, p_out, count)
1086
1087     def verify_tun_reass_66(self, p):
1088         self.vapi.cli("clear errors")
1089         self.vapi.ip_reassembly_enable_disable(
1090             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
1091
1092         try:
1093             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1094                                                src=p.remote_tun_if_host,
1095                                                dst=self.pg1.remote_ip6,
1096                                                count=1,
1097                                                payload_size=1850)
1098             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1099             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1100                                              self.pg1, n_rx=1)
1101             self.verify_decrypted6(p, recv_pkts)
1102
1103             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1104                                        dst=p.remote_tun_if_host,
1105                                        count=1,
1106                                        payload_size=64)
1107             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1108                                              self.tun_if)
1109             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1110         finally:
1111             self.logger.info(self.vapi.ppcli("show error"))
1112             self.logger.info(self.vapi.ppcli("show ipsec all"))
1113         self.verify_counters6(p, p, 1)
1114         self.vapi.ip_reassembly_enable_disable(
1115             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
1116
1117     def verify_tun_46(self, p, count=1):
1118         """ ipsec 4o6 tunnel basic test """
1119         self.vapi.cli("clear errors")
1120         try:
1121             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1122                                               src=p.remote_tun_if_host4,
1123                                               dst=self.pg1.remote_ip4,
1124                                               count=count)
1125             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1126             for recv_pkt in recv_pkts:
1127                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1128                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1129                 self.assert_packet_checksums_valid(recv_pkt)
1130             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1131                                       dst=p.remote_tun_if_host4,
1132                                       count=count)
1133             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1134             for recv_pkt in recv_pkts:
1135                 try:
1136                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1137                     if not decrypt_pkt.haslayer(IP):
1138                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1139                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1140                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1141                     self.assert_packet_checksums_valid(decrypt_pkt)
1142                 except:
1143                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1144                     try:
1145                         self.logger.debug(ppp("Decrypted packet:",
1146                                               decrypt_pkt))
1147                     except:
1148                         pass
1149                     raise
1150         finally:
1151             self.logger.info(self.vapi.ppcli("show error"))
1152             self.logger.info(self.vapi.ppcli("show ipsec all"))
1153         self.verify_counters6(p, p, count)
1154
1155
1156 class IpsecTun6Tests(IpsecTun6):
1157     """ UT test methods for Tunnel v6 """
1158
1159     def test_tun_basic66(self):
1160         """ ipsec 6o6 tunnel basic test """
1161         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1162
1163     def test_tun_reass_basic66(self):
1164         """ ipsec 6o6 tunnel basic reassembly test """
1165         self.verify_tun_reass_66(self.params[socket.AF_INET6])
1166
1167     def test_tun_burst66(self):
1168         """ ipsec 6o6 tunnel burst test """
1169         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1170
1171
1172 class IpsecTun6HandoffTests(IpsecTun6):
1173     """ UT test methods for Tunnel v6 with multiple workers """
1174     worker_config = "workers 2"
1175
1176     def test_tun_handoff_66(self):
1177         """ ipsec 6o6 tunnel worker hand-off test """
1178         N_PKTS = 15
1179         p = self.params[socket.AF_INET6]
1180
1181         # inject alternately on worker 0 and 1. all counts on the SA
1182         # should be against worker 0
1183         for worker in [0, 1, 0, 1]:
1184             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
1185                                                src=p.remote_tun_if_host,
1186                                                dst=self.pg1.remote_ip6,
1187                                                count=N_PKTS)
1188             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1189                                              self.pg1, worker=worker)
1190             self.verify_decrypted6(p, recv_pkts)
1191
1192             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1193                                        dst=p.remote_tun_if_host,
1194                                        count=N_PKTS)
1195             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1196                                              self.tun_if, worker=worker)
1197             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1198
1199         # all counts against the first worker that was used
1200         self.verify_counters6(p, p, 4*N_PKTS, worker=0)
1201
1202
1203 class IpsecTun4HandoffTests(IpsecTun4):
1204     """ UT test methods for Tunnel v4 with multiple workers """
1205     worker_config = "workers 2"
1206
1207     def test_tun_handooff_44(self):
1208         """ ipsec 4o4 tunnel worker hand-off test """
1209         N_PKTS = 15
1210         p = self.params[socket.AF_INET]
1211
1212         # inject alternately on worker 0 and 1. all counts on the SA
1213         # should be against worker 0
1214         for worker in [0, 1, 0, 1]:
1215             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
1216                                               src=p.remote_tun_if_host,
1217                                               dst=self.pg1.remote_ip4,
1218                                               count=N_PKTS)
1219             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1220                                              self.pg1, worker=worker)
1221             self.verify_decrypted(p, recv_pkts)
1222
1223             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1224                                       dst=p.remote_tun_if_host,
1225                                       count=N_PKTS)
1226             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1227                                              self.tun_if, worker=worker)
1228             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1229
1230         # all counts against the first worker that was used
1231         self.verify_counters4(p, 4*N_PKTS, worker=0)
1232
1233
1234 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1235     """ UT test methods for Tunnel v6 & v4 """
1236     pass
1237
1238
1239 if __name__ == '__main__':
1240     unittest.main(testRunner=VppTestRunner)