ipsec: Targeted unit testing
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params(object):
    """ Default parameter set used by the IPv4 IPsec test cases.

    The class attributes describe the address family; the instance
    attributes hold SA ids, SPIs and the integrity/crypto settings.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # remote host addresses (v4, plus a v6 one for mixed tests)
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # remaining SA settings: salt, SAD flags and NAT-T header
        self.salt = 0
        self.flags = 0
        self.nat_header = None
52
53
class IPsecIPv6Params(object):
    """ Default parameter set used by the IPv6 IPsec test cases.

    The class attributes describe the address family; the instance
    attributes hold SA ids, SPIs and the integrity/crypto settings.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # remote host addresses (v6, plus a v4 one for mixed tests)
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # remaining SA settings: salt, SAD flags and NAT-T header
        self.salt = 0
        self.flags = 0
        self.nat_header = None
88
89
def mk_scapy_crypt_key(p):
    """ Return the crypto key of parameter set p in the form handed to scapy.

    For AES-GCM the salt, packed as a 32-bit unsigned integer in network
    byte order, is appended to the key; for every other algorithm the key
    is returned unchanged.
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
95
96
def config_tun_params(p, encryption_type, tun_if):
    """ Create the two scapy tunnel-mode SAs and store them on p.

    p.scapy_tun_sa uses p.vpp_tun_spi and a tunnel header going from the
    interface's remote address to its local address; p.vpp_tun_sa uses
    p.scapy_tun_spi and a tunnel header in the opposite direction.

    :param p: parameter set (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first argument given to SecurityAssociation
    :param tun_if: interface providing remote_addr / local_addr keyed by
                   address family
    """
    use_esn = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    ip_class = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote_addr = tun_if.remote_addr[p.addr_type]
    local_addr = tun_if.local_addr[p.addr_type]

    # everything except the SPI and the tunnel header is common to both SAs
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=bool(p.flags & use_esn))

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=ip_class(src=remote_addr, dst=local_addr),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=ip_class(dst=remote_addr, src=local_addr),
        **shared)
122
123
def config_tra_params(p, encryption_type):
    """ Create the two scapy transport-mode SAs and store them on p.

    p.scapy_tra_sa is built with p.vpp_tra_spi and p.vpp_tra_sa with
    p.scapy_tra_spi; all other settings are identical for both.

    :param p: parameter set (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first argument given to SecurityAssociation
    """
    use_esn = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # only the SPI differs between the two SAs
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=bool(p.flags & use_esn))

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **shared)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared)
146
147
class TemplateIpsec(VppTestCase):
    """
    Common set-up, tear-down and packet generators for IPsec tests.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    # SPD ids for tunnel and transport mode
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ hook called at the end of setUp(); does nothing here and is
        meant to be overridden when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create the v4/v6 parameter sets and index them by address
        family in self.params """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {}
        for param_set in (self.ipv4_params, self.ipv6_params):
            self.params[param_set.addr_type] = param_set

    def config_interfaces(self):
        """ create three pg interfaces, bring them up and configure and
        resolve both IPv4 and IPv6 on each """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        # VPP API values for the two IPsec protocols
        protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ bring every interface down and remove its IP configuration """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hw_output = self.vapi.cli("show hardware")
        self.logger.info(hw_output)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ build count IPv4 ICMP packets, each one separately encrypted
        with sa and sent from sw_intf's remote MAC to its local MAC """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           ICMP() / Raw(b'X' * payload_size)))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ build count IPv6 ICMPv6 echo requests, each one separately
        encrypted with sa """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size)))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build count plain-text IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build count plain-text IPv6 ICMPv6 echo requests """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size))
        return pkts
