IPSEC: support GCM in ESP
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp, reassemble4
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """ Default IPsec test parameters for the IPv4 address family """

    # address-family constants, shared by every instance
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        # inner-packet host addresses used by the tunnel tests
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: (SA id, SPI) pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: (SA id, SPI) pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API enum value and the scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # cipher: VPP API enum value and the scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'
        # appended to crypt_key when the scapy SAs are built; empty here
        self.crypt_salt = ''

        self.flags = 0
        self.nat_header = None
48
49
class IPsecIPv6Params(object):
    """ Default IPsec test parameters for the IPv6 address family """

    # address-family constants, shared by every instance
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        # inner-packet host addresses used by the tunnel tests
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: (SA id, SPI) pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: (SA id, SPI) pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API enum value and the scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # cipher: VPP API enum value and the scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'
        # appended to crypt_key when the scapy SAs are built; empty here
        self.crypt_salt = ''

        self.flags = 0
        self.nat_header = None
84
85
def config_tun_params(p, encryption_type, tun_if):
    """
    Attach the two scapy tunnel-mode SAs to the parameter object.

    :param p: parameter object (IPsecIPv4Params / IPsecIPv6Params); gains
              the attributes scapy_tun_sa and vpp_tun_sa
    :param encryption_type: protocol argument handed to SecurityAssociation
    :param tun_if: interface whose remote/local addresses form the outer
                   tunnel header
    """
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # everything the two directions have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key + p.crypt_salt,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    # SA used by scapy to encrypt towards VPP: remote -> local, VPP's SPI
    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=remote, dst=local),
        **shared)

    # SA used by scapy to decrypt what VPP sent: local -> remote, scapy's SPI
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=remote, src=local),
        **shared)
