make test: fix redundant setUp() in template_ipsec.py
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp, reassemble4
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """Default parameter set used by the IPsec tests for IPv4.

    The class attributes describe the address family; the instance
    attributes assigned in ``__init__`` hold the per-test SA ids, SPIs,
    algorithm selections and keys for tunnel ("tun") and transport ("tra")
    mode.
    """

    # address-family description
    is_ipv6 = 0
    addr_len = 32
    addr_bcast = "255.255.255.255"
    addr_any = "0.0.0.0"
    addr_type = socket.AF_INET

    def __init__(self):
        # hosts on the far side of the tunnel, one per inner address family
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: (SA id, SPI) for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: (SA id, SPI) for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API enum value plus the scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API enum value plus the scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header unless a test overrides them
        self.flags = 0
        self.nat_header = None
47
48
class IPsecIPv6Params(object):
    """Default parameter set used by the IPsec tests for IPv6.

    The class attributes describe the address family; the instance
    attributes assigned in ``__init__`` hold the per-test SA ids, SPIs,
    algorithm selections and keys for tunnel ("tun") and transport ("tra")
    mode.
    """

    # address-family description
    is_ipv6 = 1
    addr_len = 128
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_any = "0::0"
    addr_type = socket.AF_INET6

    def __init__(self):
        # hosts on the far side of the tunnel, one per inner address family
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: (SA id, SPI) for each direction
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: (SA id, SPI) for each direction
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API enum value plus the scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API enum value plus the scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header unless a test overrides them
        self.flags = 0
        self.nat_header = None
82
83
def config_tun_params(p, encryption_type, tun_if):
    """Create the two scapy tunnel-mode SAs and store them on ``p``.

    :param p: parameter object (e.g. IPsecIPv4Params); gains the attributes
        ``scapy_tun_sa`` and ``vpp_tun_sa``
    :param encryption_type: first positional argument handed to scapy's
        SecurityAssociation
    :param tun_if: interface whose ``remote_addr`` / ``local_addr`` tables
        (indexed by address family) supply the outer tunnel header

    ``p.scapy_tun_sa`` carries ``p.vpp_tun_spi`` and an outer header going
    remote -> local; ``p.vpp_tun_sa`` carries ``p.scapy_tun_spi`` and an
    outer header going local -> remote.
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)

    # outer header class and endpoints for this address family
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    # everything except the SPI and the outer header is shared by both SAs
    shared = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=remote, src=local),
        **shared)
106
107
def config_tra_params(p, encryption_type):
    """Create the two scapy transport-mode SAs and store them on ``p``.

    :param p: parameter object (e.g. IPsecIPv4Params); gains the attributes
        ``scapy_tra_sa`` (built with ``p.vpp_tra_spi``) and ``vpp_tra_sa``
        (built with ``p.scapy_tra_spi``)
    :param encryption_type: first positional argument handed to scapy's
        SecurityAssociation
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)

    # the two SAs differ only in their SPI
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
129
130
class TemplateIpsec(VppTestCase):
    """
    Shared fixture and packet generators for the IPsec test cases.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ hook called last in setUp(); a no-op unless overridden """

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create the per-address-family parameter objects and index them
        by address type in self.params """
        v4 = IPsecIPv4Params()
        v6 = IPsecIPv6Params()
        self.ipv4_params = v4
        self.ipv6_params = v6
        self.params = {v4.addr_type: v4, v6.addr_type: v6}

    def config_interfaces(self):
        """ create three pg interfaces and bring each up with resolved
        IPv4 and IPv6 addressing """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        """ per-test setup: parameters, protocol ids, interfaces and
        finally the backend selection hook """
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protocols = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protocols.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protocols.IPSEC_API_PROTO_AH

        self.config_interfaces()
        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ take every interface down and remove its addresses """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        """ per-test cleanup; the CLI command is skipped once VPP is
        flagged as dead """
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ return ``count`` Ethernet frames, each carrying an IPv4/ICMP
        packet with ``payload_size`` bytes of 'X' encrypted by ``sa`` """
        pkts = []
        # encrypt once per packet: every frame gets its own sa.encrypt()
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ return ``count`` Ethernet frames, each carrying an
        IPv6/ICMPv6 echo request encrypted by ``sa`` """
        pkts = []
        # encrypt once per packet: every frame gets its own sa.encrypt()
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(eth / sa.encrypt(inner))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return ``count`` plain Ethernet/IPv4/ICMP frames """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth / IP(src=src, dst=dst) / ICMP() /
                        Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return ``count`` plain Ethernet/IPv6/ICMPv6 echo requests """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth / IPv6(src=src, dst=dst) /
                        ICMPv6EchoRequest(id=0, seq=1,
                                          data='X' * payload_size))
        return pkts
