ipsec: Dedicated IPSec interface type
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import raw, Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params(object):
    """Default parameter set used by the IPsec tests for the IPv4 family.

    The class attributes describe the address family itself.  The instance
    attributes assigned in __init__ hold the SA ids, SPIs, algorithms and
    keys for one tunnel ("tun") and one transport ("tra") SA pair.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts reachable behind the remote end of the tunnel
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 100, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 200, 1000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 300, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 400, 2000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        self.salt = 0
        self.flags = 0
        self.nat_header = None
52
53
class IPsecIPv6Params(object):
    """Default parameter set used by the IPsec tests for the IPv6 family.

    The class attributes describe the address family itself.  The instance
    attributes assigned in __init__ hold the SA ids, SPIs, algorithms and
    keys for one tunnel ("tun") and one transport ("tra") SA pair.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # hosts reachable behind the remote end of the tunnel
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 500, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 600, 3000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 700, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 800, 4000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        self.salt = 0
        self.flags = 0
        self.nat_header = None
88
89
def mk_scapy_crypt_key(p):
    """Return the encryption key in the form handed to scapy.

    For "AES-GCM" the key is ``p.crypt_key`` followed by ``p.salt`` packed
    as a 4-byte unsigned integer in network byte order; for every other
    algorithm ``p.crypt_key`` is returned unchanged.

    :param p: parameter object providing crypt_algo, crypt_key and salt
    :returns: the key bytes
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    salt_bytes = struct.pack("!I", p.salt)
    return p.crypt_key + salt_bytes
95
96
def config_tun_params(p, encryption_type, tun_if):
    """Attach the tunnel-mode scapy SecurityAssociations to ``p``.

    Stores on ``p``:
      * tun_dst / tun_src - the remote / local address of ``tun_if`` for
        the address family of ``p``
      * scapy_tun_sa - SA built with ``p.vpp_tun_spi`` whose outer header
        goes from tun_dst to tun_src
      * vpp_tun_sa - SA built with ``p.scapy_tun_spi`` whose outer header
        goes from tun_src to tun_dst

    :param p: IPsec parameter object (updated in place)
    :param encryption_type: first positional argument given to scapy's
                            SecurityAssociation
    :param tun_if: interface providing remote_addr / local_addr
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    crypt_key = mk_scapy_crypt_key(p)
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]

    # everything the two directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=p.tun_dst, dst=p.tun_src),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=p.tun_dst, src=p.tun_src),
        **shared)