108
109
def config_tra_params(p, encryption_type):
    """
    Attach the two scapy transport-mode SAs to the parameter object.

    :param p: parameter object (IPsecIPv4Params / IPsecIPv6Params); gains
              the attributes scapy_tra_sa and vpp_tra_sa
    :param encryption_type: protocol argument handed to SecurityAssociation
    """
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN

    # the two SAs differ only in their SPI
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key + p.crypt_salt,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
131
132
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    # SPD ids for the tunnel and transport setups
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ hook called at the end of setUp(); does nothing by default """

    @classmethod
    def setUpClass(cls):
        """ class level set-up; defers entirely to the parent class """
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """ class level tear-down; defers entirely to the parent class """
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """ create fresh v4 and v6 parameter sets, indexed by address type """
        v4 = self.ipv4_params = IPsecIPv4Params()
        v6 = self.ipv6_params = IPsecIPv6Params()
        self.params = {v4.addr_type: v4, v6.addr_type: v6}

    def config_interfaces(self):
        """ create three pg interfaces and bring each up with ip4 and ip6 """
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        """ per-test set-up: parameters, protocol ids, interfaces, backend """
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """ bring every interface down and remove its ip4/ip6 config """
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        """ per-test tear-down: parent first, then the interfaces """
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        """ log the output of 'show hardware' """
        hardware = self.vapi.cli("show hardware")
        self.logger.info(hardware)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """
        Build 'count' IPv4 ICMP packets, each encrypted with 'sa' and
        framed for reception on 'sw_intf'.

        sa.encrypt() is called once per packet.
        """
        pkts = []
        for _ in range(count):
            inner = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(inner))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """
        Build 'count' IPv6 echo requests, each encrypted with 'sa' and
        framed for reception on 'sw_intf'.

        sa.encrypt() is called once per packet.
        """
        pkts = []
        for _ in range(count):
            inner = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(inner))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build 'count' plain IPv4 ICMP packets framed for 'sw_intf' """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ build 'count' plain IPv6 echo requests framed for 'sw_intf' """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size))
        return pkts
235
236
class IpsecTcp(object):
    """ verify methods for TCP over the v4 tunnel """
    def verify_tcp_checksum(self):
        """
        Send an encrypted TCP SYN (port 80) to VPP's own tunnel address and
        check the checksums of the decrypted reply.
        """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = (Ether(src=self.tun_if.remote_mac,
                      dst=self.tun_if.local_mac) /
                p.scapy_tun_sa.encrypt(syn))
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is examined
        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
251
252
class IpsecTcpTests(IpsecTcp):
    """ UT test methods for TCP checksums """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        # all of the work is done by the verify method of the base class
        self.verify_tcp_checksum()
257
258
class IpsecTra4(object):
    """ verify methods for Transport v4 """

    def _gen_tra4_seq_pkt(self, sa, seq_num):
        """
        Build one protected ICMP packet for the transport interface.

        :param sa: scapy SecurityAssociation used to encrypt the packet
        :param seq_num: sequence number handed to sa.encrypt()
        :returns: Ether / sa.encrypt(IP / ICMP), addressed from tra_if's
                  remote MAC/IPv4 to its local MAC/IPv4
        """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def verify_tra_anti_replay(self, count=1):
        """
        ipsec v4 transport anti-replay test

        Sends packets with hand-picked sequence numbers and checks which
        ones VPP answers, which ones it drops, and the error counters of
        the decrypt/encrypt nodes.

        :param count: not used by this method; kept so existing callers
                      that pass it keep working
        """
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # fire in a packet with seq number 1
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._gen_tra4_seq_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            # 17 bogus packets above + these 17 = 34
            self.assert_packet_counter_equal(
                '/err/%s/Integrity check failed' %
                self.tra4_decrypt_node_name, 34)

        else:
            # 3 replays above + these 17 = 20
            self.assert_packet_counter_equal(
                '/err/%s/SA replayed packet' %
                self.tra4_decrypt_node_name, 20)

        # valid packet moves the window over to 236
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 236)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the result is not needed; the call itself checks that the
        # reply can be decrypted
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap  unless we're using extended.
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """
        ipsec v4 transport basic test

        Sends 'count' encrypted ICMP packets to VPP on the transport
        interface, decrypts every reply and validates its checksums, then
        checks the SA and node packet counters against 'count'.

        :param count: number of packets to send and expect back
        """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # dump the offending packet, then let the test fail
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always leave the error and ipsec state in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
425
426
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay(count=1)

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        # honour the caller's count; the default of 1 keeps the behaviour
        # of a plain no-argument invocation unchanged
        self.verify_tra_basic4(count=count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
440
441
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """
        Send 'count' encrypted echo requests to VPP on the transport
        interface, decrypt and checksum-validate every reply, then compare
        the SA and node packet counters with 'count'.
        """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                        src=self.tra_if.remote_ip6,
                                        dst=self.tra_if.local_ip6,
                                        count=count)
            for rx in self.send_and_expect(self.tra_if, tx, self.tra_if):
                try:
                    self.assert_packet_checksums_valid(
                        p.vpp_tra_sa.decrypt(rx[IPv6]))
                except:
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always leave the error and ipsec state in the log
            for show_cmd in ("show error", "show ipsec"):
                self.logger.info(self.vapi.ppcli(show_cmd))

        # inbound SA first, then outbound
        for direction in ("in", "out"):
            sa = getattr(p, "tra_sa_" + direction)
            n_pkts = sa.get_stats()['packets']
            self.assertEqual(n_pkts, count,
                             "incorrect SA %s counts: expected %d != %d" %
                             (direction, count, n_pkts))

        for node in (self.tra6_encrypt_node_name,
                     self.tra6_decrypt_node_name):
            self.assert_packet_counter_equal(node, count)
475
476
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """

    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        # one packet
        self.verify_tra_basic6(count=1)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        # same check, 257 packets
        self.verify_tra_basic6(count=257)