232
233
class IpsecTcp(object):
    """ verify methods for TCP packets generated by VPP over a tunnel """

    def verify_tcp_checksum(self):
        """ send an encrypted TCP SYN to port 80 of the tunnel interface's
        local address and check the checksums of the decrypted reply """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        tun_if = self.tun_if
        eth = Ether(src=tun_if.remote_mac, dst=tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = eth / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is examined
        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
248
249
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksum verification """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # the checks themselves live in the IpsecTcp verify method
        self.verify_tcp_checksum()
254
255
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _tra4_seq_pkt(self, sa, seq_num):
        """ build one Ethernet frame on tra_if carrying an IPv4/ICMP packet
        encrypted by ``sa`` with the explicit sequence number ``seq_num`` """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self, count=1):
        """ exercise the anti-replay window of the v4 transport SA

        ``count`` is accepted for signature compatibility; it is not used.
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn
        good_sa = p.scapy_tra_sa

        # fire in a packet with seq number 1
        self.send_and_expect(self.tra_if,
                             [self._tra4_seq_pkt(good_sa, 1)],
                             self.tra_if)

        # now move the window over to 235
        pkt = self._tra4_seq_pkt(good_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)

        # the window size is 64 packets
        # in window are still accepted
        self.send_and_expect(self.tra_if,
                             [self._tra4_seq_pkt(good_sa, 172)],
                             self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._tra4_seq_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        self.send_and_expect(self.tra_if,
                             [self._tra4_seq_pkt(good_sa, 234)],
                             self.tra_if)

        # out of window are dropped
        pkt = self._tra4_seq_pkt(good_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(
                '/err/%s/Integrity check failed' %
                self.tra4_decrypt_node_name, 34)
        else:
            self.assert_packet_counter_equal(
                '/err/%s/SA replayed packet' %
                self.tra4_decrypt_node_name, 20)

        # valid packet moves the window over to 236
        rx = self.send_and_expect(self.tra_if,
                                  [self._tra4_seq_pkt(good_sa, 236)],
                                  self.tra_if)
        # the reply must decrypt; the decrypted packet itself is not needed
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap  unless we're using extended.
        pkt = self._tra4_seq_pkt(good_sa, 237)

        if not use_esn:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)
        else:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._tra4_seq_pkt(good_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # the reply must decrypt with the wrapped high order number
            p.vpp_tra_sa.decrypt(rx[0][IP])

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ send ``count`` encrypted v4 transport packets, verify each
        reply decrypts with valid checksums, then check the SA and node
        counters """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            tx_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                            src=self.tra_if.remote_ip4,
                                            dst=self.tra_if.local_ip4,
                                            count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for rx in rx_pkts:
                try:
                    self.assert_packet_checksums_valid(
                        p.vpp_tra_sa.decrypt(rx[IP]))
                except:
                    # dump the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
422
423
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """

    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay(count=1)

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller-supplied packet count rather than a fixed 1;
        # with no argument this still sends a single packet
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
437
438
class IpsecTra6(object):
    """ verify methods for Transport v6 """

    def verify_tra_basic6(self, count=1):
        """ send ``count`` encrypted v6 transport packets, verify each
        reply decrypts with valid checksums, then check the SA and node
        counters """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for rx in rx_pkts:
                try:
                    self.assert_packet_checksums_valid(
                        p.vpp_tra_sa.decrypt(rx[IPv6]))
                except:
                    # dump the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
472
473
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        n_pkts = 1  # a single packet
        self.verify_tra_basic6(count=n_pkts)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        n_pkts = 257  # a burst of packets
        self.verify_tra_basic6(count=n_pkts)
483
484
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4

    Adds nothing of its own; it only combines the two mixins.
    """
488
489
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """

    def verify_counters(self, p, count):
        """ check that the SPD policy / SA counters (when ``p`` carries
        them) and the tun4 encrypt/decrypt node counters equal ``count`` """
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """ check addresses and checksums of the plain packets ``rxs``
        that VPP forwarded towards pg1 """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """ decrypt ``rxs`` with p.vpp_tun_sa, check the inner addresses,
        then reassemble and verify checksums

        ``sa`` is accepted but not used; decryption always goes through
        ``p.vpp_tun_sa``.
        """
        decrypt_pkts = []
        for rx in rxs:
            # reset per packet so a failure never logs the packet decrypted
            # in an earlier iteration
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                # an explicit check instead of a nested try/except keeps
                # the bare raise below bound to the original failure
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """ IPv4-in-IPv4 tunnel check in both directions

        :param p: parameter object for the tunnel
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets sent from pg1
        :param n_rx: number of encrypted packets expected back on tun_if;
            a falsy value (None or 0) means ``count``
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            # encrypted in on tun_if, expected in the clear on pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # plain in on pg1, expected encrypted on tun_if
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)

    def verify_tun_64(self, p, count=1):
        """ IPv6-in-IPv4 tunnel check in both directions: the inner
        packets are IPv6, the encrypted replies are read via their IPv4
        header """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            # encrypted in on tun_if, expected in the clear on pg1
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            # plain in on pg1, expected encrypted on tun_if
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the packet
                # decrypted in an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    # an explicit check instead of a nested try/except keeps
                    # the bare raise below bound to the original failure
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)
602
603
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
613
614
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """

    def verify_counters(self, p, count):
        """ check that the SA counters (when ``p`` carries them) and the
        tun6 encrypt/decrypt node counters equal ``count``

        NOTE(review): IpsecTun4 defines a method with the same name; in a
        class that inherits from both, the MRO decides which one runs.
        """
        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_tun_66(self, p, count=1):
        """ ipsec 6o6 tunnel basic test

        Sends ``count`` packets in each direction: encrypted in on tun_if
        and expected in the clear on pg1, then plain in on pg1 and expected
        encrypted on tun_if.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the packet
                # decrypted in an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # an explicit check instead of a nested try/except keeps
                    # the bare raise below bound to the original failure
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test

        The inner packets are IPv4; the encrypted replies are read via
        their IPv6 header. Sends ``count`` packets in each direction.
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the packet
                # decrypted in an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # an explicit check instead of a nested try/except keeps
                    # the bare raise below bound to the original failure
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            # always log the error counters and IPsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)
707
708
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
719
720
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4

    Adds nothing of its own; it only combines the two mixins.
    """
724
725
# run the tests with the VPP runner when this file is executed as a script
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)