ipip: Uninitialized return variable (coverity warning)
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
10     IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
11
12
13 from framework import VppTestCase, VppTestRunner
14 from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
15 from vpp_papi import VppEnum
16
17
class IPsecIPv4Params(object):
    """Default IPsec test parameters for the IPv4 address family.

    The class attributes describe the address family itself; the instance
    attributes hold the SA ids, SPIs, algorithms and keys for the tunnel
    ("tun") and transport ("tra") security associations.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used for the inner packets of the tunnel tests
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: SA id and SPI for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: SA id and SPI for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API id, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is only appended to the scapy key for AES-GCM
        self.salt = 0
        self.flags = 0
        self.nat_header = None
52
53
class IPsecIPv6Params(object):
    """Default IPsec test parameters for the IPv6 address family.

    The class attributes describe the address family itself; the instance
    attributes hold the SA ids, SPIs, algorithms and keys for the tunnel
    ("tun") and transport ("tra") security associations.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used for the inner packets of the tunnel tests
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: SA id and SPI for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: SA id and SPI for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API id, scapy name and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = b'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API id, scapy name and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = b'JPjyOWBeVEQiMe7h'

        # salt is only appended to the scapy key for AES-GCM
        self.salt = 0
        self.flags = 0
        self.nat_header = None
88
89
def mk_scapy_crypt_key(p):
    """Return the encryption key to hand to scapy for parameter set *p*.

    For "AES-GCM" the key is p.crypt_key followed by p.salt packed as a
    4-byte big-endian unsigned integer; for every other algorithm it is
    p.crypt_key unchanged.
    """
    if p.crypt_algo != "AES-GCM":
        return p.crypt_key
    return p.crypt_key + struct.pack("!I", p.salt)
95
96
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy tunnel-mode SAs for interface *tun_if* to *p*.

    Sets p.tun_dst / p.tun_src from the interface's remote / local address
    of p's address family, then creates:

    * p.scapy_tun_sa - SPI p.vpp_tun_spi, outer header remote -> local
    * p.vpp_tun_sa   - SPI p.scapy_tun_spi, outer header local -> remote

    Both SAs share the same algorithms, keys, NAT-T header and ESN setting.
    """
    ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    p.tun_dst = tun_if.remote_addr[p.addr_type]
    p.tun_src = tun_if.local_addr[p.addr_type]

    # everything the two directions have in common
    common = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)
    outer_ip = ip_class_by_addr_type[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=p.tun_dst, dst=p.tun_src),
        **common)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=p.tun_dst, src=p.tun_src),
        **common)
124
125
def config_tra_params(p, encryption_type):
    """Attach the scapy transport-mode SAs to parameter set *p*.

    Creates p.scapy_tra_sa (SPI p.vpp_tra_spi) and p.vpp_tra_sa
    (SPI p.scapy_tra_spi); both share the same algorithms, keys, NAT-T
    header and ESN setting.
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    esn_en = bool(p.flags & sad_flags.IPSEC_API_SAD_FLAG_USE_ESN)

    # everything the two directions have in common
    common = dict(crypt_algo=p.crypt_algo,
                  crypt_key=mk_scapy_crypt_key(p),
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  esn_en=esn_en)

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **common)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **common)
148
149
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    # SPD ids used for the tunnel and transport policies
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """Create the v4/v6 parameter sets and index them by address type."""
        v4_params = IPsecIPv4Params()
        self.ipv4_params = v4_params
        v6_params = IPsecIPv6Params()
        self.ipv6_params = v6_params
        self.params = {v4_params.addr_type: v4_params,
                       v6_params.addr_type: v6_params}

    def config_interfaces(self):
        """Create three pg interfaces and bring each up with IPv4 and IPv6."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Take every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Return *count* IPv4 ICMP packets encrypted with *sa*.

        Each packet is encrypted separately and wrapped in an Ethernet
        header addressed from sw_intf's remote MAC to its local MAC.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = IP(src=src, dst=dst) / ICMP() / Raw(b'X' * payload_size)
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """Return *count* ICMPv6 echo requests encrypted with *sa*.

        Each packet is encrypted separately and wrapped in an Ethernet
        header addressed from sw_intf's remote MAC to its local MAC.
        """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            pkts.append(l2 / sa.encrypt(IPv6(src=src, dst=dst) / echo))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain (unencrypted) IPv4 ICMP packets."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IP(src=src, dst=dst) / ICMP() /
                        Raw(b'X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain (unencrypted) ICMPv6 echo requests."""
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            pkts.append(l2 / IPv6(src=src, dst=dst) / echo)
        return pkts