124
125
def config_tra_params(p, encryption_type):
    """Attach the transport-mode scapy SecurityAssociations to ``p``.

    Stores on ``p``:
      * scapy_tra_sa - SA built with ``p.vpp_tra_spi``
      * vpp_tra_sa - SA built with ``p.scapy_tra_spi``

    Both share the algorithms, keys, NAT-T header and ESN setting taken
    from ``p``.

    :param p: IPsec parameter object (updated in place)
    :param encryption_type: first positional argument given to scapy's
                            SecurityAssociation
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)
    crypt_key = mk_scapy_crypt_key(p)

    # everything the two directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
148
149
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """Create the v4/v6 parameter objects, indexed by address family."""
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {prm.addr_type: prm
                       for prm in (self.ipv4_params, self.ipv6_params)}

    def config_interfaces(self):
        """Create three pg interfaces, bring them up, configure v4 and v6."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Bring every interface down and remove its v4/v6 configuration."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hw_dump = self.vapi.cli("show hardware")
        self.logger.info(hw_dump)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Return ``count`` frames of Ether / sa.encrypt(IP / ICMP / Raw).

        sa.encrypt() is called once per frame.  ``p`` is not used here.
        """
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IP(src=src, dst=dst) /
                     ICMP() / Raw(b'X' * payload_size))
            frames.append(ether / sa.encrypt(inner))
        return frames

    def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """Return ``count`` frames of Ether / sa.encrypt(IPv6 / echo req).

        sa.encrypt() is called once per frame.  ``p`` is not used here.
        """
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            frames.append(ether / sa.encrypt(inner))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` unprotected Ether / IP / ICMP / Raw frames."""
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(ether / IP(src=src, dst=dst) / ICMP() /
                          Raw(b'X' * payload_size))
        return frames

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` unprotected Ether / IPv6 / echo request frames."""
        frames = []
        for _ in range(count):
            ether = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(ether / IPv6(src=src, dst=dst) /
                          ICMPv6EchoRequest(id=0, seq=1,
                                            data='X' * payload_size))
        return frames
252
253
class IpsecTcp(object):
    """ verify methods for TCP over the v4 tunnel """

    def verify_tcp_checksum(self):
        """Send a protected TCP SYN to VPP and validate the reply checksums.

        Runs the "test http server" CLI, sends one SYN for port 80 from
        the remote tunnel host to VPP's address on tun_if (protected with
        the scapy tunnel SA), expects a reply on tun_if, decrypts it with
        the VPP tunnel SA and checks the checksums of the decrypted packet.
        """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        tun_if = self.tun_if

        ether = Ether(src=tun_if.remote_mac, dst=tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = ether / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(tun_if, [send], tun_if)
        decrypted = p.vpp_tun_sa.decrypt(replies[0][IP])
        self.assert_packet_checksums_valid(decrypted)
266         self.assert_packet_checksums_valid(decrypted)
267
268
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksum verification """
    # thin test_* wrapper around the verify method inherited from IpsecTcp
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.verify_tcp_checksum()
273
274
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _tra4_encrypt_icmp(self, sa, seq_num):
        """Build one protected ICMP frame to inject on tra_if.

        :param sa: scapy SecurityAssociation used to protect the packet
        :param seq_num: sequence number handed to ``sa.encrypt()``
        :returns: Ether / sa.encrypt(IP / ICMP) going from tra_if's remote
                  MAC/IPv4 address to its local MAC/IPv4 address
        """
        tra_if = self.tra_if
        return (Ether(src=tra_if.remote_mac,
                      dst=tra_if.local_mac) /
                sa.encrypt(IP(src=tra_if.remote_ip4,
                              dst=tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self):
        """Exercise anti-replay handling of the v4 transport-mode SA.

        Protected ICMP packets with hand-picked sequence numbers are sent
        on tra_if; the per-node error counters are then used to check
        which packets were accepted, counted as replays, failed the
        integrity/decryption check or (ESP only) were counted as
        undersized.  Afterwards the SA sequence number is set to
        0xffffffff through the CLI to check the wrap behaviour with and
        without extended sequence numbers (ESN).
        """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en
        tra_if = self.tra_if
        is_esp = (ESP == self.encryption_type)

        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        if is_esp and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        if is_esp:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [self._tra4_encrypt_icmp(p.scapy_tra_sa, seq)
                for seq in range(1, 34)]
        self.send_and_expect(tra_if, pkts, tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkts = self._tra4_encrypt_icmp(p.scapy_tra_sa, 35)
        self.send_and_expect(tra_if, pkts * 8, tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 257)
        self.send_and_expect(tra_if, [pkt], tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 200)
        self.send_and_expect(tra_if, [pkt], tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = self._tra4_encrypt_icmp(bogus_sa, 350)
        self.send_and_assert_no_replies(tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        if is_esp and p.crypt_algo != "NULL":
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = self._tra4_encrypt_icmp(bogus_sa, 350)
            self.send_and_assert_no_replies(tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # that the window did not move can be determined since this packet
        # is still in the window
        pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 234)
        self.send_and_expect(tra_if, [pkt], tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 258)
        rx = self.send_and_expect(tra_if, [pkt], tra_if)
        p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [self._tra4_encrypt_icmp(p.scapy_tra_sa, seq)
                for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(tra_if, pkts, tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(tra_if, [pkt], tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 0xfffffffd)
            rx = self.send_and_expect(tra_if, [pkt], tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 0x200000999)
            # only (interface, packets) are passed, like every other
            # no-reply check in this method
            self.send_and_assert_no_replies(tra_if, [pkt])

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 0x100000555)
            rx = self.send_and_expect(tra_if, [pkt], tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = self._tra4_encrypt_icmp(p.scapy_tra_sa, 0x200000444)
            rx = self.send_and_expect(tra_if, [pkt], tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1, payload_size=54):
        """ ipsec v4 transport basic test

        Sends ``count`` protected ICMP packets carrying ``payload_size``
        bytes on tra_if, checks length and checksums of every reply and of
        its decrypted content, then checks that the SA statistics and the
        encrypt/decrypt node packet counters all equal ``count``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure
                    # propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the state, it is what gets looked at on failure
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
596
597
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # pass the argument through instead of a hard-coded 1; the default
        # keeps the behaviour unchanged when the test runner calls this
        # without arguments
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
611
612
class IpsecTra6(object):
    """ verify methods for Transport v6 """

    def verify_tra_basic6(self, count=1, payload_size=54):
        """Send ``count`` protected v6 echo requests and verify the replies.

        Every reply on tra_if must have a consistent IPv6 payload length
        and must decrypt to a packet with valid checksums.  Afterwards the
        SA statistics and the encrypt/decrypt node packet counters must all
        equal ``count``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                                 rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure
                    # propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the state, it is what gets looked at on failure
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
                                   payload_size=54):
        """Return ``count`` frames of Ether / sa.encrypt(IPv6 / echo req).

        Note that the inner packet built here carries no IPv6 extension
        headers itself.  sa.encrypt() is called once per frame.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size))
                for _ in range(count)]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return ``count`` unprotected frames with two extension headers.

        Each frame is Ether / IPv6 / hop-by-hop / fragment(id=2,
        offset=200) / 200 bytes of 0xff.  ``payload_size`` is not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                IPv6ExtHdrHopByHop() /
                IPv6ExtHdrFragment(id=2, offset=200) /
                Raw(b'\xff' * 200)
                for _ in range(count)]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt the packets VPP sent on tra_if and check their addresses.

        :param p: parameter object; its ``vpp_tra_sa`` does the decryption
        :param sa: not used by this method
        :param rxs: received packets
        :returns: list of decrypted packets, in the order of ``rxs``
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # reset per packet so that a failing decrypt can never log the
            # result that belongs to a previous packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        return decrypted

    def _tra6_send_and_decrypt(self, p, ip6_pkt):
        """Inject ``ip6_pkt`` on pg2 and decrypt what leaves on tra_if.

        :param p: parameter object handed to verify_tra_encrypted6()
        :param ip6_pkt: unprotected IPv6 packet (without Ethernet header)
        :returns: the decrypted packets
        """
        tx = Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / ip6_pkt
        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        return self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

    def verify_tra_66_ext_hdrs(self, p):
        """Check transport mode in the presence of IPv6 extension headers.

        First sends 63 protected packets to VPP, then injects unprotected
        packets with one, two and three extension headers on pg2 and
        checks that the headers are present again after decrypting what
        VPP sends out on tra_if.
        """
        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, so there are no regrets
        #

        # one extension before ESP
        dcs = self._tra6_send_and_decrypt(
            p,
            IPv6(src=self.tra_if.local_ip6,
                 dst=self.tra_if.remote_ip6) /
            IPv6ExtHdrFragment(id=2, offset=200) /
            Raw(b'\xff' * 200))

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it
            dc = IPv6(raw(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        dcs = self._tra6_send_and_decrypt(
            p,
            IPv6(src=self.tra_if.local_ip6,
                 dst=self.tra_if.remote_ip6) /
            IPv6ExtHdrHopByHop() /
            IPv6ExtHdrFragment(id=2, offset=200) /
            Raw(b'\xff' * 200))

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        dcs = self._tra6_send_and_decrypt(
            p,
            IPv6(src=self.tra_if.local_ip6,
                 dst=self.tra_if.remote_ip6) /
            IPv6ExtHdrHopByHop() /
            IPv6ExtHdrFragment(id=2, offset=200) /
            IPv6ExtHdrDestOpt() /
            Raw(b'\xff' * 200))

        for dc in dcs:
            dc = IPv6(raw(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
751             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
752
753
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6

    Each test simply runs ``verify_tra_basic6`` with a different packet
    count.
    """

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        single = 1
        self.verify_tra_basic6(count=single)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        burst = 257
        self.verify_tra_basic6(count=burst)
763
764
class IpsecTra6ExtTests(IpsecTra6):
    """ UT test methods for Transport v6 with extension headers """

    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tra_66_ext_hdrs(params6)
769
770
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4

    Adds nothing of its own: it only combines the v4 and v6 test sets.
    """
774
775
class IpsecTun4(object):
    """ verify methods for Tunnel v4

    Mixin of ``verify_*`` helpers.  The concrete test case must supply
    everything read through ``self`` below: ``vapi``, ``logger``,
    ``tun_if``, ``pg1``, the packet generators, the assert helpers and
    the ``tun4_*`` node names.
    """

    def _log_bad_rx4(self, rx, decrypt_pkt, log=None):
        """Dump a packet that failed verification (diagnostics only).

        :param rx: the packet as captured.
        :param decrypt_pkt: result of decrypting ``rx``, or None when no
            decrypted packet was produced for *this* packet.
        :param log: logger method used for the captured packet; defaults
            to ``self.logger.debug``.
        """
        if log is None:
            log = self.logger.debug
        log(ppp("Unexpected packet:", rx))
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # Best effort only: a failure to pretty-print must not
            # replace the verification error the caller re-raises.
            pass

    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """Check SPD/SA statistics and encrypt/decrypt node counters.

        :param p: parameter object; ``spd_policy_in_any``, ``tun_sa_in``
            and ``tun_sa_out`` are checked only when present.
        :param count: expected packet count for policies, SAs and the
            decrypt node.
        :param n_frags: expected count for the encrypt node; a falsy
            value means "same as ``count``".
        :param worker: passed through to ``get_stats``.
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            # NOTE(review): tun_sa_out is read without its own hasattr
            # check; it is assumed to exist whenever tun_sa_in does.
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP decrypted.

        :param p: parameter object providing ``remote_tun_if_host``.
        :param rxs: packets captured on the clear-text side.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt packets VPP encrypted and check the inner packets.

        :param p: parameter object providing ``nat_header``,
            ``vpp_tun_sa`` and ``remote_tun_if_host``.
        :param sa: unused; decryption always uses ``p.vpp_tun_sa``.
        :param rxs: packets captured on the tunnel side.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # Reset per packet so a failure never logs the decrypted
            # packet left over from a previous iteration.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self._log_bad_rx4(rx, decrypt_pkt)
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Run IPv4-in-IPv4 tunnel traffic in both directions.

        Clears error and IPsec counters, sends encrypted packets into
        ``tun_if`` expecting clear text on ``pg1``, then sends clear text
        into ``pg1`` expecting encrypted packets on ``tun_if`` whose outer
        addresses are ``p.tun_src``/``p.tun_dst``.  Counters are checked
        at the end.

        :param p: parameter object for the tunnel under test.
        :param count: number of packets sent in each direction.
        :param payload_size: payload size handed to the generators.
        :param n_rx: number of packets expected on ``tun_if``; a falsy
            value means "same as ``count``".
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        self.vapi.cli("clear ipsec sa")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_dropped_44(self, p, count=1, payload_size=64, n_rx=None):
        """Check that tunnel traffic is dropped in both directions.

        :param p: parameter object for the tunnel under test.
        :param count: number of packets sent in each direction.
        :param payload_size: payload size for the clear-text packets.
            NOTE(review): it is not forwarded to ``gen_encrypt_pkts``.
        :param n_rx: accepted for signature compatibility; unused since
            no packets are expected.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            self.send_and_assert_no_replies(self.tun_if, send_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            self.send_and_assert_no_replies(self.pg1, send_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

    def verify_tun_reass_44(self, p):
        """Check that a fragmented encrypted packet is reassembled.

        IPv4 reassembly is enabled on ``tun_if`` for the duration of the
        check and is always disabled again, even when the check fails.

        :param p: parameter object for the tunnel under test.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
                                                  self.tun_if,
                                                  src=p.remote_tun_if_host,
                                                  dst=self.pg1.remote_ip4,
                                                  payload_size=1900,
                                                  count=1)
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                          dst=p.remote_tun_if_host, count=1)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # Do not leave reassembly enabled for later tests when this
            # one fails.
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """Run IPv6-in-IPv4 tunnel traffic in both directions.

        :param p: parameter object providing ``scapy_tun_sa``,
            ``vpp_tun_sa`` and ``remote_tun_if_host6``.
        :param count: number of packets sent in each direction.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset per packet so stale data is never logged.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_rx4(recv_pkt, decrypt_pkt,
                                      log=self.logger.error)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Check the error counters hit by one-byte UDP/4500 packets.

        Sends 31 packets with payload 0xff and expects the
        'NAT Keepalive' counter to read 31, then 31 packets with payload
        0xfe and expects the 'Too Short' counter to read 31.  No replies
        are expected in either case.

        :param p: parameter object providing ``remote_tun_if_host``.
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
969
970
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4

    Thin ``test_*`` entry points over the ``verify_*`` helpers.
    """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        params4 = self.params[socket.AF_INET]
        self.verify_tun_44(params4, count=1)

        # bounce the tunnel interface, then check traffic still flows
        tunnel = self.tun_if
        tunnel.admin_down()
        tunnel.resolve_arp()
        tunnel.admin_up()

        self.verify_tun_44(params4, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        params4 = self.params[socket.AF_INET]
        self.verify_tun_reass_44(params4)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        burst = 127
        self.verify_tun_44(self.params[socket.AF_INET], count=burst)
988
989
class IpsecTun6(object):
    """ verify methods for Tunnel v6

    Mixin of ``verify_*`` helpers.  The concrete test case must supply
    everything read through ``self`` below: ``vapi``, ``logger``,
    ``tun_if``, ``pg1``, the packet generators, the assert helpers and
    the ``tun6_*`` node names.
    """

    def _log_bad_rx6(self, rx, decrypt_pkt):
        """Dump a packet that failed verification (diagnostics only).

        :param rx: the packet as captured.
        :param decrypt_pkt: result of decrypting ``rx``, or None when no
            decrypted packet was produced for *this* packet.
        """
        self.logger.debug(ppp("Unexpected packet:", rx))
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # Best effort only: a failure to pretty-print must not
            # replace the verification error the caller re-raises.
            pass

    def verify_counters6(self, p_in, p_out, count, worker=None):
        """Check SA statistics and encrypt/decrypt node counters.

        :param p_in: parameter object whose ``tun_sa_in`` is checked,
            when present.
        :param p_out: parameter object whose ``tun_sa_out`` is checked,
            when present.
        :param count: expected packet count for every counter.
        :param worker: passed through to ``get_stats``.
        """
        if (hasattr(p_in, "tun_sa_in")):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if (hasattr(p_out, "tun_sa_out")):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """Check addresses and checksums of packets VPP decrypted.

        :param p: parameter object providing ``remote_tun_if_host``.
        :param rxs: packets captured on the clear-text side.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """Decrypt packets VPP encrypted and check the inner packets.

        :param p: parameter object providing ``vpp_tun_sa`` and
            ``remote_tun_if_host``.
        :param sa: unused; decryption always uses ``p.vpp_tun_sa``.
        :param rxs: packets captured on the tunnel side.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # Reset per packet so a failure never logs the decrypted
            # packet left over from a previous iteration.
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self._log_bad_rx6(rx, decrypt_pkt)
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """Check that encrypted packets sent into the tunnel are dropped.

        :param p_in: parameter object for the inbound direction.
        :param count: number of packets to send.
        :param payload_size: accepted for signature compatibility.
            NOTE(review): it is not forwarded to ``gen_encrypt_pkts6``.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                           self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """Run IPv6-in-IPv6 tunnel traffic in both directions.

        :param p_in: parameter object used for the inbound (decrypt)
            direction.
        :param p_out: parameter object used for the outbound (encrypt)
            direction; a falsy value means "same as ``p_in``".
        :param count: number of packets sent in each direction.
        :param payload_size: payload size handed to the generators.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(p_in, p_in.scapy_tun_sa,
                                               self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """Check that a fragmented encrypted packet is reassembled.

        IPv6 reassembly is enabled on ``tun_if`` for the duration of the
        check and is always disabled again, even when the check fails.

        :param p: parameter object for the tunnel under test.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
                                                   self.tun_if,
                                                   src=p.remote_tun_if_host,
                                                   dst=self.pg1.remote_ip6,
                                                   count=1,
                                                   payload_size=1850)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # Do not leave reassembly enabled for later tests when this
            # one fails.
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Runs IPv4-in-IPv6 tunnel traffic in both directions.

        :param p: parameter object providing ``scapy_tun_sa``,
            ``vpp_tun_sa`` and ``remote_tun_if_host4``.
        :param count: number of packets sent in each direction.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # Reset per packet so stale data is never logged.
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_rx6(recv_pkt, decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1143
1144
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6

    Thin ``test_*`` entry points over the ``verify_*`` helpers.
    """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_66(params6, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        params6 = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(params6)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        burst = 257
        self.verify_tun_66(self.params[socket.AF_INET6], count=burst)
1159
1160
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        n_pkts = 15
        params = self.params[socket.AF_INET6]

        # Inject alternately on worker 0 and 1; all counts on the SA
        # should be against worker 0.
        injection_order = (0, 1) * 2

        for wrk in injection_order:
            # encrypted traffic in, clear text expected on pg1
            encrypted = self.gen_encrypt_pkts6(params, params.scapy_tun_sa,
                                               self.tun_if,
                                               src=params.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=n_pkts)
            cleartext = self.send_and_expect(self.tun_if, encrypted,
                                             self.pg1, worker=wrk)
            self.verify_decrypted6(params, cleartext)

            # clear text in, encrypted traffic expected on the tunnel
            plain = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                   dst=params.remote_tun_if_host,
                                   count=n_pkts)
            protected = self.send_and_expect(self.pg1, plain,
                                             self.tun_if, worker=wrk)
            self.verify_encrypted6(params, params.vpp_tun_sa, protected)

        # all counts against the first worker that was used
        total = len(injection_order) * n_pkts
        self.verify_counters6(params, params, total, worker=0)
1190
1191
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        n_pkts = 15
        params = self.params[socket.AF_INET]

        # Inject alternately on worker 0 and 1; all counts on the SA
        # should be against worker 0.
        injection_order = (0, 1) * 2

        for wrk in injection_order:
            # encrypted traffic in, clear text expected on pg1
            encrypted = self.gen_encrypt_pkts(params, params.scapy_tun_sa,
                                              self.tun_if,
                                              src=params.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=n_pkts)
            cleartext = self.send_and_expect(self.tun_if, encrypted,
                                             self.pg1, worker=wrk)
            self.verify_decrypted(params, cleartext)

            # clear text in, encrypted traffic expected on the tunnel
            plain = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                  dst=params.remote_tun_if_host,
                                  count=n_pkts)
            protected = self.send_and_expect(self.pg1, plain,
                                             self.tun_if, worker=wrk)
            self.verify_encrypted(params, params.vpp_tun_sa, protected)

        # all counts against the first worker that was used
        total = len(injection_order) * n_pkts
        self.verify_counters4(params, total, worker=0)
1221
1222
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4

    Adds nothing of its own: it only combines the v4 and v6 test sets.
    """
1226
1227
# Run the tests with the VPP runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)