ipsec: add support for AES CTR
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
18 class IPsecIPv4Params:
19
20     addr_type = socket.AF_INET
21     addr_any = "0.0.0.0"
22     addr_bcast = "255.255.255.255"
23     addr_len = 32
24     is_ipv6 = 0
25
26     def __init__(self):
27         self.remote_tun_if_host = '1.1.1.1'
28         self.remote_tun_if_host6 = '1111::1'
29
30         self.scapy_tun_sa_id = 100
31         self.scapy_tun_spi = 1000
32         self.vpp_tun_sa_id = 200
33         self.vpp_tun_spi = 2000
34
35         self.scapy_tra_sa_id = 300
36         self.scapy_tra_spi = 3000
37         self.vpp_tra_sa_id = 400
38         self.vpp_tra_spi = 4000
39
40         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
41                                  IPSEC_API_INTEG_ALG_SHA1_96)
42         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
43         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
44
45         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
46                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
47         self.crypt_algo = 'AES-CBC'  # scapy name
48         self.crypt_key = b'JPjyOWBeVEQiMe7h'
49         self.salt = 0
50         self.flags = 0
51         self.nat_header = None
52         self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
53                           TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
54         self.dscp = 0
55
56
57 class IPsecIPv6Params:
58
59     addr_type = socket.AF_INET6
60     addr_any = "0::0"
61     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
62     addr_len = 128
63     is_ipv6 = 1
64
65     def __init__(self):
66         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
67         self.remote_tun_if_host4 = '1.1.1.1'
68
69         self.scapy_tun_sa_id = 500
70         self.scapy_tun_spi = 3001
71         self.vpp_tun_sa_id = 600
72         self.vpp_tun_spi = 3000
73
74         self.scapy_tra_sa_id = 700
75         self.scapy_tra_spi = 4001
76         self.vpp_tra_sa_id = 800
77         self.vpp_tra_spi = 4000
78
79         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
80                                  IPSEC_API_INTEG_ALG_SHA1_96)
81         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
82         self.auth_key = b'C91KUR9GYMm5GfkEvNjX'
83
84         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
85                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
86         self.crypt_algo = 'AES-CBC'  # scapy name
87         self.crypt_key = b'JPjyOWBeVEQiMe7h'
88         self.salt = 0
89         self.flags = 0
90         self.nat_header = None
91         self.tun_flags = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
92                           TUNNEL_API_ENCAP_DECAP_FLAG_NONE)
93         self.dscp = 0
94
95
96 def mk_scapy_crypt_key(p):
97     if p.crypt_algo in ("AES-GCM", "AES-CTR"):
98         return p.crypt_key + struct.pack("!I", p.salt)
99     else:
100         return p.crypt_key
101
102
103 def config_tun_params(p, encryption_type, tun_if):
104     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
105     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
106                              IPSEC_API_SAD_FLAG_USE_ESN))
107     p.tun_dst = tun_if.remote_addr[p.addr_type]
108     p.tun_src = tun_if.local_addr[p.addr_type]
109     crypt_key = mk_scapy_crypt_key(p)
110     p.scapy_tun_sa = SecurityAssociation(
111         encryption_type, spi=p.vpp_tun_spi,
112         crypt_algo=p.crypt_algo,
113         crypt_key=crypt_key,
114         auth_algo=p.auth_algo, auth_key=p.auth_key,
115         tunnel_header=ip_class_by_addr_type[p.addr_type](
116             src=p.tun_dst,
117             dst=p.tun_src),
118         nat_t_header=p.nat_header,
119         esn_en=esn_en)
120     p.vpp_tun_sa = SecurityAssociation(
121         encryption_type, spi=p.scapy_tun_spi,
122         crypt_algo=p.crypt_algo,
123         crypt_key=crypt_key,
124         auth_algo=p.auth_algo, auth_key=p.auth_key,
125         tunnel_header=ip_class_by_addr_type[p.addr_type](
126             dst=p.tun_dst,
127             src=p.tun_src),
128         nat_t_header=p.nat_header,
129         esn_en=esn_en)
130
131
132 def config_tra_params(p, encryption_type):
133     esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
134                              IPSEC_API_SAD_FLAG_USE_ESN))
135     crypt_key = mk_scapy_crypt_key(p)
136     p.scapy_tra_sa = SecurityAssociation(
137         encryption_type,
138         spi=p.vpp_tra_spi,
139         crypt_algo=p.crypt_algo,
140         crypt_key=crypt_key,
141         auth_algo=p.auth_algo,
142         auth_key=p.auth_key,
143         nat_t_header=p.nat_header,
144         esn_en=esn_en)
145     p.vpp_tra_sa = SecurityAssociation(
146         encryption_type,
147         spi=p.scapy_tra_spi,
148         crypt_algo=p.crypt_algo,
149         crypt_key=crypt_key,
150         auth_algo=p.auth_algo,
151         auth_key=p.auth_key,
152         nat_t_header=p.nat_header,
153         esn_en=esn_en)
154
155
156 class TemplateIpsec(VppTestCase):
157     """
158     TRANSPORT MODE:
159
160      ------   encrypt   ---
161     |tra_if| <-------> |VPP|
162      ------   decrypt   ---
163
164     TUNNEL MODE:
165
166      ------   encrypt   ---   plain   ---
167     |tun_if| <-------  |VPP| <------ |pg1|
168      ------             ---           ---
169
170      ------   decrypt   ---   plain   ---
171     |tun_if| ------->  |VPP| ------> |pg1|
172      ------             ---           ---
173     """
174     tun_spd_id = 1
175     tra_spd_id = 2
176
177     def ipsec_select_backend(self):
178         """ empty method to be overloaded when necessary """
179         pass
180
181     @classmethod
182     def setUpClass(cls):
183         super(TemplateIpsec, cls).setUpClass()
184
185     @classmethod
186     def tearDownClass(cls):
187         super(TemplateIpsec, cls).tearDownClass()
188
189     def setup_params(self):
190         if not hasattr(self, 'ipv4_params'):
191             self.ipv4_params = IPsecIPv4Params()
192         if not hasattr(self, 'ipv6_params'):
193             self.ipv6_params = IPsecIPv6Params()
194         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
195                        self.ipv6_params.addr_type: self.ipv6_params}
196
197     def config_interfaces(self):
198         self.create_pg_interfaces(range(3))
199         self.interfaces = list(self.pg_interfaces)
200         for i in self.interfaces:
201             i.admin_up()
202             i.config_ip4()
203             i.resolve_arp()
204             i.config_ip6()
205             i.resolve_ndp()
206
207     def setUp(self):
208         super(TemplateIpsec, self).setUp()
209
210         self.setup_params()
211
212         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
213                                  IPSEC_API_PROTO_ESP)
214         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
215                                 IPSEC_API_PROTO_AH)
216
217         self.config_interfaces()
218
219         self.ipsec_select_backend()
220
221     def unconfig_interfaces(self):
222         for i in self.interfaces:
223             i.admin_down()
224             i.unconfig_ip4()
225             i.unconfig_ip6()
226
227     def tearDown(self):
228         super(TemplateIpsec, self).tearDown()
229
230         self.unconfig_interfaces()
231
232     def show_commands_at_teardown(self):
233         self.logger.info(self.vapi.cli("show hardware"))
234
235     def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
236                          payload_size=54):
237         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
238                 sa.encrypt(IP(src=src, dst=dst) /
239                            ICMP() / Raw(b'X' * payload_size))
240                 for i in range(count)]
241
242     def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
243                           payload_size=54):
244         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
245                 sa.encrypt(IPv6(src=src, dst=dst) /
246                            ICMPv6EchoRequest(id=0, seq=1,
247                                              data='X' * payload_size))
248                 for i in range(count)]
249
250     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
251         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
252                 IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
253                 for i in range(count)]
254
255     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
256         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
257                 IPv6(src=src, dst=dst) /
258                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
259                 for i in range(count)]
260
261
262 class IpsecTcp(object):
263     def verify_tcp_checksum(self):
264         self.vapi.cli("test http server")
265         p = self.params[socket.AF_INET]
266         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
267                 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
268                                           dst=self.tun_if.local_ip4) /
269                                        TCP(flags='S', dport=80)))
270         self.logger.debug(ppp("Sending packet:", send))
271         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
272         recv = recv[0]
273         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
274         self.assert_packet_checksums_valid(decrypted)
275
276
277 class IpsecTcpTests(IpsecTcp):
278     def test_tcp_checksum(self):
279         """ verify checksum correctness for vpp generated packets """
280         self.verify_tcp_checksum()
281
282
283 class IpsecTra4(object):
284     """ verify methods for Transport v4 """
285     def verify_tra_anti_replay(self):
286         p = self.params[socket.AF_INET]
287         esn_en = p.vpp_tra_sa.esn_en
288
289         seq_cycle_node_name = ('/err/%s/sequence number cycled' %
290                                self.tra4_encrypt_node_name)
291         replay_node_name = ('/err/%s/SA replayed packet' %
292                             self.tra4_decrypt_node_name)
293         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
294             hash_failed_node_name = ('/err/%s/ESP decryption failed' %
295                                      self.tra4_decrypt_node_name)
296         else:
297             hash_failed_node_name = ('/err/%s/Integrity check failed' %
298                                      self.tra4_decrypt_node_name)
299         replay_count = self.statistics.get_err_counter(replay_node_name)
300         hash_failed_count = self.statistics.get_err_counter(
301             hash_failed_node_name)
302         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
303
304         if ESP == self.encryption_type:
305             undersize_node_name = ('/err/%s/undersized packet' %
306                                    self.tra4_decrypt_node_name)
307             undersize_count = self.statistics.get_err_counter(
308                 undersize_node_name)
309
310         #
311         # send packets with seq numbers 1->34
312         # this means the window size is still in Case B (see RFC4303
313         # Appendix A)
314         #
315         # for reasons i haven't investigated Scapy won't create a packet with
316         # seq_num=0
317         #
318         pkts = [(Ether(src=self.tra_if.remote_mac,
319                        dst=self.tra_if.local_mac) /
320                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
321                                            dst=self.tra_if.local_ip4) /
322                                         ICMP(),
323                                         seq_num=seq))
324                 for seq in range(1, 34)]
325         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
326
327         # replayed packets are dropped
328         self.send_and_assert_no_replies(self.tra_if, pkts)
329         replay_count += len(pkts)
330         self.assert_error_counter_equal(replay_node_name, replay_count)
331
332         #
333         # now send a batch of packets all with the same sequence number
334         # the first packet in the batch is legitimate, the rest bogus
335         #
336         pkts = (Ether(src=self.tra_if.remote_mac,
337                       dst=self.tra_if.local_mac) /
338                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
339                                           dst=self.tra_if.local_ip4) /
340                                        ICMP(),
341                                        seq_num=35))
342         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
343                                          self.tra_if, n_rx=1)
344         replay_count += 7
345         self.assert_error_counter_equal(replay_node_name, replay_count)
346
347         #
348         # now move the window over to 257 (more than one byte) and into Case A
349         #
350         pkt = (Ether(src=self.tra_if.remote_mac,
351                      dst=self.tra_if.local_mac) /
352                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
353                                          dst=self.tra_if.local_ip4) /
354                                       ICMP(),
355                                       seq_num=257))
356         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
357
358         # replayed packets are dropped
359         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
360         replay_count += 3
361         self.assert_error_counter_equal(replay_node_name, replay_count)
362
363         # the window size is 64 packets
364         # in window are still accepted
365         pkt = (Ether(src=self.tra_if.remote_mac,
366                      dst=self.tra_if.local_mac) /
367                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
368                                          dst=self.tra_if.local_ip4) /
369                                       ICMP(),
370                                       seq_num=200))
371         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
372
373         # a packet that does not decrypt does not move the window forward
374         bogus_sa = SecurityAssociation(self.encryption_type,
375                                        p.vpp_tra_spi,
376                                        crypt_algo=p.crypt_algo,
377                                        crypt_key=mk_scapy_crypt_key(p)[::-1],
378                                        auth_algo=p.auth_algo,
379                                        auth_key=p.auth_key[::-1])
380         pkt = (Ether(src=self.tra_if.remote_mac,
381                      dst=self.tra_if.local_mac) /
382                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
383                                    dst=self.tra_if.local_ip4) /
384                                 ICMP(),
385                                 seq_num=350))
386         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
387
388         hash_failed_count += 17
389         self.assert_error_counter_equal(hash_failed_node_name,
390                                         hash_failed_count)
391
392         # a malformed 'runt' packet
393         #  created by a mis-constructed SA
394         if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
395             bogus_sa = SecurityAssociation(self.encryption_type,
396                                            p.vpp_tra_spi)
397             pkt = (Ether(src=self.tra_if.remote_mac,
398                          dst=self.tra_if.local_mac) /
399                    bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
400                                        dst=self.tra_if.local_ip4) /
401                                     ICMP(),
402                                     seq_num=350))
403             self.send_and_assert_no_replies(self.tra_if, pkt * 17)
404
405             undersize_count += 17
406             self.assert_error_counter_equal(undersize_node_name,
407                                             undersize_count)
408
409         # which we can determine since this packet is still in the window
410         pkt = (Ether(src=self.tra_if.remote_mac,
411                      dst=self.tra_if.local_mac) /
412                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
413                                          dst=self.tra_if.local_ip4) /
414                                       ICMP(),
415                                       seq_num=234))
416         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
417
418         #
419         # out of window are dropped
420         #  this is Case B. So VPP will consider this to be a high seq num wrap
421         #  and so the decrypt attempt will fail
422         #
423         pkt = (Ether(src=self.tra_if.remote_mac,
424                      dst=self.tra_if.local_mac) /
425                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
426                                          dst=self.tra_if.local_ip4) /
427                                       ICMP(),
428                                       seq_num=17))
429         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
430
431         if esn_en:
432             # an out of window error with ESN looks like a high sequence
433             # wrap. but since it isn't then the verify will fail.
434             hash_failed_count += 17
435             self.assert_error_counter_equal(hash_failed_node_name,
436                                             hash_failed_count)
437
438         else:
439             replay_count += 17
440             self.assert_error_counter_equal(replay_node_name,
441                                             replay_count)
442
443         # valid packet moves the window over to 258
444         pkt = (Ether(src=self.tra_if.remote_mac,
445                      dst=self.tra_if.local_mac) /
446                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
447                                          dst=self.tra_if.local_ip4) /
448                                       ICMP(),
449                                       seq_num=258))
450         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
451         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
452
453         #
454         # move VPP's SA TX seq-num to just before the seq-number wrap.
455         # then fire in a packet that VPP should drop on TX because it
456         # causes the TX seq number to wrap; unless we're using extened sequence
457         # numbers.
458         #
459         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
460         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
461         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
462
463         pkts = [(Ether(src=self.tra_if.remote_mac,
464                        dst=self.tra_if.local_mac) /
465                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
466                                            dst=self.tra_if.local_ip4) /
467                                         ICMP(),
468                                         seq_num=seq))
469                 for seq in range(259, 280)]
470
471         if esn_en:
472             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
473
474             #
475             # in order for scapy to decrypt its SA's high order number needs
476             # to wrap
477             #
478             p.vpp_tra_sa.seq_num = 0x100000000
479             for rx in rxs:
480                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
481
482             #
483             # wrap scapy's TX high sequence number. VPP is in case B, so it
484             # will consider this a high seq wrap also.
485             # The low seq num we set it to will place VPP's RX window in Case A
486             #
487             p.scapy_tra_sa.seq_num = 0x100000005
488             pkt = (Ether(src=self.tra_if.remote_mac,
489                          dst=self.tra_if.local_mac) /
490                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
491                                              dst=self.tra_if.local_ip4) /
492                                           ICMP(),
493                                           seq_num=0x100000005))
494             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
495             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
496
497             #
498             # A packet that has seq num between (2^32-64) and 5 is within
499             # the window
500             #
501             p.scapy_tra_sa.seq_num = 0xfffffffd
502             pkt = (Ether(src=self.tra_if.remote_mac,
503                          dst=self.tra_if.local_mac) /
504                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
505                                              dst=self.tra_if.local_ip4) /
506                                           ICMP(),
507                                           seq_num=0xfffffffd))
508             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
509             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
510
511             #
512             # While in case A we cannot wrap the high sequence number again
513             # becuase VPP will consider this packet to be one that moves the
514             # window forward
515             #
516             pkt = (Ether(src=self.tra_if.remote_mac,
517                          dst=self.tra_if.local_mac) /
518                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
519                                              dst=self.tra_if.local_ip4) /
520                                           ICMP(),
521                                           seq_num=0x200000999))
522             self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
523
524             hash_failed_count += 1
525             self.assert_error_counter_equal(hash_failed_node_name,
526                                             hash_failed_count)
527
528             #
529             # but if we move the wondow forward to case B, then we can wrap
530             # again
531             #
532             p.scapy_tra_sa.seq_num = 0x100000555
533             pkt = (Ether(src=self.tra_if.remote_mac,
534                          dst=self.tra_if.local_mac) /
535                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
536                                              dst=self.tra_if.local_ip4) /
537                                           ICMP(),
538                                           seq_num=0x100000555))
539             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
540             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
541
542             p.scapy_tra_sa.seq_num = 0x200000444
543             pkt = (Ether(src=self.tra_if.remote_mac,
544                          dst=self.tra_if.local_mac) /
545                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
546                                              dst=self.tra_if.local_ip4) /
547                                           ICMP(),
548                                           seq_num=0x200000444))
549             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
550             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
551
552         else:
553             #
554             # without ESN TX sequence numbers can't wrap and packets are
555             # dropped from here on out.
556             #
557             self.send_and_assert_no_replies(self.tra_if, pkts)
558             seq_cycle_count += len(pkts)
559             self.assert_error_counter_equal(seq_cycle_node_name,
560                                             seq_cycle_count)
561
562         # move the security-associations seq number on to the last we used
563         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
564         p.scapy_tra_sa.seq_num = 351
565         p.vpp_tra_sa.seq_num = 351
566
567     def verify_tra_basic4(self, count=1, payload_size=54):
568         """ ipsec v4 transport basic test """
569         self.vapi.cli("clear errors")
570         self.vapi.cli("clear ipsec sa")
571         try:
572             p = self.params[socket.AF_INET]
573             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
574                                               src=self.tra_if.remote_ip4,
575                                               dst=self.tra_if.local_ip4,
576                                               count=count,
577                                               payload_size=payload_size)
578             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
579                                              self.tra_if)
580             for rx in recv_pkts:
581                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
582                 self.assert_packet_checksums_valid(rx)
583                 try:
584                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
585                     self.assert_packet_checksums_valid(decrypted)
586                 except:
587                     self.logger.debug(ppp("Unexpected packet:", rx))
588                     raise
589         finally:
590             self.logger.info(self.vapi.ppcli("show error"))
591             self.logger.info(self.vapi.ppcli("show ipsec all"))
592
593         pkts = p.tra_sa_in.get_stats()['packets']
594         self.assertEqual(pkts, count,
595                          "incorrect SA in counts: expected %d != %d" %
596                          (count, pkts))
597         pkts = p.tra_sa_out.get_stats()['packets']
598         self.assertEqual(pkts, count,
599                          "incorrect SA out counts: expected %d != %d" %
600                          (count, pkts))
601
602         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
603         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
604
605
606 class IpsecTra4Tests(IpsecTra4):
607     """ UT test methods for Transport v4 """
608     def test_tra_anti_replay(self):
609         """ ipsec v4 transport anti-replay test """
610         self.verify_tra_anti_replay()
611
612     def test_tra_basic(self, count=1):
613         """ ipsec v4 transport basic test """
614         self.verify_tra_basic4(count=1)
615
616     def test_tra_burst(self):
617         """ ipsec v4 transport burst test """
618         self.verify_tra_basic4(count=257)
619
620
621 class IpsecTra6(object):
622     """ verify methods for Transport v6 """
623     def verify_tra_basic6(self, count=1, payload_size=54):
624         self.vapi.cli("clear errors")
625         self.vapi.cli("clear ipsec sa")
626         try:
627             p = self.params[socket.AF_INET6]
628             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
629                                                src=self.tra_if.remote_ip6,
630                                                dst=self.tra_if.local_ip6,
631                                                count=count,
632                                                payload_size=payload_size)
633             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
634                                              self.tra_if)
635             for rx in recv_pkts:
636                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
637                                  rx[IPv6].plen)
638                 try:
639                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
640                     self.assert_packet_checksums_valid(decrypted)
641                 except:
642                     self.logger.debug(ppp("Unexpected packet:", rx))
643                     raise
644         finally:
645             self.logger.info(self.vapi.ppcli("show error"))
646             self.logger.info(self.vapi.ppcli("show ipsec all"))
647
648         pkts = p.tra_sa_in.get_stats()['packets']
649         self.assertEqual(pkts, count,
650                          "incorrect SA in counts: expected %d != %d" %
651                          (count, pkts))
652         pkts = p.tra_sa_out.get_stats()['packets']
653         self.assertEqual(pkts, count,
654                          "incorrect SA out counts: expected %d != %d" %
655                          (count, pkts))
656         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
657         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
658
659     def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
660                                    payload_size=54):
661         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
662                 sa.encrypt(IPv6(src=src, dst=dst) /
663                            ICMPv6EchoRequest(id=0, seq=1,
664                                              data='X' * payload_size))
665                 for i in range(count)]
666
667     def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
668         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
669                 IPv6(src=src, dst=dst) /
670                 IPv6ExtHdrHopByHop() /
671                 IPv6ExtHdrFragment(id=2, offset=200) /
672                 Raw(b'\xff' * 200)
673                 for i in range(count)]
674
675     def verify_tra_encrypted6(self, p, sa, rxs):
676         decrypted = []
677         for rx in rxs:
678             self.assert_packet_checksums_valid(rx)
679             try:
680                 decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
681                 decrypted.append(decrypt_pkt)
682                 self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
683                 self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
684             except:
685                 self.logger.debug(ppp("Unexpected packet:", rx))
686                 try:
687                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
688                 except:
689                     pass
690                 raise
691         return decrypted
692
693     def verify_tra_66_ext_hdrs(self, p):
694         count = 63
695
696         #
697         # check we can decrypt with options
698         #
699         tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
700                                              src=self.tra_if.remote_ip6,
701                                              dst=self.tra_if.local_ip6,
702                                              count=count)
703         self.send_and_expect(self.tra_if, tx, self.tra_if)
704
705         #
706         # injecting a packet from ourselves to be routed of box is a hack
707         # but it matches an outbout policy, alors je ne regrette rien
708         #
709
710         # one extension before ESP
711         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
712               IPv6(src=self.tra_if.local_ip6,
713                    dst=self.tra_if.remote_ip6) /
714               IPv6ExtHdrFragment(id=2, offset=200) /
715               Raw(b'\xff' * 200))
716
717         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
718         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
719
720         for dc in dcs:
721             # for reasons i'm not going to investigate scapy does not
722             # created the correct headers after decrypt. but reparsing
723             # the ipv6 packet fixes it
724             dc = IPv6(raw(dc[IPv6]))
725             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
726
727         # two extensions before ESP
728         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
729               IPv6(src=self.tra_if.local_ip6,
730                    dst=self.tra_if.remote_ip6) /
731               IPv6ExtHdrHopByHop() /
732               IPv6ExtHdrFragment(id=2, offset=200) /
733               Raw(b'\xff' * 200))
734
735         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
736         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
737
738         for dc in dcs:
739             dc = IPv6(raw(dc[IPv6]))
740             self.assertTrue(dc[IPv6ExtHdrHopByHop])
741             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
742
743         # two extensions before ESP, one after
744         tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
745               IPv6(src=self.tra_if.local_ip6,
746                    dst=self.tra_if.remote_ip6) /
747               IPv6ExtHdrHopByHop() /
748               IPv6ExtHdrFragment(id=2, offset=200) /
749               IPv6ExtHdrDestOpt() /
750               Raw(b'\xff' * 200))
751
752         rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
753         dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
754
755         for dc in dcs:
756             dc = IPv6(raw(dc[IPv6]))
757             self.assertTrue(dc[IPv6ExtHdrDestOpt])
758             self.assertTrue(dc[IPv6ExtHdrHopByHop])
759             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
760
761
762 class IpsecTra6Tests(IpsecTra6):
763     """ UT test methods for Transport v6 """
764     def test_tra_basic6(self):
765         """ ipsec v6 transport basic test """
766         self.verify_tra_basic6(count=1)
767
768     def test_tra_burst6(self):
769         """ ipsec v6 transport burst test """
770         self.verify_tra_basic6(count=257)
771
772
773 class IpsecTra6ExtTests(IpsecTra6):
774     def test_tra_ext_hdrs_66(self):
775         """ ipsec 6o6 tra extension headers test """
776         self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
777
778
779 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
780     """ UT test methods for Transport v6 and v4"""
781     pass
782
783
784 class IpsecTun4(object):
785     """ verify methods for Tunnel v4 """
786     def verify_counters4(self, p, count, n_frags=None, worker=None):
787         if not n_frags:
788             n_frags = count
789         if (hasattr(p, "spd_policy_in_any")):
790             pkts = p.spd_policy_in_any.get_stats(worker)['packets']
791             self.assertEqual(pkts, count,
792                              "incorrect SPD any policy: expected %d != %d" %
793                              (count, pkts))
794
795         if (hasattr(p, "tun_sa_in")):
796             pkts = p.tun_sa_in.get_stats(worker)['packets']
797             self.assertEqual(pkts, count,
798                              "incorrect SA in counts: expected %d != %d" %
799                              (count, pkts))
800             pkts = p.tun_sa_out.get_stats(worker)['packets']
801             self.assertEqual(pkts, n_frags,
802                              "incorrect SA out counts: expected %d != %d" %
803                              (count, pkts))
804
805         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
806         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
807
808     def verify_decrypted(self, p, rxs):
809         for rx in rxs:
810             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
811             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
812             self.assert_packet_checksums_valid(rx)
813
814     def verify_esp_padding(self, sa, esp_payload, decrypt_pkt):
815         align = sa.crypt_algo.block_size
816         if align < 4:
817             align = 4
818         exp_len = (len(decrypt_pkt) + 2 + (align - 1)) & ~(align - 1)
819         exp_len += sa.crypt_algo.iv_size
820         exp_len += sa.crypt_algo.icv_size or sa.auth_algo.icv_size
821         self.assertEqual(exp_len, len(esp_payload))
822
823     def verify_encrypted(self, p, sa, rxs):
824         decrypt_pkts = []
825         for rx in rxs:
826             if p.nat_header:
827                 self.assertEqual(rx[UDP].dport, 4500)
828             self.assert_packet_checksums_valid(rx)
829             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
830             try:
831                 rx_ip = rx[IP]
832                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
833                 if not decrypt_pkt.haslayer(IP):
834                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
835                 if rx_ip.proto == socket.IPPROTO_ESP:
836                     self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
837                 decrypt_pkts.append(decrypt_pkt)
838                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
839                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
840             except:
841                 self.logger.debug(ppp("Unexpected packet:", rx))
842                 try:
843                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
844                 except:
845                     pass
846                 raise
847         pkts = reassemble4(decrypt_pkts)
848         for pkt in pkts:
849             self.assert_packet_checksums_valid(pkt)
850
851     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
852         self.vapi.cli("clear errors")
853         self.vapi.cli("clear ipsec counters")
854         self.vapi.cli("clear ipsec sa")
855         if not n_rx:
856             n_rx = count
857         try:
858             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
859                                               src=p.remote_tun_if_host,
860                                               dst=self.pg1.remote_ip4,
861                                               count=count,
862                                               payload_size=payload_size)
863             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
864             self.verify_decrypted(p, recv_pkts)
865
866             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
867                                       dst=p.remote_tun_if_host, count=count,
868                                       payload_size=payload_size)
869             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
870                                              self.tun_if, n_rx)
871             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
872
873             for rx in recv_pkts:
874                 self.assertEqual(rx[IP].src, p.tun_src)
875                 self.assertEqual(rx[IP].dst, p.tun_dst)
876
877         finally:
878             self.logger.info(self.vapi.ppcli("show error"))
879             self.logger.info(self.vapi.ppcli("show ipsec all"))
880
881         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
882         self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
883         self.verify_counters4(p, count, n_rx)
884
885     def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
886         self.vapi.cli("clear errors")
887         if not n_rx:
888             n_rx = count
889         try:
890             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
891                                               src=p.remote_tun_if_host,
892                                               dst=self.pg1.remote_ip4,
893                                               count=count)
894             self.send_and_assert_no_replies(self.tun_if, send_pkts)
895
896             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
897                                       dst=p.remote_tun_if_host, count=count,
898                                       payload_size=payload_size)
899             self.send_and_assert_no_replies(self.pg1, send_pkts)
900
901         finally:
902             self.logger.info(self.vapi.ppcli("show error"))
903             self.logger.info(self.vapi.ppcli("show ipsec all"))
904
905     def verify_tun_reass_44(self, p):
906         self.vapi.cli("clear errors")
907         self.vapi.ip_reassembly_enable_disable(
908             sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)
909
910         try:
911             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
912                                               src=p.remote_tun_if_host,
913                                               dst=self.pg1.remote_ip4,
914                                               payload_size=1900,
915                                               count=1)
916             send_pkts = fragment_rfc791(send_pkts[0], 1400)
917             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
918                                              self.pg1, n_rx=1)
919             self.verify_decrypted(p, recv_pkts)
920
921             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
922                                       dst=p.remote_tun_if_host, count=1)
923             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
924                                              self.tun_if)
925             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
926
927         finally:
928             self.logger.info(self.vapi.ppcli("show error"))
929             self.logger.info(self.vapi.ppcli("show ipsec all"))
930
931         self.verify_counters4(p, 1, 1)
932         self.vapi.ip_reassembly_enable_disable(
933             sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)
934
935     def verify_tun_64(self, p, count=1):
936         self.vapi.cli("clear errors")
937         self.vapi.cli("clear ipsec sa")
938         try:
939             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
940                                                src=p.remote_tun_if_host6,
941                                                dst=self.pg1.remote_ip6,
942                                                count=count)
943             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
944             for recv_pkt in recv_pkts:
945                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
946                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
947                 self.assert_packet_checksums_valid(recv_pkt)
948             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
949                                        dst=p.remote_tun_if_host6, count=count)
950             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
951             for recv_pkt in recv_pkts:
952                 try:
953                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
954                     if not decrypt_pkt.haslayer(IPv6):
955                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
956                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
957                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
958                     self.assert_packet_checksums_valid(decrypt_pkt)
959                 except:
960                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
961                     try:
962                         self.logger.debug(
963                             ppp("Decrypted packet:", decrypt_pkt))
964                     except:
965                         pass
966                     raise
967         finally:
968             self.logger.info(self.vapi.ppcli("show error"))
969             self.logger.info(self.vapi.ppcli("show ipsec all"))
970
971         self.verify_counters4(p, count)
972
973     def verify_keepalive(self, p):
974         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
975                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
976                UDP(sport=333, dport=4500) /
977                Raw(b'\xff'))
978         self.send_and_assert_no_replies(self.tun_if, pkt*31)
979         self.assert_error_counter_equal(
980             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
981
982         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
983                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
984                UDP(sport=333, dport=4500) /
985                Raw(b'\xfe'))
986         self.send_and_assert_no_replies(self.tun_if, pkt*31)
987         self.assert_error_counter_equal(
988             '/err/%s/Too Short' % self.tun4_input_node, 31)
989
990
991 class IpsecTun4Tests(IpsecTun4):
992     """ UT test methods for Tunnel v4 """
993     def test_tun_basic44(self):
994         """ ipsec 4o4 tunnel basic test """
995         self.verify_tun_44(self.params[socket.AF_INET], count=1)
996         self.tun_if.admin_down()
997         self.tun_if.resolve_arp()
998         self.tun_if.admin_up()
999         self.verify_tun_44(self.params[socket.AF_INET], count=1)
1000
1001     def test_tun_reass_basic44(self):
1002         """ ipsec 4o4 tunnel basic reassembly test """
1003         self.verify_tun_reass_44(self.params[socket.AF_INET])
1004
1005     def test_tun_burst44(self):
1006         """ ipsec 4o4 tunnel burst test """
1007         self.verify_tun_44(self.params[socket.AF_INET], count=127)
1008
1009
1010 class IpsecTun6(object):
1011     """ verify methods for Tunnel v6 """
1012     def verify_counters6(self, p_in, p_out, count, worker=None):
1013         if (hasattr(p_in, "tun_sa_in")):
1014             pkts = p_in.tun_sa_in.get_stats(worker)['packets']
1015             self.assertEqual(pkts, count,
1016                              "incorrect SA in counts: expected %d != %d" %
1017                              (count, pkts))
1018         if (hasattr(p_out, "tun_sa_out")):
1019             pkts = p_out.tun_sa_out.get_stats(worker)['packets']
1020             self.assertEqual(pkts, count,
1021                              "incorrect SA out counts: expected %d != %d" %
1022                              (count, pkts))
1023         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
1024         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
1025
1026     def verify_decrypted6(self, p, rxs):
1027         for rx in rxs:
1028             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
1029             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
1030             self.assert_packet_checksums_valid(rx)
1031
1032     def verify_encrypted6(self, p, sa, rxs):
1033         for rx in rxs:
1034             self.assert_packet_checksums_valid(rx)
1035             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
1036                              rx[IPv6].plen)
1037             try:
1038                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
1039                 if not decrypt_pkt.haslayer(IPv6):
1040                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
1041                 self.assert_packet_checksums_valid(decrypt_pkt)
1042                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
1043                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
1044             except:
1045                 self.logger.debug(ppp("Unexpected packet:", rx))
1046                 try:
1047                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
1048                 except:
1049                     pass
1050                 raise
1051
1052     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
1053         self.vapi.cli("clear errors")
1054         self.vapi.cli("clear ipsec sa")
1055
1056         send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
1057                                            self.tun_if,
1058                                            src=p_in.remote_tun_if_host,
1059                                            dst=self.pg1.remote_ip6,
1060                                            count=count)
1061         self.send_and_assert_no_replies(self.tun_if, send_pkts)
1062         self.logger.info(self.vapi.cli("sh punt stats"))
1063
1064     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
1065         self.vapi.cli("clear errors")
1066         self.vapi.cli("clear ipsec sa")
1067         if not p_out:
1068             p_out = p_in
1069         try:
1070             send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
1071                                                self.tun_if,
1072                                                src=p_in.remote_tun_if_host,
1073                                                dst=self.pg1.remote_ip6,
1074                                                count=count,
1075                                                payload_size=payload_size)
1076             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1077             self.verify_decrypted6(p_in, recv_pkts)
1078
1079             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1080                                        dst=p_out.remote_tun_if_host,
1081                                        count=count,
1082                                        payload_size=payload_size)
1083             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1084             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
1085
1086             for rx in recv_pkts:
1087                 self.assertEqual(rx[IPv6].src, p_out.tun_src)
1088                 self.assertEqual(rx[IPv6].dst, p_out.tun_dst)
1089
1090         finally:
1091             self.logger.info(self.vapi.ppcli("show error"))
1092             self.logger.info(self.vapi.ppcli("show ipsec all"))
1093         self.verify_counters6(p_in, p_out, count)
1094
1095     def verify_tun_reass_66(self, p):
1096         self.vapi.cli("clear errors")
1097         self.vapi.ip_reassembly_enable_disable(
1098             sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)
1099
1100         try:
1101             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1102                                                src=p.remote_tun_if_host,
1103                                                dst=self.pg1.remote_ip6,
1104                                                count=1,
1105                                                payload_size=1850)
1106             send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
1107             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1108                                              self.pg1, n_rx=1)
1109             self.verify_decrypted6(p, recv_pkts)
1110
1111             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1112                                        dst=p.remote_tun_if_host,
1113                                        count=1,
1114                                        payload_size=64)
1115             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1116                                              self.tun_if)
1117             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1118         finally:
1119             self.logger.info(self.vapi.ppcli("show error"))
1120             self.logger.info(self.vapi.ppcli("show ipsec all"))
1121         self.verify_counters6(p, p, 1)
1122         self.vapi.ip_reassembly_enable_disable(
1123             sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)
1124
1125     def verify_tun_46(self, p, count=1):
1126         """ ipsec 4o6 tunnel basic test """
1127         self.vapi.cli("clear errors")
1128         self.vapi.cli("clear ipsec sa")
1129         try:
1130             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1131                                               src=p.remote_tun_if_host4,
1132                                               dst=self.pg1.remote_ip4,
1133                                               count=count)
1134             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
1135             for recv_pkt in recv_pkts:
1136                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
1137                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
1138                 self.assert_packet_checksums_valid(recv_pkt)
1139             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1140                                       dst=p.remote_tun_if_host4,
1141                                       count=count)
1142             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
1143             for recv_pkt in recv_pkts:
1144                 try:
1145                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
1146                     if not decrypt_pkt.haslayer(IP):
1147                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
1148                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
1149                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
1150                     self.assert_packet_checksums_valid(decrypt_pkt)
1151                 except:
1152                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
1153                     try:
1154                         self.logger.debug(ppp("Decrypted packet:",
1155                                               decrypt_pkt))
1156                     except:
1157                         pass
1158                     raise
1159         finally:
1160             self.logger.info(self.vapi.ppcli("show error"))
1161             self.logger.info(self.vapi.ppcli("show ipsec all"))
1162         self.verify_counters6(p, p, count)
1163
1164
1165 class IpsecTun6Tests(IpsecTun6):
1166     """ UT test methods for Tunnel v6 """
1167
1168     def test_tun_basic66(self):
1169         """ ipsec 6o6 tunnel basic test """
1170         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
1171
1172     def test_tun_reass_basic66(self):
1173         """ ipsec 6o6 tunnel basic reassembly test """
1174         self.verify_tun_reass_66(self.params[socket.AF_INET6])
1175
1176     def test_tun_burst66(self):
1177         """ ipsec 6o6 tunnel burst test """
1178         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
1179
1180
1181 class IpsecTun6HandoffTests(IpsecTun6):
1182     """ UT test methods for Tunnel v6 with multiple workers """
1183     worker_config = "workers 2"
1184
1185     def test_tun_handoff_66(self):
1186         """ ipsec 6o6 tunnel worker hand-off test """
1187         N_PKTS = 15
1188         p = self.params[socket.AF_INET6]
1189
1190         # inject alternately on worker 0 and 1. all counts on the SA
1191         # should be against worker 0
1192         for worker in [0, 1, 0, 1]:
1193             send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
1194                                                src=p.remote_tun_if_host,
1195                                                dst=self.pg1.remote_ip6,
1196                                                count=N_PKTS)
1197             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1198                                              self.pg1, worker=worker)
1199             self.verify_decrypted6(p, recv_pkts)
1200
1201             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
1202                                        dst=p.remote_tun_if_host,
1203                                        count=N_PKTS)
1204             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1205                                              self.tun_if, worker=worker)
1206             self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
1207
1208         # all counts against the first worker that was used
1209         self.verify_counters6(p, p, 4*N_PKTS, worker=0)
1210
1211
1212 class IpsecTun4HandoffTests(IpsecTun4):
1213     """ UT test methods for Tunnel v4 with multiple workers """
1214     worker_config = "workers 2"
1215
1216     def test_tun_handooff_44(self):
1217         """ ipsec 4o4 tunnel worker hand-off test """
1218         N_PKTS = 15
1219         p = self.params[socket.AF_INET]
1220
1221         # inject alternately on worker 0 and 1. all counts on the SA
1222         # should be against worker 0
1223         for worker in [0, 1, 0, 1]:
1224             send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
1225                                               src=p.remote_tun_if_host,
1226                                               dst=self.pg1.remote_ip4,
1227                                               count=N_PKTS)
1228             recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
1229                                              self.pg1, worker=worker)
1230             self.verify_decrypted(p, recv_pkts)
1231
1232             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
1233                                       dst=p.remote_tun_if_host,
1234                                       count=N_PKTS)
1235             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
1236                                              self.tun_if, worker=worker)
1237             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
1238
1239         # all counts against the first worker that was used
1240         self.verify_counters4(p, 4*N_PKTS, worker=0)
1241
1242
1243 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
1244     """ UT test methods for Tunnel v6 & v4 """
1245     pass
1246
1247
1248 if __name__ == '__main__':
1249     unittest.main(testRunner=VppTestRunner)