Tests: Refactor tearDown show command logging, add lifecycle markers.
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp, reassemble4
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """Default IPv4 parameter set for the IPsec test templates.

    Class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and keys so that a test
    can tweak them without affecting other instances.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # inner addresses used by the tunnel verify methods
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: (SA id, SPI) pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: (SA id, SPI) pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SAD flags and no NAT-T header by default
        self.flags = 0
        self.nat_header = None
47
48
class IPsecIPv6Params(object):
    """Default IPv6 parameter set for the IPsec test templates.

    Class attributes describe the address family; each instance carries
    its own SA ids, SPIs, algorithm selections and keys so that a test
    can tweak them without affecting other instances.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # inner addresses used by the tunnel verify methods
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: (SA id, SPI) pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: (SA id, SPI) pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value and the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value and the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SAD flags and no NAT-T header by default
        self.flags = 0
        self.nat_header = None
82
83
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy tunnel-mode security associations to *p*.

    Two ``SecurityAssociation`` objects are stored on the parameter object:

    * ``p.scapy_tun_sa`` uses ``p.vpp_tun_spi`` and an outer header from
      the interface's remote address to its local address;
    * ``p.vpp_tun_sa`` uses ``p.scapy_tun_spi`` and an outer header in the
      opposite direction.

    Both share the algorithms, keys, NAT-T header and ESN setting of *p*.

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first positional argument handed to scapy's
        ``SecurityAssociation``
    :param tun_if: interface exposing ``remote_addr`` / ``local_addr``
        mappings indexed by address family
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)

    # settings common to both directions
    shared = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    # outer header class follows the address family of the parameter set
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=remote, src=local),
        **shared)
106
107
def config_tra_params(p, encryption_type):
    """Attach the scapy transport-mode security associations to *p*.

    ``p.scapy_tra_sa`` is keyed by ``p.vpp_tra_spi`` and ``p.vpp_tra_sa``
    by ``p.scapy_tra_spi``; apart from the SPI both are built from the same
    algorithms, keys, NAT-T header and ESN setting taken from *p*.

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first positional argument handed to scapy's
        ``SecurityAssociation``
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
129
130
class TemplateIpsec(VppTestCase):
    """
    Common fixture for the IPsec test cases.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ hook for subclasses; does nothing by default """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create the per-family parameter objects, indexed by family """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {prm.addr_type: prm
                       for prm in (self.ipv4_params, self.ipv6_params)}

    def config_interfaces(self):
        """ create three pg interfaces and bring them up with v4 and v6 """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protocols = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protocols.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protocols.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ take the interfaces down and strip their addresses """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ return `count` IPv4 ICMP packets, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            # encrypt() is invoked once per packet
            pkts.append(l2 / sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ return `count` ICMPv6 echo requests, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            # encrypt() is invoked once per packet
            pkts.append(l2 / sa.encrypt(IPv6(src=src, dst=dst) / echo))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IP(src=src, dst=dst) / ICMP() /
                        Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain ICMPv6 echo requests """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            pkts.append(l2 / IPv6(src=src, dst=dst) / echo)
        return pkts
233
234
class IpsecTcp(object):
    """ verify methods for TCP packets generated by VPP """

    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 of the tunnel interface
        and check the checksums of the decrypted reply."""
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        tun_if = self.tun_if
        l2 = Ether(src=tun_if.remote_mac, dst=tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is inspected
        reply = self.send_and_expect(tun_if, [send], tun_if)[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
249
250
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksum verification """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # thin test entry point; the work is done by the verify method
        self.verify_tcp_checksum()
255
256
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _gen_tra4_pkt(self, sa, seq_num):
        """Return one ICMP packet for tra_if, protected by *sa*.

        The packet is addressed from tra_if's remote side to its local
        side and is encrypted with the explicit sequence number *seq_num*.
        """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self, count=1):
        """Exercise the anti-replay window of the v4 transport SA.

        Sends packets with hand-picked sequence numbers and checks which
        ones are answered, which are dropped and which error counters
        are incremented. With ESN enabled on the SA the expectations for
        out-of-window and wrapping sequence numbers differ.

        :param count: accepted for signature compatibility; not used.
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn
        replayed = ('/err/%s/SA replayed packet' %
                    self.tra4_decrypt_node_name)
        integ_failed = ('/err/%s/Integrity check failed' %
                        self.tra4_decrypt_node_name)

        # fire in a packet with seq number 1
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(replayed, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._gen_tra4_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
        self.assert_packet_counter_equal(integ_failed, 17)

        # which we can determine since this packet is still in the window
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(integ_failed, 34)
        else:
            # 3 replays from above plus the 17 out-of-window packets
            self.assert_packet_counter_equal(replayed, 20)

        # valid packet moves the window over to 236
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 236)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # decrypt() is called only to check that the reply decrypts
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap unless we're using extended.
        pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._gen_tra4_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ ipsec v4 transport basic test

        Sends *count* encrypted ICMP packets to tra_if, expects a reply
        for each, checks that every reply decrypts with valid checksums,
        then compares the SA statistics and the encrypt/decrypt node
        counters against *count*.
        """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