486
487
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # nothing of its own: the tests come from the two base classes
491
492
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """

    def _tun4_log_decrypted(self, decrypt_pkt):
        """
        Best-effort debug dump of a decrypted packet on a failure path.

        :param decrypt_pkt: the decrypted packet, or None when decryption
                            never produced one for the failing packet
        """
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # a dump problem must not replace the failure being reported
            self.logger.debug("Decrypted packet could not be dumped")

    def verify_counters(self, p, count):
        """
        Check SPD policy, SA and tun4 node packet counters against 'count'.

        The SPD and SA checks only run when 'p' carries the corresponding
        attribute (spd_policy_in_any / tun_sa_in).

        NOTE(review): IpsecTun6 defines a method of the same name; in a
        class deriving from IpsecTun46Tests this one comes first in the
        MRO, so the v6 verify methods end up checking the tun4 node names.
        """
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """ check addresses and checksums of packets VPP decrypted to pg1 """
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """
        Decrypt packets VPP encrypted, check their inner addresses, then
        reassemble them and validate the checksums.

        :param p: parameter object of the tunnel under test
        :param sa: scapy SecurityAssociation used to decrypt; when None,
                   p.vpp_tun_sa is used
        :param rxs: packets captured on the tunnel interface
        """
        if sa is None:
            sa = p.vpp_tun_sa
        decrypt_pkts = []
        for rx in rxs:
            # reset per packet so a failure never logs the previous
            # packet's plaintext
            decrypt_pkt = None
            try:
                decrypt_pkt = sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    # payload came back undissected; parse it as IPv4
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                self._tun4_log_decrypted(decrypt_pkt)
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """
        ipsec 4o4 tunnel test, both directions.

        :param p: parameter object of the tunnel under test
        :param count: packets sent in each direction
        :param payload_size: payload size of the plain packets sent from pg1
        :param n_rx: number of encrypted packets expected on the tunnel
                     interface; a falsy value means 'count'
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> VPP -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # pg1 -> VPP -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            # always leave the error and ipsec state in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)

    def verify_tun_64(self, p, count=1):
        """
        ipsec 6o4 tunnel test, both directions: IPv6 inner packets, IPv4
        outer header on the tunnel interface.

        :param p: parameter object of the tunnel under test
        :param count: packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> VPP -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> VPP -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the previous
                # packet's plaintext
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        # payload came back undissected; parse it as IPv6
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    self._tun4_log_decrypted(decrypt_pkt)
                    raise
        finally:
            # always leave the error and ipsec state in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)
605
606
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
616
617
class IpsecTun6(object):
    """ verify methods for Tunnel v6 """

    def _tun6_log_decrypted(self, decrypt_pkt):
        """
        Best-effort debug dump of a decrypted packet on a failure path.

        :param decrypt_pkt: the decrypted packet, or None when decryption
                            never produced one for the failing packet
        """
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # a dump problem must not replace the failure being reported
            self.logger.debug("Decrypted packet could not be dumped")

    def verify_counters(self, p, count):
        """
        Check SA and tun6 node packet counters against 'count'.

        The SA checks only run when 'p' carries a tun_sa_in attribute.

        NOTE(review): IpsecTun4 defines a method of the same name; in a
        class deriving from IpsecTun46Tests the IpsecTun4 version comes
        first in the MRO, so this method is shadowed there and the tun4
        node names are checked instead.
        """
        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_tun_66(self, p, count=1):
        """
        ipsec 6o6 tunnel basic test, both directions.

        :param p: parameter object of the tunnel under test
        :param count: packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> VPP -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> VPP -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the previous
                # packet's plaintext
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        # payload came back undissected; parse it as IPv6
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    self._tun6_log_decrypted(decrypt_pkt)
                    raise
        finally:
            # always leave the error and ipsec state in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)

    def verify_tun_46(self, p, count=1):
        """
        ipsec 4o6 tunnel basic test, both directions: IPv4 inner packets,
        IPv6 outer header on the tunnel interface.

        :param p: parameter object of the tunnel under test
        :param count: packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> VPP -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> VPP -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the previous
                # packet's plaintext
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        # payload came back undissected; parse it as IPv4
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    self._tun6_log_decrypted(decrypt_pkt)
                    raise
        finally:
            # always leave the error and ipsec state in the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)
710
711
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
722
723
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # nothing of its own: the tests come from the two base classes
727
728
# run through the VPP test runner when executed as a script
if '__main__' == __name__:
    unittest.main(
        testRunner=VppTestRunner)