252
253
# Mixin holding the TCP checksum verification; it relies on the host test
# case providing self.params, self.tun_if, self.vapi and self.logger.
class IpsecTcp(object):
    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 and check the reply checksums.

        Starts VPP's test http server, sends one SYN over the tunnel
        interface, decrypts the single reply and validates its checksums.
        """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]

        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        reply = replies[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
267
268
# Test-method wrapper around IpsecTcp: exposes the verify method under a
# test_* name so the unittest runner collects it.
class IpsecTcpTests(IpsecTcp):
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.verify_tcp_checksum()
273
274
class IpsecTra4(object):
    """ verify methods for Transport v4 """
    def verify_tra_anti_replay(self):
        """Exercise the anti-replay window of the v4 transport-mode SA.

        Sends encrypted ICMP packets with hand-picked sequence numbers over
        self.tra_if and checks, through the node error counters, that
        replayed, undecryptable, undersized and out-of-window packets are
        dropped while in-window packets are accepted.  Finally checks the
        TX sequence number wrap, with and without extended sequence
        numbers (ESN).

        The steps depend on the window state left by the previous ones, so
        their order matters.
        """
        p = self.params[socket.AF_INET]
        esn_en = p.vpp_tra_sa.esn_en

        # names of the error counters inspected below
        seq_cycle_node_name = ('/err/%s/sequence number cycled' %
                               self.tra4_encrypt_node_name)
        replay_node_name = ('/err/%s/SA replayed packet' %
                            self.tra4_decrypt_node_name)
        # ESP with AES-GCM reports a bad packet as a decryption failure,
        # everything else as an integrity check failure
        if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
            hash_failed_node_name = ('/err/%s/ESP decryption failed' %
                                     self.tra4_decrypt_node_name)
        else:
            hash_failed_node_name = ('/err/%s/Integrity check failed' %
                                     self.tra4_decrypt_node_name)
        # snapshot the counters; the expectations below are relative to these
        replay_count = self.statistics.get_err_counter(replay_node_name)
        hash_failed_count = self.statistics.get_err_counter(
            hash_failed_node_name)
        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)

        # the undersize counter is only read (and only used below) for ESP
        if ESP == self.encryption_type:
            undersize_node_name = ('/err/%s/undersized packet' %
                                   self.tra4_decrypt_node_name)
            undersize_count = self.statistics.get_err_counter(
                undersize_node_name)

        #
        # send packets with seq numbers 1->33 (range(1, 34))
        # this means the window size is still in Case B (see RFC4303
        # Appendix A)
        #
        # for reasons i haven't investigated Scapy won't create a packet with
        # seq_num=0
        #
        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(1, 34)]
        recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkts)
        replay_count += len(pkts)
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now send a batch of packets all with the same sequence number
        # the first packet in the batch is legitimate, the rest bogus
        #
        pkts = (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                          dst=self.tra_if.local_ip4) /
                                       ICMP(),
                                       seq_num=35))
        # 8 copies sent, 1 reply expected, so 7 are counted as replays
        recv_pkts = self.send_and_expect(self.tra_if, pkts * 8,
                                         self.tra_if, n_rx=1)
        replay_count += 7
        self.assert_error_counter_equal(replay_node_name, replay_count)

        #
        # now move the window over to 257 (more than one byte) and into Case A
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=257))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        replay_count += 3
        self.assert_error_counter_equal(replay_node_name, replay_count)

        # the window size is 64 packets
        # packets that are in the window are still accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=200))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (the bogus SA uses the right SPI but reversed keys)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=mk_scapy_crypt_key(p)[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        hash_failed_count += 17
        self.assert_error_counter_equal(hash_failed_node_name,
                                        hash_failed_count)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA (no algorithms or keys given)
        if (ESP == self.encryption_type and p.crypt_algo != "NULL"):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            undersize_count += 17
            self.assert_error_counter_equal(undersize_node_name,
                                            undersize_count)

        # the bogus packets (seq 350) did not move the window, which we can
        # determine since this packet (seq 234) is still in the window
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        #
        # out of window are dropped
        #  this is Case B. So VPP will consider this to be a high seq num wrap
        #  and so the decrypt attempt will fail
        #
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if esn_en:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            hash_failed_count += 17
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

        else:
            # without ESN the packets are counted as replays
            replay_count += 17
            self.assert_error_counter_equal(replay_node_name,
                                            replay_count)

        # valid packet moves the window over to 258
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=258))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        #
        # move VPP's SA TX seq-num to just before the seq-number wrap.
        # then fire in a packet that VPP should drop on TX because it
        # causes the TX seq number to wrap; unless we're using extended
        # sequence numbers.
        #
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))

        pkts = [(Ether(src=self.tra_if.remote_mac,
                       dst=self.tra_if.local_mac) /
                 p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                           dst=self.tra_if.local_ip4) /
                                        ICMP(),
                                        seq_num=seq))
                for seq in range(259, 280)]

        if esn_en:
            rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)

            #
            # in order for scapy to decrypt its SA's high order number needs
            # to wrap
            #
            p.vpp_tra_sa.seq_num = 0x100000000
            for rx in rxs:
                decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # wrap scapy's TX high sequence number. VPP is in case B, so it
            # will consider this a high seq wrap also.
            # The low seq num we set it to will place VPP's RX window in Case A
            #
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # A packet that has seq num between (2^32-64) and 5 is within
            # the window
            #
            p.scapy_tra_sa.seq_num = 0xfffffffd
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0xfffffffd))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            #
            # While in case A we cannot wrap the high sequence number again
            # because VPP will consider this packet to be one that moves the
            # window forward
            #
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000999))
            # NOTE(review): unlike the other send_and_assert_no_replies calls
            # in this method, this one passes self.tra_if as a third
            # positional argument - confirm against the framework signature
            self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)

            hash_failed_count += 1
            self.assert_error_counter_equal(hash_failed_node_name,
                                            hash_failed_count)

            #
            # but if we move the window forward to case B, then we can wrap
            # again
            #
            p.scapy_tra_sa.seq_num = 0x100000555
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000555))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            p.scapy_tra_sa.seq_num = 0x200000444
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x200000444))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        else:
            #
            # without ESN TX sequence numbers can't wrap and packets are
            # dropped from here on out.
            #
            self.send_and_assert_no_replies(self.tra_if, pkts)
            seq_cycle_count += len(pkts)
            self.assert_error_counter_equal(seq_cycle_node_name,
                                            seq_cycle_count)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1, payload_size=54):
        """ ipsec v4 transport basic test

        Clears the error and SA counters, sends `count` encrypted ICMP
        packets of `payload_size` payload bytes over self.tra_if and checks
        that each reply has a consistent length, valid checksums and
        decrypts cleanly.  Afterwards checks that both transport SAs and
        both the encrypt and decrypt nodes counted exactly `count` packets.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IP total length must match what was actually received
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, pass or fail, to help debugging
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
596
597
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay()

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller's count instead of silently forcing 1; the
        # unittest runner calls this without arguments, so the default of 1
        # keeps the collected test unchanged
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        # 257 packets: more than a single 256-packet batch
        self.verify_tra_basic4(count=257)