423
424
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-reply test """
        self.verify_tra_anti_replay(count=1)

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller-supplied count instead of silently forcing 1;
        # the default keeps the behaviour unchanged for the test runner,
        # which calls this method without arguments
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
438
439
class IpsecTra6(object):
    """ verify methods for Transport v6 """

    def verify_tra_basic6(self, count=1):
        """Send `count` encrypted ICMPv6 echo requests to tra_if, check
        that every reply decrypts with valid checksums, then compare the
        SA statistics and node counters against `count`."""
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                        src=self.tra_if.remote_ip6,
                                        dst=self.tra_if.local_ip6,
                                        count=count)
            for rx in self.send_and_expect(self.tra_if, tx, self.tra_if):
                try:
                    self.assert_packet_checksums_valid(
                        p.vpp_tra_sa.decrypt(rx[IPv6]))
                except:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            for cmd in ("show error", "show ipsec"):
                self.logger.info(self.vapi.ppcli(cmd))

        # inbound SA first, then outbound
        for direction in ("in", "out"):
            sa = getattr(p, "tra_sa_" + direction)
            pkts = sa.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA %s counts: expected %d != %d" %
                             (direction, count, pkts))

        for node in (self.tra6_encrypt_node_name,
                     self.tra6_decrypt_node_name):
            self.assert_packet_counter_equal(node, count)
473
474
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # single packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # 257 packets in one go
        self.verify_tra_basic6(count=257)
484
485
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # no members of its own: it only combines the v4 and v6 test mixins
489
490
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """

    def verify_counters4(self, p, count):
        """Check that the v4 tunnel statistics all equal *count*.

        The SPD policy and SA statistics are checked only when *p* has
        the corresponding attributes; the tun4 encrypt/decrypt node
        counters are always checked.

        The family-specific name is deliberate: IpsecTun6 also offers a
        ``verify_counters`` method, so in a class inheriting from both
        mixins that shared name resolves to only one of them.
        """
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_counters(self, p, count):
        """Backward-compatible alias for verify_counters4()."""
        self.verify_counters4(p, count)

    def verify_decrypted(self, p, rxs):
        """Check that each packet in *rxs* goes from the tunnel host of
        *p* to pg1's remote v4 address and has valid checksums."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt *rxs* with ``p.vpp_tun_sa`` and validate the result.

        Every decrypted packet must go from pg1's remote v4 address to
        the tunnel host of *p*; the packets are then reassembled and
        their checksums verified.

        :param sa: not used by this implementation; decryption always
            uses ``p.vpp_tun_sa``.
        """
        decrypt_pkts = []
        for rx in rxs:
            # reset per packet so a failure never logs a stale packet
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # logging is best effort; keep the real failure
                        pass
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Run v4-in-v4 traffic through the tunnel in both directions.

        :param p: parameter object describing the tunnel
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets sent from
            pg1; the encrypted packets use the generator's default
        :param n_rx: number of packets expected on tun_if for the plain
            packets; a falsy value means *count*
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters4(p, count)

    def verify_tun_64(self, p, count=1):
        """Run v6-in-v4 traffic through the tunnel in both directions and
        check the v4 tunnel counters against *count*."""
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    # the outer header is IPv4 here
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # logging is best effort; keep the real failure
                            pass
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters4(p, count)
603
604
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
614
615
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """

    def verify_counters6(self, p, count):
        """Check that the v6 tunnel statistics all equal *count*.

        The SA statistics are checked only when *p* has a ``tun_sa_in``
        attribute; the tun6 encrypt/decrypt node counters are always
        checked.

        The family-specific name is deliberate: IpsecTun4 also offers a
        ``verify_counters`` method, so in a class inheriting from both
        mixins that shared name resolves to only one of them.
        """
        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_counters(self, p, count):
        """Backward-compatible alias for verify_counters6()."""
        self.verify_counters6(p, count)

    def verify_tun_66(self, p, count=1):
        """ ipsec 6o6 tunnel basic test

        Runs v6-in-v6 traffic through the tunnel in both directions and
        checks the v6 tunnel counters against *count*.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(
                                ppp("Decrypted packet:", decrypt_pkt))
                        except Exception:
                            # logging is best effort; keep the real failure
                            pass
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters6(p, count)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        Runs v4-in-v6 traffic through the tunnel in both directions and
        checks the v6 tunnel counters against *count*.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    # the outer header is IPv6 here
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        try:
                            self.logger.debug(ppp("Decrypted packet:",
                                                  decrypt_pkt))
                        except Exception:
                            # logging is best effort; keep the real failure
                            pass
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters6(p, count)
708
709
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
720
721
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # no members of its own: it only combines the v4 and v6 test mixins
725
726
# run the tests with the VPP runner when executed as a script
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)