250
251
class IpsecTcp(object):
    """ verify methods for TCP over the v4 tunnel """
    def verify_tcp_checksum(self):
        """ send an encrypted TCP SYN for port 80 to VPP over the tunnel
        interface and check the checksums of the decrypted reply """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        # SYN from the remote tunnel host to VPP's address on tun_if
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = (Ether(src=self.tun_if.remote_mac,
                      dst=self.tun_if.local_mac) /
                p.scapy_tun_sa.encrypt(syn))
        self.logger.debug(ppp("Sending packet:", send))

        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        decrypted = p.vpp_tun_sa.decrypt(reply[IP])
        self.assert_packet_checksums_valid(decrypted)
264         self.assert_packet_checksums_valid(decrypted)
265
266
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksums """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # all of the work is done by the IpsecTcp verify method
        self.verify_tcp_checksum()
271
272
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _gen_tra4_seq_pkt(self, sa, seq_num):
        """ build one Ether / sa.encrypt(IP / ICMP) packet going from the
        transport interface's remote peer to VPP

        :param sa: scapy SecurityAssociation used to encrypt the packet
        :param seq_num: sequence number passed explicitly to sa.encrypt()
        :returns: the assembled scapy packet
        """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self):
        """ drive the v4 transport SA through a fixed series of sequence
        numbers and check the replay, integrity/decrypt-failure, undersize
        and sequence-cycle error counters after each step

        The steps depend on each other (each one moves, or deliberately
        does not move, the anti-replay window) so their order matters.
        """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en

        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)

        # snapshot the counters; expectations below are relative to these
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [self._gen_tra4_seq_pkt(p.scapy_tra_sa, seq)
                for seq in range(1, 34)]
        self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 35)
        self.send_and_expect(self.tra_if, pkt * 8,
                             self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 257)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 200)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the right SPI but reversed keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = self._gen_tra4_seq_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = self._gen_tra4_seq_pkt(bogus_sa, 350)
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # that the window did not move (to 350) is something we can
        # determine since this packet is still in the window
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 258)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the reply must decrypt; the decrypted packet itself is not needed
        p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [self._gen_tra4_seq_pkt(p.scapy_tra_sa, seq)
                for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0xfffffffd)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x200000999)
            # NOTE(review): every other send_and_assert_no_replies() call in
            # this file passes two arguments; the third one here looks like
            # a copy/paste from send_and_expect() - confirm against the
            # framework signature before removing it.
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x100000555)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x200000444)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IP total length must cover everything after Ethernet
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        # every packet must be counted once on each SA and on each node
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
593
594
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # pass the count argument through instead of a hard-coded 1; the
        # default keeps the behaviour the test runner sees unchanged
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
608
609
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """ send count encrypted ICMPv6 echo requests to VPP on the
        transport interface, check the replies decrypt with valid
        checksums and that the SA and node packet counters equal count """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IPv6 payload length must cover everything after the
                # Ethernet and fixed IPv6 headers
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                                 rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
                                   payload_size=54):
        """ build count encrypted ICMPv6 echo requests

        Note: despite the name, no extension header is added to the
        packets built here.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size))
                for i in range(count)]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build count plain-text IPv6 packets carrying hop-by-hop and
        fragment extension headers and a fixed 200 byte payload
        (payload_size is not used) """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                IPv6ExtHdrHopByHop() /
                IPv6ExtHdrFragment(id=2, offset=200) /
                Raw(b'\xff' * 200)
                for i in range(count)]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """ decrypt every packet in rxs with p.vpp_tra_sa and check it goes
        from the transport interface's local to its remote IPv6 address

        :param p: parameter set holding vpp_tra_sa
        :param sa: unused; kept so existing callers keep working
        :param rxs: received packets
        :returns: list of decrypted packets, in the order of rxs
        """
        decrypted = []
        for rx in rxs:
            # reset per packet so a failure never logs the previous
            # packet's decrypted form
            decrypt_pkt = None
            self.assert_packet_checksums_valid(rx)
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    except Exception:
                        # best-effort debug aid; never mask the real failure
                        pass
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """ check IPv6 transport mode with extension headers: decrypt on
        ingress, then encrypt packets carrying one, two and three
        extension headers and check the headers survive the round trip """
        # raw() is used below but is not among the names this file imports
        # at module level, so bring it into scope here
        from scapy.packet import raw

        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, so no regrets
        #

        # one extension before ESP
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrFragment(id=2, offset=200) /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it
            dc = IPv6(raw(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrHopByHop() /
              IPv6ExtHdrFragment(id=2, offset=200) /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrHopByHop() /
              IPv6ExtHdrFragment(id=2, offset=200) /
              IPv6ExtHdrDestOpt() /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
747
748
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # NOTE(review): the test docstrings below presumably double as the
    # descriptions printed by the test runner - confirm before rewording
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # run the basic transport check with a single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # same check as the basic test, with count=257
        self.verify_tra_basic6(count=257)
758
759
# Transport v6 test for packets carrying IPv6 extension headers; kept in
# its own class, separate from IpsecTra6Tests
class IpsecTra6ExtTests(IpsecTra6):
    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        # uses the IPv6 parameter set
        self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
764
765
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # adds nothing of its own; only combines the v4 and v6 test classes
    pass
769
770
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """
    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """ check the SPD policy, SA and node packet counters

        :param p: IPsec parameters. spd_policy_in_any is checked only if
                  p has it; tun_sa_in and tun_sa_out are checked only if
                  p has tun_sa_in
        :param count: packets expected on the policy, on both SAs and on
                      the decrypt node
        :param n_frags: packets expected on the encrypt node; a falsy
                        value (None or 0) means count
        :param worker: passed through to get_stats()
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """ check addresses and checksums of packets VPP decrypted

        :param p: IPsec parameters; remote_tun_if_host is the expected
                  source address
        :param rxs: packets captured on pg1
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """ decrypt the packets VPP encrypted and check the inner packets

        :param p: IPsec parameters; decryption uses p.vpp_tun_sa
        :param sa: not used by this implementation, the SA is taken
                   from p; kept so existing callers keep working
        :param rxs: encrypted packets captured on the tunnel interface
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset per packet so that a failure never logs the decrypt
            # result of an earlier packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        # the inner packets may be fragments; check the checksums of the
        # reassembled packets
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """ send traffic through the 4o4 tunnel in both directions

        :param p: IPsec parameters
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the clear packets sent on pg1
        :param n_rx: number of encrypted packets expected on the tunnel
                     interface; a falsy value means count
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        if not n_rx:
            n_rx = count
        try:
            # encrypted in on the tunnel, expect clear out on pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # clear in on pg1, expect encrypted out on the tunnel
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_44_bad_packet_sizes(self, p):
        """ check that packets too large to process are dropped

        Sends packets with payloads of 1989 and 8500 bytes in both
        directions, expects no replies, then checks the drop counters
        of the encrypt and decrypt nodes.

        :param p: IPsec parameters
        """
        # with a buffer size of 2048, 1989 bytes of payload
        # means there isn't space to insert the ESP header
        N_PKTS = 63
        for p_siz in [1989, 8500]:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=N_PKTS,
                                              payload_size=p_siz)
            self.send_and_assert_no_replies(self.tun_if, send_pkts)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=N_PKTS,
                                      payload_size=p_siz)
            self.send_and_assert_no_replies(self.pg1, send_pkts,
                                            self.tun_if)

        # both packet sizes count against chained buffers on decrypt,
        # only the 8500 byte one does on encrypt
        self.assertEqual(2 * N_PKTS,
                         self.statistics.get_err_counter(
                             '/err/%s/chained buffers (packet dropped)' %
                             self.tun4_decrypt_node_name))
        self.assertEqual(N_PKTS,
                         self.statistics.get_err_counter(
                             '/err/%s/chained buffers (packet dropped)' %
                             self.tun4_encrypt_node_name))

        # on encrypt the 1989 size is no trailer space
        self.assertEqual(N_PKTS,
                         self.statistics.get_err_counter(
                             '/err/%s/no trailer space (packet dropped)' %
                             self.tun4_encrypt_node_name))

    def verify_tun_reass_44(self, p):
        """ check a fragmented encrypted packet is reassembled

        Enables IPv4 reassembly on the tunnel interface, sends one
        encrypted packet split into fragments and expects a single
        packet on pg1, then checks the clear to encrypted direction.
        Reassembly is disabled again on the way out, whether or not the
        checks passed.

        :param p: IPsec parameters
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa,
                                                  self.tun_if,
                                                  src=p.remote_tun_if_host,
                                                  dst=self.pg1.remote_ip4,
                                                  payload_size=1900,
                                                  count=1)
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                          dst=p.remote_tun_if_host, count=1)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # do not leave reassembly enabled for whatever runs next
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """ send IPv6 traffic through the IPv4 tunnel in both directions

        :param p: IPsec parameters; remote_tun_if_host6 is the IPv6 host
                  behind the tunnel
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so that a failure never logs the
                # decrypt result of an earlier packet
                decrypt_pkt = None
                try:
                    # the outer header is IPv4, the inner packet is IPv6
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """ check the handling of UDP port 4500 packets with 1 byte payload

        Sends 31 packets with payload 0xff and checks the 'NAT Keepalive'
        error counter of the tunnel input node, then 31 packets with
        payload 0xfe and checks the 'Too Short' counter. No replies are
        expected in either case.

        :param p: IPsec parameters
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
970         self.assert_error_counter_equal(
971             '/err/%s/Too Short' % self.tun4_input_node, 31)
972
973
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    # NOTE(review): the test docstrings below presumably double as the
    # descriptions printed by the test runner - confirm before rewording
    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        self.verify_tun_44(self.params[socket.AF_INET], count=1)
        # take the tunnel interface down, resolve ARP, bring it back up
        # and check that traffic still passes
        self.tun_if.admin_down()
        self.tun_if.resolve_arp()
        self.tun_if.admin_up()
        self.verify_tun_44(self.params[socket.AF_INET], count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        self.verify_tun_reass_44(self.params[socket.AF_INET])

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        # same check as the basic test, with count=127
        self.verify_tun_44(self.params[socket.AF_INET], count=127)
991
992
# Tunnel v4 test for over-sized packets; kept in its own class, separate
# from IpsecTun4Tests
class IpsecTunEsp4Tests(IpsecTun4):
    def test_tun_bad_packet_sizes(self):
        """ ipsec v4 tunnel bad packet size """
        # uses the IPv4 parameter set
        self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
997
998
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """
    def verify_counters6(self, p_in, p_out, count, worker=None):
        """ check the SA and node packet counters

        :param p_in: parameters of the inbound direction; tun_sa_in is
                     checked only if p_in has it
        :param p_out: parameters of the outbound direction; tun_sa_out is
                      checked only if p_out has it
        :param count: packets expected on each SA and on both the encrypt
                      and decrypt nodes
        :param worker: passed through to get_stats()
        """
        if (hasattr(p_in, "tun_sa_in")):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if (hasattr(p_out, "tun_sa_out")):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """ check addresses and checksums of packets VPP decrypted

        :param p: IPsec parameters; remote_tun_if_host is the expected
                  source address
        :param rxs: packets captured on pg1
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """ decrypt the packets VPP encrypted and check the inner packets

        :param p: IPsec parameters; decryption uses p.vpp_tun_sa
        :param sa: not used by this implementation, the SA is taken
                   from p; kept so existing callers keep working
        :param rxs: encrypted packets captured on the tunnel interface
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # reset per packet so that a failure never logs the decrypt
            # result of an earlier packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """ check that encrypted packets get no reply

        :param p_in: IPsec parameters used to build the packets
        :param count: number of packets to send
        :param payload_size: accepted but not used by this method
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """ send traffic through the 6o6 tunnel in both directions

        :param p_in: parameters for the encrypted to clear direction
        :param p_out: parameters for the clear to encrypted direction;
                      a falsy value means p_in
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the clear packets sent on pg1
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            # encrypted in on the tunnel, expect clear out on pg1
            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            # clear in on pg1, expect encrypted out on the tunnel
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """ check a fragmented encrypted packet is reassembled

        Enables IPv6 reassembly on the tunnel interface, sends one
        encrypted packet split into fragments and expects a single
        packet on pg1, then checks the clear to encrypted direction.
        Reassembly is disabled again on the way out, whether or not the
        checks passed.

        :param p: IPsec parameters
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa,
                                                   self.tun_if,
                                                   src=p.remote_tun_if_host,
                                                   dst=self.pg1.remote_ip6,
                                                   count=1,
                                                   payload_size=1850)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # do not leave reassembly enabled for whatever runs next
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ send IPv4 traffic through the IPv6 tunnel in both directions

        :param p: IPsec parameters; remote_tun_if_host4 is the IPv4 host
                  behind the tunnel
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so that a failure never logs the
                # decrypt result of an earlier packet
                decrypt_pkt = None
                try:
                    # the outer header is IPv6, the inner packet is IPv4
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1144
1145
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """
    # NOTE(review): the test docstrings below presumably double as the
    # descriptions printed by the test runner - confirm before rewording

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        self.verify_tun_66(self.params[socket.AF_INET6], count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        self.verify_tun_reass_66(self.params[socket.AF_INET6])

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        # same check as the basic test, with count=257
        self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1160
1161
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        params = self.params[socket.AF_INET6]
        burst = 15

        # traffic is injected alternately on worker 0 and worker 1; every
        # SA count is nevertheless expected against worker 0
        schedule = (0, 1, 0, 1)
        for worker in schedule:
            # encrypted in on the tunnel, clear out on pg1
            encrypted_tx = self.gen_encrypt_pkts6(
                params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip6,
                count=burst)
            clear_rx = self.send_and_expect(self.tun_if, encrypted_tx,
                                            self.pg1, worker=worker)
            self.verify_decrypted6(params, clear_rx)

            # clear in on pg1, encrypted out on the tunnel
            clear_tx = self.gen_pkts6(self.pg1,
                                      src=self.pg1.remote_ip6,
                                      dst=params.remote_tun_if_host,
                                      count=burst)
            encrypted_rx = self.send_and_expect(self.pg1, clear_tx,
                                                self.tun_if, worker=worker)
            self.verify_encrypted6(params, params.vpp_tun_sa, encrypted_rx)

        # the totals of all rounds are counted against the first worker
        # that was used
        self.verify_counters6(params, params, len(schedule) * burst,
                              worker=0)
1191
1192
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        params = self.params[socket.AF_INET]
        burst = 15

        # traffic is injected alternately on worker 0 and worker 1; every
        # SA count is nevertheless expected against worker 0
        schedule = (0, 1, 0, 1)
        for worker in schedule:
            # encrypted in on the tunnel, clear out on pg1
            encrypted_tx = self.gen_encrypt_pkts(
                params.scapy_tun_sa, self.tun_if,
                src=params.remote_tun_if_host,
                dst=self.pg1.remote_ip4,
                count=burst)
            clear_rx = self.send_and_expect(self.tun_if, encrypted_tx,
                                            self.pg1, worker=worker)
            self.verify_decrypted(params, clear_rx)

            # clear in on pg1, encrypted out on the tunnel
            clear_tx = self.gen_pkts(self.pg1,
                                     src=self.pg1.remote_ip4,
                                     dst=params.remote_tun_if_host,
                                     count=burst)
            encrypted_rx = self.send_and_expect(self.pg1, clear_tx,
                                                self.tun_if, worker=worker)
            self.verify_encrypted(params, params.vpp_tun_sa, encrypted_rx)

        # the totals of all rounds are counted against the first worker
        # that was used
        self.verify_counters4(params, len(schedule) * burst, worker=0)
1222
1223
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # adds nothing of its own; only combines the v4 and v6 test classes
    pass
1227
1228
# when executed as a script, run unittest with the VPP test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)