610         self.verify_tra_basic4(count=257)
611
612
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1, payload_size=54):
        """Send `count` encrypted ICMPv6 echoes and verify the replies.

        Clears the error and SA counters, sends the packets over
        self.tra_if and checks that each reply has a consistent payload
        length and decrypts to a packet with valid checksums.  Afterwards
        checks that both transport SAs and both the encrypt and decrypt
        nodes counted exactly `count` packets.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IPv6 payload length excludes the fixed IPv6 header
                self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                                 rx[IPv6].plen)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, pass or fail, to help debugging
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
                                   payload_size=54):
        """Return `count` ICMPv6 echo requests encrypted with `sa`.

        NOTE(review): despite the name, the packets built here carry no
        IPv6 extension headers - confirm whether that is intended.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size))
                for i in range(count)]

    def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return `count` plain IPv6 packets with hop-by-hop and fragment
        extension headers followed by 200 bytes of 0xff.

        `payload_size` is accepted for signature symmetry but not used.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                IPv6ExtHdrHopByHop() /
                IPv6ExtHdrFragment(id=2, offset=200) /
                Raw(b'\xff' * 200)
                for i in range(count)]

    def verify_tra_encrypted6(self, p, sa, rxs):
        """Decrypt every packet in `rxs` and check its addresses.

        Each packet must have valid checksums, decrypt with p.vpp_tra_sa
        and be addressed from self.tra_if's local to its remote IPv6
        address.  `sa` is accepted but not used; p.vpp_tra_sa is always
        the decrypting SA.

        :returns: the list of decrypted packets, in the order of `rxs`.
        """
        decrypted = []
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            # reset per packet so that a failure never logs the decrypted
            # packet left over from a previous iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
                decrypted.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
                self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
            except:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    # best effort only: a logging problem must not mask
                    # the real failure re-raised below
                    try:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    except Exception:
                        pass
                raise
        return decrypted

    def verify_tra_66_ext_hdrs(self, p):
        """Check transport mode 6o6 with IPv6 extension headers.

        First sends encrypted packets to VPP, then injects plain packets
        carrying one, two and three extension headers and verifies the
        headers are still present once the encrypted output is decrypted.
        """
        count = 63

        #
        # check we can decrypt with options
        #
        tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
        self.send_and_expect(self.tra_if, tx, self.tra_if)

        #
        # injecting a packet from ourselves to be routed off box is a hack
        # but it matches an outbound policy, alors je ne regrette rien
        #

        # one extension before ESP
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrFragment(id=2, offset=200) /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            # for reasons i'm not going to investigate scapy does not
            # create the correct headers after decrypt. but reparsing
            # the ipv6 packet fixes it.
            # bytes() builds the packet; it replaces a call to raw(), which
            # this module never imports (only Raw is imported)
            dc = IPv6(bytes(dc[IPv6]))
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrHopByHop() /
              IPv6ExtHdrFragment(id=2, offset=200) /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(bytes(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)

        # two extensions before ESP, one after
        tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
              IPv6(src=self.tra_if.local_ip6,
                   dst=self.tra_if.remote_ip6) /
              IPv6ExtHdrHopByHop() /
              IPv6ExtHdrFragment(id=2, offset=200) /
              IPv6ExtHdrDestOpt() /
              Raw(b'\xff' * 200))

        rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
        dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)

        for dc in dcs:
            dc = IPv6(bytes(dc[IPv6]))
            self.assertTrue(dc[IPv6ExtHdrDestOpt])
            self.assertTrue(dc[IPv6ExtHdrHopByHop])
            self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
750             self.assertTrue(dc[IPv6ExtHdrHopByHop])
751             self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
752
753
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # Both tests call verify_tra_basic6(), which is not defined in this
    # class, and differ only in the packet count they pass to it.
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # 257 packets
        self.verify_tra_basic6(count=257)
763
764
# Test methods for Transport v6 with IPv6 extension headers; the check
# itself lives in verify_tra_66_ext_hdrs().
class IpsecTra6ExtTests(IpsecTra6):
    def test_tra_ext_hdrs_66(self):
        """ ipsec 6o6 tra extension headers test """
        # run the extension header checks with the IPv6 parameter set
        self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
769
770
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # Adds nothing of its own: it only combines the test methods of the
    # two base classes.
    pass
774
775
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """
    def verify_counters4(self, p, count, n_frags=None, worker=None):
        """ Check the SPD, SA and node packet counters.

        :param p: IPsec parameters; ``spd_policy_in_any``, ``tun_sa_in``
                  and ``tun_sa_out`` are checked when present.
        :param count: packet count expected on the policy, on both SAs
                      and on the decrypt node.
        :param n_frags: packet count expected on the encrypt node;
                        ``count`` is used when this is None or 0.
        :param worker: passed through to ``get_stats``.
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        # the outbound SA is only looked at when the inbound SA exists
        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """ Check addresses and checksums of the packets in ``rxs``.

        The IPv4 source must be ``p.remote_tun_if_host`` and the
        destination ``self.pg1.remote_ip4``.
        """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """ Decrypt the packets in ``rxs`` and check the inner packets.

        :param p: IPsec parameters; ``p.vpp_tun_sa`` does the decryption
                  and ``p.nat_header`` enables the UDP port check.
        :param sa: not used by this method.
        :param rxs: received packets to decrypt.
        """
        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            # reset for every packet so that the failure log below can
            # never show the decrypted form of an earlier packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    except Exception:
                        # best effort only: a logging problem must not
                        # hide the failure that is re-raised below
                        pass
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """ Pass IPv4 traffic in both directions and check the counters.

        :param p: IPsec parameters.
        :param count: number of packets generated per direction.
        :param payload_size: payload size given to the generators.
        :param n_rx: number of packets expected on the tunnel interface
                     for the pg1 to tunnel direction; ``count`` is used
                     when this is None or 0.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec counters")
        if not n_rx:
            n_rx = count
        try:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count,
                                              payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IP].src, p.tun_src)
                self.assertEqual(rx[IP].dst, p.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
        self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
        self.verify_counters4(p, count, n_rx)

    def verify_tun_reass_44(self, p):
        """ Pass a fragmented encrypted packet through the tunnel.

        IPv4 reassembly is enabled on the tunnel interface for the
        duration of the check and is disabled again afterwards, also
        when one of the checks fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip4=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa,
                                                  self.tun_if,
                                                  src=p.remote_tun_if_host,
                                                  dst=self.pg1.remote_ip4,
                                                  payload_size=1900,
                                                  count=1)
                send_pkts = fragment_rfc791(send_pkts[0], 1400)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted(p, recv_pkts)

                send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                          dst=p.remote_tun_if_host, count=1)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))

            self.verify_counters4(p, 1, 1)
        finally:
            # undo the setting made above even when a check failed
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip4=False)

    def verify_tun_64(self, p, count=1):
        """ Pass IPv6 traffic through the IPv4 tunnel in both directions.

        :param p: IPsec parameters.
        :param count: number of packets generated per direction.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that the failure log below
                # can never show the decrypted form of an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # best effort only: a logging problem must
                            # not hide the failure re-raised below
                            pass
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """ Check the error counters for UDP port 4500 one byte packets.

        31 packets with payload 0xff must produce no replies and 31
        'NAT Keepalive' errors; 31 packets with payload 0xfe must
        produce no replies and 31 'Too Short' errors.
        """
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
947
948
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=1)

        # take the tunnel interface down and up again (with an ARP
        # resolve in between), then repeat the traffic check
        tun = self.tun_if
        tun.admin_down()
        tun.resolve_arp()
        tun.admin_up()
        self.verify_tun_44(p, count=1)

    def test_tun_reass_basic44(self):
        """ ipsec 4o4 tunnel basic reassembly test """
        p = self.params[socket.AF_INET]
        self.verify_tun_reass_44(p)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=127)
966
967
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """
    def verify_counters6(self, p_in, p_out, count, worker=None):
        """ Check the SA and node packet counters.

        :param p_in: parameters whose ``tun_sa_in`` is checked if present.
        :param p_out: parameters whose ``tun_sa_out`` is checked if
                      present.
        :param count: packet count expected on each SA and on both the
                      encrypt and decrypt nodes.
        :param worker: passed through to ``get_stats``.
        """
        if (hasattr(p_in, "tun_sa_in")):
            pkts = p_in.tun_sa_in.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
        if (hasattr(p_out, "tun_sa_out")):
            pkts = p_out.tun_sa_out.get_stats(worker)['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_decrypted6(self, p, rxs):
        """ Check addresses and checksums of the packets in ``rxs``.

        The IPv6 source must be ``p.remote_tun_if_host`` and the
        destination ``self.pg1.remote_ip6``.
        """
        for rx in rxs:
            self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
            self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted6(self, p, sa, rxs):
        """ Decrypt the packets in ``rxs`` and check the inner packets.

        :param p: IPsec parameters; ``p.vpp_tun_sa`` does the decryption.
        :param sa: not used by this method.
        :param rxs: received packets to decrypt.
        """
        for rx in rxs:
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
                             rx[IPv6].plen)
            # reset for every packet so that the failure log below can
            # never show the decrypted form of an earlier packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
                if not decrypt_pkt.haslayer(IPv6):
                    decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    except Exception:
                        # best effort only: a logging problem must not
                        # hide the failure that is re-raised below
                        pass
                raise

    def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
        """ Send encrypted packets and expect no replies.

        :param p_in: IPsec parameters used to build the packets.
        :param count: number of packets to send.
        :param payload_size: payload size given to the generator.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")

        send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                           src=p_in.remote_tun_if_host,
                                           dst=self.pg1.remote_ip6,
                                           count=count,
                                           payload_size=payload_size)
        self.send_and_assert_no_replies(self.tun_if, send_pkts)
        self.logger.info(self.vapi.cli("sh punt stats"))

    def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
        """ Pass IPv6 traffic in both directions and check the counters.

        :param p_in: parameters for the tunnel to pg1 direction.
        :param p_out: parameters for the pg1 to tunnel direction;
                      ``p_in`` is used when this is not given.
        :param count: number of packets generated per direction.
        :param payload_size: payload size given to the generators.
        """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear ipsec sa")
        if not p_out:
            p_out = p_in
        try:
            send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
                                               src=p_in.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count,
                                               payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted6(p_in, recv_pkts)

            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p_out.remote_tun_if_host,
                                       count=count,
                                       payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)

            for rx in recv_pkts:
                self.assertEqual(rx[IPv6].src, p_out.tun_src)
                self.assertEqual(rx[IPv6].dst, p_out.tun_dst)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p_in, p_out, count)

    def verify_tun_reass_66(self, p):
        """ Pass a fragmented encrypted packet through the tunnel.

        IPv6 reassembly is enabled on the tunnel interface for the
        duration of the check and is disabled again afterwards, also
        when one of the checks fails.
        """
        self.vapi.cli("clear errors")
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.tun_if.sw_if_index, enable_ip6=True)

        try:
            try:
                send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa,
                                                   self.tun_if,
                                                   src=p.remote_tun_if_host,
                                                   dst=self.pg1.remote_ip6,
                                                   count=1,
                                                   payload_size=1850)
                send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400,
                                             self.logger)
                recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
                                                 self.pg1, n_rx=1)
                self.verify_decrypted6(p, recv_pkts)

                send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                           dst=p.remote_tun_if_host,
                                           count=1,
                                           payload_size=64)
                recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                                 self.tun_if)
                self.verify_encrypted6(p, p.vpp_tun_sa, recv_pkts)
            finally:
                self.logger.info(self.vapi.ppcli("show error"))
                self.logger.info(self.vapi.ppcli("show ipsec all"))
            self.verify_counters6(p, p, 1)
        finally:
            # undo the setting made above even when a check failed
            self.vapi.ip_reassembly_enable_disable(
                sw_if_index=self.tun_if.sw_if_index, enable_ip6=False)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Pass IPv4 traffic through the IPv6 tunnel in both directions and
        check the counters.

        :param p: IPsec parameters.
        :param count: number of packets generated per direction.
        """
        self.vapi.cli("clear errors")
        try:
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that the failure log below
                # can never show the decrypted form of an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:",
                                                  decrypt_pkt))
                        except Exception:
                            # best effort only: a logging problem must
                            # not hide the failure re-raised below
                            pass
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))
        self.verify_counters6(p, p, count)
1118
1119
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=1)

    def test_tun_reass_basic66(self):
        """ ipsec 6o6 tunnel basic reassembly test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_reass_66(p)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=257)
1134
1135
class IpsecTun6HandoffTests(IpsecTun6):
    """ UT test methods for Tunnel v6 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handoff_66(self):
        """ ipsec 6o6 tunnel worker hand-off test """
        pkts_per_round = 15
        # inject alternately on worker 0 and 1
        workers = (0, 1, 0, 1)
        p = self.params[socket.AF_INET6]

        for worker in workers:
            # encrypted traffic in on the tunnel, clear text out on pg1
            encrypted = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=pkts_per_round)
            rxs = self.send_and_expect(self.tun_if, encrypted,
                                       self.pg1, worker=worker)
            self.verify_decrypted6(p, rxs)

            # clear text in on pg1, encrypted traffic out on the tunnel
            clear = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                   dst=p.remote_tun_if_host,
                                   count=pkts_per_round)
            rxs = self.send_and_expect(self.pg1, clear,
                                       self.tun_if, worker=worker)
            self.verify_encrypted6(p, p.vpp_tun_sa, rxs)

        # all counts on the SA should be against worker 0, the first
        # worker that was used
        total = len(workers) * pkts_per_round
        self.verify_counters6(p, p, total, worker=0)
1165
1166
class IpsecTun4HandoffTests(IpsecTun4):
    """ UT test methods for Tunnel v4 with multiple workers """
    worker_config = "workers 2"

    def test_tun_handooff_44(self):
        """ ipsec 4o4 tunnel worker hand-off test """
        pkts_per_round = 15
        # inject alternately on worker 0 and 1
        workers = (0, 1, 0, 1)
        p = self.params[socket.AF_INET]

        for worker in workers:
            # encrypted traffic in on the tunnel, clear text out on pg1
            encrypted = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=pkts_per_round)
            rxs = self.send_and_expect(self.tun_if, encrypted,
                                       self.pg1, worker=worker)
            self.verify_decrypted(p, rxs)

            # clear text in on pg1, encrypted traffic out on the tunnel
            clear = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                  dst=p.remote_tun_if_host,
                                  count=pkts_per_round)
            rxs = self.send_and_expect(self.pg1, clear,
                                       self.tun_if, worker=worker)
            self.verify_encrypted(p, p.vpp_tun_sa, rxs)

        # all counts on the SA should be against worker 0, the first
        # worker that was used
        total = len(workers) * pkts_per_round
        self.verify_counters4(p, total, worker=0)
1196
1197
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # Adds nothing of its own: it only combines the test methods of the
    # two base classes.
    pass
1201
1202
# When the file is executed directly, hand control to unittest using the
# VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)