TEST:add l2bd nd term tests
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ SNAT create capture and verify method holder """
22
    @classmethod
    def setUpClass(cls):
        """ Run the class-level setup of the parent test case class """
        super(MethodHolder, cls).setUpClass()
26
    def tearDown(self):
        """ Run the per-test teardown of the parent test case class """
        super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def compose_ip6(self, ip4, pref, plen):
161         """
162         Compose IPv4-embedded IPv6 addresses
163
164         :param ip4: IPv4 address
165         :param pref: IPv6 prefix
166         :param plen: IPv6 prefix length
167         :returns: IPv4-embedded IPv6 addresses
168         """
169         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
171         if plen == 32:
172             pref_n[4] = ip4_n[0]
173             pref_n[5] = ip4_n[1]
174             pref_n[6] = ip4_n[2]
175             pref_n[7] = ip4_n[3]
176         elif plen == 40:
177             pref_n[5] = ip4_n[0]
178             pref_n[6] = ip4_n[1]
179             pref_n[7] = ip4_n[2]
180             pref_n[9] = ip4_n[3]
181         elif plen == 48:
182             pref_n[6] = ip4_n[0]
183             pref_n[7] = ip4_n[1]
184             pref_n[9] = ip4_n[2]
185             pref_n[10] = ip4_n[3]
186         elif plen == 56:
187             pref_n[7] = ip4_n[0]
188             pref_n[9] = ip4_n[1]
189             pref_n[10] = ip4_n[2]
190             pref_n[11] = ip4_n[3]
191         elif plen == 64:
192             pref_n[9] = ip4_n[0]
193             pref_n[10] = ip4_n[1]
194             pref_n[11] = ip4_n[2]
195             pref_n[12] = ip4_n[3]
196         elif plen == 96:
197             pref_n[12] = ip4_n[0]
198             pref_n[13] = ip4_n[1]
199             pref_n[14] = ip4_n[2]
200             pref_n[15] = ip4_n[3]
201         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
202
203     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
204         """
205         Create IPv6 packet stream for inside network
206
207         :param in_if: Inside interface
208         :param out_if: Outside interface
209         :param ttl: Hop Limit of generated packets
210         :param pref: NAT64 prefix
211         :param plen: NAT64 prefix length
212         """
213         pkts = []
214         if pref is None:
215             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
216         else:
217             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
218
219         # TCP
220         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222              TCP(sport=self.tcp_port_in, dport=20))
223         pkts.append(p)
224
225         # UDP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              UDP(sport=self.udp_port_in, dport=20))
229         pkts.append(p)
230
231         # ICMP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              ICMPv6EchoRequest(id=self.icmp_id_in))
235         pkts.append(p)
236
237         return pkts
238
239     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
240         """
241         Create packet stream for outside network
242
243         :param out_if: Outside interface
244         :param dst_ip: Destination IP address (Default use global SNAT address)
245         :param ttl: TTL of generated packets
246         """
247         if dst_ip is None:
248             dst_ip = self.snat_addr
249         pkts = []
250         # TCP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              TCP(dport=self.tcp_port_out, sport=20))
254         pkts.append(p)
255
256         # UDP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              UDP(dport=self.udp_port_out, sport=20))
260         pkts.append(p)
261
262         # ICMP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              ICMP(id=self.icmp_id_out, type='echo-reply'))
266         pkts.append(p)
267
268         return pkts
269
270     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271                            packet_num=3, dst_ip=None):
272         """
273         Verify captured packets on outside network
274
275         :param capture: Captured packets
276         :param nat_ip: Translated IP address (Default use global SNAT address)
277         :param same_port: Sorce port number is not translated (Default False)
278         :param packet_num: Expected number of packets (Default 3)
279         :param dst_ip: Destination IP address (Default do not verify)
280         """
281         if nat_ip is None:
282             nat_ip = self.snat_addr
283         self.assertEqual(packet_num, len(capture))
284         for packet in capture:
285             try:
286                 self.check_ip_checksum(packet)
287                 self.assertEqual(packet[IP].src, nat_ip)
288                 if dst_ip is not None:
289                     self.assertEqual(packet[IP].dst, dst_ip)
290                 if packet.haslayer(TCP):
291                     if same_port:
292                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
293                     else:
294                         self.assertNotEqual(
295                             packet[TCP].sport, self.tcp_port_in)
296                     self.tcp_port_out = packet[TCP].sport
297                     self.check_tcp_checksum(packet)
298                 elif packet.haslayer(UDP):
299                     if same_port:
300                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
301                     else:
302                         self.assertNotEqual(
303                             packet[UDP].sport, self.udp_port_in)
304                     self.udp_port_out = packet[UDP].sport
305                 else:
306                     if same_port:
307                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
308                     else:
309                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310                     self.icmp_id_out = packet[ICMP].id
311                     self.check_icmp_checksum(packet)
312             except:
313                 self.logger.error(ppp("Unexpected or invalid packet "
314                                       "(outside network):", packet))
315                 raise
316
317     def verify_capture_in(self, capture, in_if, packet_num=3):
318         """
319         Verify captured packets on inside network
320
321         :param capture: Captured packets
322         :param in_if: Inside interface
323         :param packet_num: Expected number of packets (Default 3)
324         """
325         self.assertEqual(packet_num, len(capture))
326         for packet in capture:
327             try:
328                 self.check_ip_checksum(packet)
329                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330                 if packet.haslayer(TCP):
331                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332                     self.check_tcp_checksum(packet)
333                 elif packet.haslayer(UDP):
334                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
335                 else:
336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337                     self.check_icmp_checksum(packet)
338             except:
339                 self.logger.error(ppp("Unexpected or invalid packet "
340                                       "(inside network):", packet))
341                 raise
342
343     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
344         """
345         Verify captured IPv6 packets on inside network
346
347         :param capture: Captured packets
348         :param src_ip: Source IP
349         :param dst_ip: Destination IP address
350         :param packet_num: Expected number of packets (Default 3)
351         """
352         self.assertEqual(packet_num, len(capture))
353         for packet in capture:
354             try:
355                 self.assertEqual(packet[IPv6].src, src_ip)
356                 self.assertEqual(packet[IPv6].dst, dst_ip)
357                 if packet.haslayer(TCP):
358                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359                     self.check_tcp_checksum(packet)
360                 elif packet.haslayer(UDP):
361                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
362                     self.check_udp_checksum(packet)
363                 else:
364                     self.assertEqual(packet[ICMPv6EchoReply].id,
365                                      self.icmp_id_in)
366                     self.check_icmpv6_checksum(packet)
367             except:
368                 self.logger.error(ppp("Unexpected or invalid packet "
369                                       "(inside network):", packet))
370                 raise
371
372     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
373         """
374         Verify captured packet that don't have to be translated
375
376         :param capture: Captured packets
377         :param ingress_if: Ingress interface
378         :param egress_if: Egress interface
379         """
380         for packet in capture:
381             try:
382                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384                 if packet.haslayer(TCP):
385                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386                 elif packet.haslayer(UDP):
387                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
388                 else:
389                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
390             except:
391                 self.logger.error(ppp("Unexpected or invalid packet "
392                                       "(inside network):", packet))
393                 raise
394
395     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396                                             packet_num=3, icmp_type=11):
397         """
398         Verify captured packets with ICMP errors on outside network
399
400         :param capture: Captured packets
401         :param src_ip: Translated IP address or IP address of VPP
402                        (Default use global SNAT address)
403         :param packet_num: Expected number of packets (Default 3)
404         :param icmp_type: Type of error ICMP packet
405                           we are expecting (Default 11)
406         """
407         if src_ip is None:
408             src_ip = self.snat_addr
409         self.assertEqual(packet_num, len(capture))
410         for packet in capture:
411             try:
412                 self.assertEqual(packet[IP].src, src_ip)
413                 self.assertTrue(packet.haslayer(ICMP))
414                 icmp = packet[ICMP]
415                 self.assertEqual(icmp.type, icmp_type)
416                 self.assertTrue(icmp.haslayer(IPerror))
417                 inner_ip = icmp[IPerror]
418                 if inner_ip.haslayer(TCPerror):
419                     self.assertEqual(inner_ip[TCPerror].dport,
420                                      self.tcp_port_out)
421                 elif inner_ip.haslayer(UDPerror):
422                     self.assertEqual(inner_ip[UDPerror].dport,
423                                      self.udp_port_out)
424                 else:
425                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
426             except:
427                 self.logger.error(ppp("Unexpected or invalid packet "
428                                       "(outside network):", packet))
429                 raise
430
431     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
432                                            icmp_type=11):
433         """
434         Verify captured packets with ICMP errors on inside network
435
436         :param capture: Captured packets
437         :param in_if: Inside interface
438         :param packet_num: Expected number of packets (Default 3)
439         :param icmp_type: Type of error ICMP packet
440                           we are expecting (Default 11)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446                 self.assertTrue(packet.haslayer(ICMP))
447                 icmp = packet[ICMP]
448                 self.assertEqual(icmp.type, icmp_type)
449                 self.assertTrue(icmp.haslayer(IPerror))
450                 inner_ip = icmp[IPerror]
451                 if inner_ip.haslayer(TCPerror):
452                     self.assertEqual(inner_ip[TCPerror].sport,
453                                      self.tcp_port_in)
454                 elif inner_ip.haslayer(UDPerror):
455                     self.assertEqual(inner_ip[UDPerror].sport,
456                                      self.udp_port_in)
457                 else:
458                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
459             except:
460                 self.logger.error(ppp("Unexpected or invalid packet "
461                                       "(inside network):", packet))
462                 raise
463
464     def verify_ipfix_nat44_ses(self, data):
465         """
466         Verify IPFIX NAT44 session create/delete event
467
468         :param data: Decoded IPFIX data records
469         """
470         nat44_ses_create_num = 0
471         nat44_ses_delete_num = 0
472         self.assertEqual(6, len(data))
473         for record in data:
474             # natEvent
475             self.assertIn(ord(record[230]), [4, 5])
476             if ord(record[230]) == 4:
477                 nat44_ses_create_num += 1
478             else:
479                 nat44_ses_delete_num += 1
480             # sourceIPv4Address
481             self.assertEqual(self.pg0.remote_ip4n, record[8])
482             # postNATSourceIPv4Address
483             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
484                              record[225])
485             # ingressVRFID
486             self.assertEqual(struct.pack("!I", 0), record[234])
487             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488             if IP_PROTOS.icmp == ord(record[4]):
489                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
491                                  record[227])
492             elif IP_PROTOS.tcp == ord(record[4]):
493                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
494                                  record[7])
495                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
496                                  record[227])
497             elif IP_PROTOS.udp == ord(record[4]):
498                 self.assertEqual(struct.pack("!H", self.udp_port_in),
499                                  record[7])
500                 self.assertEqual(struct.pack("!H", self.udp_port_out),
501                                  record[227])
502             else:
503                 self.fail("Invalid protocol")
504         self.assertEqual(3, nat44_ses_create_num)
505         self.assertEqual(3, nat44_ses_delete_num)
506
507     def verify_ipfix_addr_exhausted(self, data):
508         """
509         Verify IPFIX NAT addresses event
510
511         :param data: Decoded IPFIX data records
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         self.assertEqual(ord(record[230]), 3)
517         # natPoolID
518         self.assertEqual(struct.pack("!I", 0), record[283])
519
520
521 class TestSNAT(MethodHolder):
522     """ SNAT Test Cases """
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestSNAT, cls).setUpClass()
527
528         try:
529             cls.tcp_port_in = 6303
530             cls.tcp_port_out = 6303
531             cls.udp_port_in = 6304
532             cls.udp_port_out = 6304
533             cls.icmp_id_in = 6305
534             cls.icmp_id_out = 6305
535             cls.snat_addr = '10.0.0.3'
536             cls.ipfix_src_port = 4739
537             cls.ipfix_domain_id = 1
538
539             cls.create_pg_interfaces(range(9))
540             cls.interfaces = list(cls.pg_interfaces[0:4])
541
542             for i in cls.interfaces:
543                 i.admin_up()
544                 i.config_ip4()
545                 i.resolve_arp()
546
547             cls.pg0.generate_remote_hosts(3)
548             cls.pg0.configure_ipv4_neighbors()
549
550             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
551
552             cls.pg4._local_ip4 = "172.16.255.1"
553             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555             cls.pg4.set_table_ip4(10)
556             cls.pg5._local_ip4 = "172.16.255.3"
557             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
559             cls.pg5.set_table_ip4(10)
560             cls.pg6._local_ip4 = "172.16.255.1"
561             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563             cls.pg6.set_table_ip4(20)
564             for i in cls.overlapping_interfaces:
565                 i.config_ip4()
566                 i.admin_up()
567                 i.resolve_arp()
568
569             cls.pg7.admin_up()
570             cls.pg8.admin_up()
571
572         except Exception:
573             super(TestSNAT, cls).tearDownClass()
574             raise
575
576     def clear_snat(self):
577         """
578         Clear SNAT configuration.
579         """
580         # I found no elegant way to do this
581         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582                                    dst_address_length=32,
583                                    next_hop_address=self.pg7.remote_ip4n,
584                                    next_hop_sw_if_index=self.pg7.sw_if_index,
585                                    is_add=0)
586         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587                                    dst_address_length=32,
588                                    next_hop_address=self.pg8.remote_ip4n,
589                                    next_hop_sw_if_index=self.pg8.sw_if_index,
590                                    is_add=0)
591
592         for intf in [self.pg7, self.pg8]:
593             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
594             for n in neighbors:
595                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
596                                               n.mac_address,
597                                               n.ip_address,
598                                               is_add=0)
599
600         if self.pg7.has_ip4_config:
601             self.pg7.unconfig_ip4()
602
603         interfaces = self.vapi.snat_interface_addr_dump()
604         for intf in interfaces:
605             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
606
607         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
608                              domain_id=self.ipfix_domain_id)
609         self.ipfix_src_port = 4739
610         self.ipfix_domain_id = 1
611
612         interfaces = self.vapi.snat_interface_dump()
613         for intf in interfaces:
614             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
615                                                      intf.is_inside,
616                                                      is_add=0)
617
618         static_mappings = self.vapi.snat_static_mapping_dump()
619         for sm in static_mappings:
620             self.vapi.snat_add_static_mapping(sm.local_ip_address,
621                                               sm.external_ip_address,
622                                               local_port=sm.local_port,
623                                               external_port=sm.external_port,
624                                               addr_only=sm.addr_only,
625                                               vrf_id=sm.vrf_id,
626                                               protocol=sm.protocol,
627                                               is_add=0)
628
629         adresses = self.vapi.snat_address_dump()
630         for addr in adresses:
631             self.vapi.snat_add_address_range(addr.ip_address,
632                                              addr.ip_address,
633                                              is_add=0)
634
635     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
636                                 local_port=0, external_port=0, vrf_id=0,
637                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
638                                 proto=0):
639         """
640         Add/delete S-NAT static mapping
641
642         :param local_ip: Local IP address
643         :param external_ip: External IP address
644         :param local_port: Local port number (Optional)
645         :param external_port: External port number (Optional)
646         :param vrf_id: VRF ID (Default 0)
647         :param is_add: 1 if add, 0 if delete (Default add)
648         :param external_sw_if_index: External interface instead of IP address
649         :param proto: IP protocol (Mandatory if port specified)
650         """
651         addr_only = 1
652         if local_port and external_port:
653             addr_only = 0
654         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
655         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
656         self.vapi.snat_add_static_mapping(
657             l_ip,
658             e_ip,
659             external_sw_if_index,
660             local_port,
661             external_port,
662             addr_only,
663             vrf_id,
664             proto,
665             is_add)
666
667     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
668         """
669         Add/delete S-NAT address
670
671         :param ip: IP address
672         :param is_add: 1 if add, 0 if delete (Default add)
673         """
674         snat_addr = socket.inet_pton(socket.AF_INET, ip)
675         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
676                                          vrf_id=vrf_id)
677
678     def test_dynamic(self):
679         """ SNAT dynamic translation test """
680
681         self.snat_add_address(self.snat_addr)
682         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
683         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
684                                                  is_inside=0)
685
686         # in2out
687         pkts = self.create_stream_in(self.pg0, self.pg1)
688         self.pg0.add_stream(pkts)
689         self.pg_enable_capture(self.pg_interfaces)
690         self.pg_start()
691         capture = self.pg1.get_capture(len(pkts))
692         self.verify_capture_out(capture)
693
694         # out2in
695         pkts = self.create_stream_out(self.pg1)
696         self.pg1.add_stream(pkts)
697         self.pg_enable_capture(self.pg_interfaces)
698         self.pg_start()
699         capture = self.pg0.get_capture(len(pkts))
700         self.verify_capture_in(capture, self.pg0)
701
702     def test_dynamic_icmp_errors_in2out_ttl_1(self):
703         """ SNAT handling of client packets with TTL=1 """
704
705         self.snat_add_address(self.snat_addr)
706         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
707         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
708                                                  is_inside=0)
709
710         # Client side - generate traffic
711         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
712         self.pg0.add_stream(pkts)
713         self.pg_enable_capture(self.pg_interfaces)
714         self.pg_start()
715
716         # Client side - verify ICMP type 11 packets
717         capture = self.pg0.get_capture(len(pkts))
718         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
719
720     def test_dynamic_icmp_errors_out2in_ttl_1(self):
721         """ SNAT handling of server packets with TTL=1 """
722
723         self.snat_add_address(self.snat_addr)
724         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
725         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
726                                                  is_inside=0)
727
728         # Client side - create sessions
729         pkts = self.create_stream_in(self.pg0, self.pg1)
730         self.pg0.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733
734         # Server side - generate traffic
735         capture = self.pg1.get_capture(len(pkts))
736         self.verify_capture_out(capture)
737         pkts = self.create_stream_out(self.pg1, ttl=1)
738         self.pg1.add_stream(pkts)
739         self.pg_enable_capture(self.pg_interfaces)
740         self.pg_start()
741
742         # Server side - verify ICMP type 11 packets
743         capture = self.pg1.get_capture(len(pkts))
744         self.verify_capture_out_with_icmp_errors(capture,
745                                                  src_ip=self.pg1.local_ip4)
746
747     def test_dynamic_icmp_errors_in2out_ttl_2(self):
748         """ SNAT handling of error responses to client packets with TTL=2 """
749
750         self.snat_add_address(self.snat_addr)
751         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
752         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
753                                                  is_inside=0)
754
755         # Client side - generate traffic
756         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
757         self.pg0.add_stream(pkts)
758         self.pg_enable_capture(self.pg_interfaces)
759         self.pg_start()
760
761         # Server side - simulate ICMP type 11 response
762         capture = self.pg1.get_capture(len(pkts))
763         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
764                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
765                 ICMP(type=11) / packet[IP] for packet in capture]
766         self.pg1.add_stream(pkts)
767         self.pg_enable_capture(self.pg_interfaces)
768         self.pg_start()
769
770         # Client side - verify ICMP type 11 packets
771         capture = self.pg0.get_capture(len(pkts))
772         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
773
774     def test_dynamic_icmp_errors_out2in_ttl_2(self):
775         """ SNAT handling of error responses to server packets with TTL=2 """
776
777         self.snat_add_address(self.snat_addr)
778         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
779         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
780                                                  is_inside=0)
781
782         # Client side - create sessions
783         pkts = self.create_stream_in(self.pg0, self.pg1)
784         self.pg0.add_stream(pkts)
785         self.pg_enable_capture(self.pg_interfaces)
786         self.pg_start()
787
788         # Server side - generate traffic
789         capture = self.pg1.get_capture(len(pkts))
790         self.verify_capture_out(capture)
791         pkts = self.create_stream_out(self.pg1, ttl=2)
792         self.pg1.add_stream(pkts)
793         self.pg_enable_capture(self.pg_interfaces)
794         self.pg_start()
795
796         # Client side - simulate ICMP type 11 response
797         capture = self.pg0.get_capture(len(pkts))
798         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
799                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
800                 ICMP(type=11) / packet[IP] for packet in capture]
801         self.pg0.add_stream(pkts)
802         self.pg_enable_capture(self.pg_interfaces)
803         self.pg_start()
804
805         # Server side - verify ICMP type 11 packets
806         capture = self.pg1.get_capture(len(pkts))
807         self.verify_capture_out_with_icmp_errors(capture)
808
809     def test_ping_out_interface_from_outside(self):
810         """ Ping SNAT out interface from outside network """
811
812         self.snat_add_address(self.snat_addr)
813         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
814         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
815                                                  is_inside=0)
816
817         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
818              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
819              ICMP(id=self.icmp_id_out, type='echo-request'))
820         pkts = [p]
821         self.pg1.add_stream(pkts)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         capture = self.pg1.get_capture(len(pkts))
825         self.assertEqual(1, len(capture))
826         packet = capture[0]
827         try:
828             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
829             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
830             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
831             self.assertEqual(packet[ICMP].type, 0)  # echo reply
832         except:
833             self.logger.error(ppp("Unexpected or invalid packet "
834                                   "(outside network):", packet))
835             raise
836
837     def test_ping_internal_host_from_outside(self):
838         """ Ping internal host from outside network """
839
840         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
841         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
842         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
843                                                  is_inside=0)
844
845         # out2in
846         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
847                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
848                ICMP(id=self.icmp_id_out, type='echo-request'))
849         self.pg1.add_stream(pkt)
850         self.pg_enable_capture(self.pg_interfaces)
851         self.pg_start()
852         capture = self.pg0.get_capture(1)
853         self.verify_capture_in(capture, self.pg0, packet_num=1)
854         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
855
856         # in2out
857         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
858                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
859                ICMP(id=self.icmp_id_in, type='echo-reply'))
860         self.pg0.add_stream(pkt)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg1.get_capture(1)
864         self.verify_capture_out(capture, same_port=True, packet_num=1)
865         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
866
867     def test_static_in(self):
868         """ SNAT 1:1 NAT initialized from inside network """
869
870         nat_ip = "10.0.0.10"
871         self.tcp_port_out = 6303
872         self.udp_port_out = 6304
873         self.icmp_id_out = 6305
874
875         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
876         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
877         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
878                                                  is_inside=0)
879
880         # in2out
881         pkts = self.create_stream_in(self.pg0, self.pg1)
882         self.pg0.add_stream(pkts)
883         self.pg_enable_capture(self.pg_interfaces)
884         self.pg_start()
885         capture = self.pg1.get_capture(len(pkts))
886         self.verify_capture_out(capture, nat_ip, True)
887
888         # out2in
889         pkts = self.create_stream_out(self.pg1, nat_ip)
890         self.pg1.add_stream(pkts)
891         self.pg_enable_capture(self.pg_interfaces)
892         self.pg_start()
893         capture = self.pg0.get_capture(len(pkts))
894         self.verify_capture_in(capture, self.pg0)
895
896     def test_static_out(self):
897         """ SNAT 1:1 NAT initialized from outside network """
898
899         nat_ip = "10.0.0.20"
900         self.tcp_port_out = 6303
901         self.udp_port_out = 6304
902         self.icmp_id_out = 6305
903
904         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
905         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
906         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
907                                                  is_inside=0)
908
909         # out2in
910         pkts = self.create_stream_out(self.pg1, nat_ip)
911         self.pg1.add_stream(pkts)
912         self.pg_enable_capture(self.pg_interfaces)
913         self.pg_start()
914         capture = self.pg0.get_capture(len(pkts))
915         self.verify_capture_in(capture, self.pg0)
916
917         # in2out
918         pkts = self.create_stream_in(self.pg0, self.pg1)
919         self.pg0.add_stream(pkts)
920         self.pg_enable_capture(self.pg_interfaces)
921         self.pg_start()
922         capture = self.pg1.get_capture(len(pkts))
923         self.verify_capture_out(capture, nat_ip, True)
924
925     def test_static_with_port_in(self):
926         """ SNAT 1:1 NAT with port initialized from inside network """
927
928         self.tcp_port_out = 3606
929         self.udp_port_out = 3607
930         self.icmp_id_out = 3608
931
932         self.snat_add_address(self.snat_addr)
933         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
934                                      self.tcp_port_in, self.tcp_port_out,
935                                      proto=IP_PROTOS.tcp)
936         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
937                                      self.udp_port_in, self.udp_port_out,
938                                      proto=IP_PROTOS.udp)
939         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
940                                      self.icmp_id_in, self.icmp_id_out,
941                                      proto=IP_PROTOS.icmp)
942         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
943         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
944                                                  is_inside=0)
945
946         # in2out
947         pkts = self.create_stream_in(self.pg0, self.pg1)
948         self.pg0.add_stream(pkts)
949         self.pg_enable_capture(self.pg_interfaces)
950         self.pg_start()
951         capture = self.pg1.get_capture(len(pkts))
952         self.verify_capture_out(capture)
953
954         # out2in
955         pkts = self.create_stream_out(self.pg1)
956         self.pg1.add_stream(pkts)
957         self.pg_enable_capture(self.pg_interfaces)
958         self.pg_start()
959         capture = self.pg0.get_capture(len(pkts))
960         self.verify_capture_in(capture, self.pg0)
961
962     def test_static_with_port_out(self):
963         """ SNAT 1:1 NAT with port initialized from outside network """
964
965         self.tcp_port_out = 30606
966         self.udp_port_out = 30607
967         self.icmp_id_out = 30608
968
969         self.snat_add_address(self.snat_addr)
970         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
971                                      self.tcp_port_in, self.tcp_port_out,
972                                      proto=IP_PROTOS.tcp)
973         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
974                                      self.udp_port_in, self.udp_port_out,
975                                      proto=IP_PROTOS.udp)
976         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
977                                      self.icmp_id_in, self.icmp_id_out,
978                                      proto=IP_PROTOS.icmp)
979         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
980         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
981                                                  is_inside=0)
982
983         # out2in
984         pkts = self.create_stream_out(self.pg1)
985         self.pg1.add_stream(pkts)
986         self.pg_enable_capture(self.pg_interfaces)
987         self.pg_start()
988         capture = self.pg0.get_capture(len(pkts))
989         self.verify_capture_in(capture, self.pg0)
990
991         # in2out
992         pkts = self.create_stream_in(self.pg0, self.pg1)
993         self.pg0.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996         capture = self.pg1.get_capture(len(pkts))
997         self.verify_capture_out(capture)
998
999     def test_static_vrf_aware(self):
1000         """ SNAT 1:1 NAT VRF awareness """
1001
1002         nat_ip1 = "10.0.0.30"
1003         nat_ip2 = "10.0.0.40"
1004         self.tcp_port_out = 6303
1005         self.udp_port_out = 6304
1006         self.icmp_id_out = 6305
1007
1008         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1009                                      vrf_id=10)
1010         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1011                                      vrf_id=10)
1012         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1013                                                  is_inside=0)
1014         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1015         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1016
1017         # inside interface VRF match SNAT static mapping VRF
1018         pkts = self.create_stream_in(self.pg4, self.pg3)
1019         self.pg4.add_stream(pkts)
1020         self.pg_enable_capture(self.pg_interfaces)
1021         self.pg_start()
1022         capture = self.pg3.get_capture(len(pkts))
1023         self.verify_capture_out(capture, nat_ip1, True)
1024
1025         # inside interface VRF don't match SNAT static mapping VRF (packets
1026         # are dropped)
1027         pkts = self.create_stream_in(self.pg0, self.pg3)
1028         self.pg0.add_stream(pkts)
1029         self.pg_enable_capture(self.pg_interfaces)
1030         self.pg_start()
1031         self.pg3.assert_nothing_captured()
1032
1033     def test_multiple_inside_interfaces(self):
1034         """ SNAT multiple inside interfaces (non-overlapping address space) """
1035
1036         self.snat_add_address(self.snat_addr)
1037         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1038         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1039         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1040                                                  is_inside=0)
1041
1042         # between two S-NAT inside interfaces (no translation)
1043         pkts = self.create_stream_in(self.pg0, self.pg1)
1044         self.pg0.add_stream(pkts)
1045         self.pg_enable_capture(self.pg_interfaces)
1046         self.pg_start()
1047         capture = self.pg1.get_capture(len(pkts))
1048         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1049
1050         # from S-NAT inside to interface without S-NAT feature (no translation)
1051         pkts = self.create_stream_in(self.pg0, self.pg2)
1052         self.pg0.add_stream(pkts)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg2.get_capture(len(pkts))
1056         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1057
1058         # in2out 1st interface
1059         pkts = self.create_stream_in(self.pg0, self.pg3)
1060         self.pg0.add_stream(pkts)
1061         self.pg_enable_capture(self.pg_interfaces)
1062         self.pg_start()
1063         capture = self.pg3.get_capture(len(pkts))
1064         self.verify_capture_out(capture)
1065
1066         # out2in 1st interface
1067         pkts = self.create_stream_out(self.pg3)
1068         self.pg3.add_stream(pkts)
1069         self.pg_enable_capture(self.pg_interfaces)
1070         self.pg_start()
1071         capture = self.pg0.get_capture(len(pkts))
1072         self.verify_capture_in(capture, self.pg0)
1073
1074         # in2out 2nd interface
1075         pkts = self.create_stream_in(self.pg1, self.pg3)
1076         self.pg1.add_stream(pkts)
1077         self.pg_enable_capture(self.pg_interfaces)
1078         self.pg_start()
1079         capture = self.pg3.get_capture(len(pkts))
1080         self.verify_capture_out(capture)
1081
1082         # out2in 2nd interface
1083         pkts = self.create_stream_out(self.pg3)
1084         self.pg3.add_stream(pkts)
1085         self.pg_enable_capture(self.pg_interfaces)
1086         self.pg_start()
1087         capture = self.pg1.get_capture(len(pkts))
1088         self.verify_capture_in(capture, self.pg1)
1089
1090     def test_inside_overlapping_interfaces(self):
1091         """ SNAT multiple inside interfaces with overlapping address space """
1092
1093         static_nat_ip = "10.0.0.10"
1094         self.snat_add_address(self.snat_addr)
1095         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
1096                                                  is_inside=0)
1097         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
1098         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
1099         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
1100         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1101                                      vrf_id=20)
1102
1103         # between S-NAT inside interfaces with same VRF (no translation)
1104         pkts = self.create_stream_in(self.pg4, self.pg5)
1105         self.pg4.add_stream(pkts)
1106         self.pg_enable_capture(self.pg_interfaces)
1107         self.pg_start()
1108         capture = self.pg5.get_capture(len(pkts))
1109         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1110
1111         # between S-NAT inside interfaces with different VRF (hairpinning)
1112         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1113              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1114              TCP(sport=1234, dport=5678))
1115         self.pg4.add_stream(p)
1116         self.pg_enable_capture(self.pg_interfaces)
1117         self.pg_start()
1118         capture = self.pg6.get_capture(1)
1119         p = capture[0]
1120         try:
1121             ip = p[IP]
1122             tcp = p[TCP]
1123             self.assertEqual(ip.src, self.snat_addr)
1124             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1125             self.assertNotEqual(tcp.sport, 1234)
1126             self.assertEqual(tcp.dport, 5678)
1127         except:
1128             self.logger.error(ppp("Unexpected or invalid packet:", p))
1129             raise
1130
1131         # in2out 1st interface
1132         pkts = self.create_stream_in(self.pg4, self.pg3)
1133         self.pg4.add_stream(pkts)
1134         self.pg_enable_capture(self.pg_interfaces)
1135         self.pg_start()
1136         capture = self.pg3.get_capture(len(pkts))
1137         self.verify_capture_out(capture)
1138
1139         # out2in 1st interface
1140         pkts = self.create_stream_out(self.pg3)
1141         self.pg3.add_stream(pkts)
1142         self.pg_enable_capture(self.pg_interfaces)
1143         self.pg_start()
1144         capture = self.pg4.get_capture(len(pkts))
1145         self.verify_capture_in(capture, self.pg4)
1146
1147         # in2out 2nd interface
1148         pkts = self.create_stream_in(self.pg5, self.pg3)
1149         self.pg5.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152         capture = self.pg3.get_capture(len(pkts))
1153         self.verify_capture_out(capture)
1154
1155         # out2in 2nd interface
1156         pkts = self.create_stream_out(self.pg3)
1157         self.pg3.add_stream(pkts)
1158         self.pg_enable_capture(self.pg_interfaces)
1159         self.pg_start()
1160         capture = self.pg5.get_capture(len(pkts))
1161         self.verify_capture_in(capture, self.pg5)
1162
1163         # pg5 session dump
1164         addresses = self.vapi.snat_address_dump()
1165         self.assertEqual(len(addresses), 1)
1166         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1167         self.assertEqual(len(sessions), 3)
1168         for session in sessions:
1169             self.assertFalse(session.is_static)
1170             self.assertEqual(session.inside_ip_address[0:4],
1171                              self.pg5.remote_ip4n)
1172             self.assertEqual(session.outside_ip_address,
1173                              addresses[0].ip_address)
1174         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1175         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1176         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1177         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1178         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1179         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1180         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1181         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1182         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1183
1184         # in2out 3rd interface
1185         pkts = self.create_stream_in(self.pg6, self.pg3)
1186         self.pg6.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg3.get_capture(len(pkts))
1190         self.verify_capture_out(capture, static_nat_ip, True)
1191
1192         # out2in 3rd interface
1193         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1194         self.pg3.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197         capture = self.pg6.get_capture(len(pkts))
1198         self.verify_capture_in(capture, self.pg6)
1199
1200         # general user and session dump verifications
1201         users = self.vapi.snat_user_dump()
1202         self.assertTrue(len(users) >= 3)
1203         addresses = self.vapi.snat_address_dump()
1204         self.assertEqual(len(addresses), 1)
1205         for user in users:
1206             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1207                                                         user.vrf_id)
1208             for session in sessions:
1209                 self.assertEqual(user.ip_address, session.inside_ip_address)
1210                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1211                 self.assertTrue(session.protocol in
1212                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1213                                  IP_PROTOS.icmp])
1214
1215         # pg4 session dump
1216         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1217         self.assertTrue(len(sessions) >= 4)
1218         for session in sessions:
1219             self.assertFalse(session.is_static)
1220             self.assertEqual(session.inside_ip_address[0:4],
1221                              self.pg4.remote_ip4n)
1222             self.assertEqual(session.outside_ip_address,
1223                              addresses[0].ip_address)
1224
1225         # pg6 session dump
1226         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1227         self.assertTrue(len(sessions) >= 3)
1228         for session in sessions:
1229             self.assertTrue(session.is_static)
1230             self.assertEqual(session.inside_ip_address[0:4],
1231                              self.pg6.remote_ip4n)
1232             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1233                              map(int, static_nat_ip.split('.')))
1234             self.assertTrue(session.inside_port in
1235                             [self.tcp_port_in, self.udp_port_in,
1236                              self.icmp_id_in])
1237
1238     def test_hairpinning(self):
1239         """ SNAT hairpinning - 1:1 NAT with port"""
1240
1241         host = self.pg0.remote_hosts[0]
1242         server = self.pg0.remote_hosts[1]
1243         host_in_port = 1234
1244         host_out_port = 0
1245         server_in_port = 5678
1246         server_out_port = 8765
1247
1248         self.snat_add_address(self.snat_addr)
1249         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1250         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1251                                                  is_inside=0)
1252         # add static mapping for server
1253         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1254                                      server_in_port, server_out_port,
1255                                      proto=IP_PROTOS.tcp)
1256
1257         # send packet from host to server
1258         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1259              IP(src=host.ip4, dst=self.snat_addr) /
1260              TCP(sport=host_in_port, dport=server_out_port))
1261         self.pg0.add_stream(p)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg0.get_capture(1)
1265         p = capture[0]
1266         try:
1267             ip = p[IP]
1268             tcp = p[TCP]
1269             self.assertEqual(ip.src, self.snat_addr)
1270             self.assertEqual(ip.dst, server.ip4)
1271             self.assertNotEqual(tcp.sport, host_in_port)
1272             self.assertEqual(tcp.dport, server_in_port)
1273             self.check_tcp_checksum(p)
1274             host_out_port = tcp.sport
1275         except:
1276             self.logger.error(ppp("Unexpected or invalid packet:", p))
1277             raise
1278
1279         # send reply from server to host
1280         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1281              IP(src=server.ip4, dst=self.snat_addr) /
1282              TCP(sport=server_in_port, dport=host_out_port))
1283         self.pg0.add_stream(p)
1284         self.pg_enable_capture(self.pg_interfaces)
1285         self.pg_start()
1286         capture = self.pg0.get_capture(1)
1287         p = capture[0]
1288         try:
1289             ip = p[IP]
1290             tcp = p[TCP]
1291             self.assertEqual(ip.src, self.snat_addr)
1292             self.assertEqual(ip.dst, host.ip4)
1293             self.assertEqual(tcp.sport, server_out_port)
1294             self.assertEqual(tcp.dport, host_in_port)
1295             self.check_tcp_checksum(p)
1296         except:
1297             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1298             raise
1299
1300     def test_hairpinning2(self):
1301         """ SNAT hairpinning - 1:1 NAT"""
1302
1303         server1_nat_ip = "10.0.0.10"
1304         server2_nat_ip = "10.0.0.11"
1305         host = self.pg0.remote_hosts[0]
1306         server1 = self.pg0.remote_hosts[1]
1307         server2 = self.pg0.remote_hosts[2]
1308         server_tcp_port = 22
1309         server_udp_port = 20
1310
1311         self.snat_add_address(self.snat_addr)
1312         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1313         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1314                                                  is_inside=0)
1315
1316         # add static mapping for servers
1317         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1318         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1319
1320         # host to server1
1321         pkts = []
1322         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1323              IP(src=host.ip4, dst=server1_nat_ip) /
1324              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1325         pkts.append(p)
1326         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1327              IP(src=host.ip4, dst=server1_nat_ip) /
1328              UDP(sport=self.udp_port_in, dport=server_udp_port))
1329         pkts.append(p)
1330         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1331              IP(src=host.ip4, dst=server1_nat_ip) /
1332              ICMP(id=self.icmp_id_in, type='echo-request'))
1333         pkts.append(p)
1334         self.pg0.add_stream(pkts)
1335         self.pg_enable_capture(self.pg_interfaces)
1336         self.pg_start()
1337         capture = self.pg0.get_capture(len(pkts))
1338         for packet in capture:
1339             try:
1340                 self.assertEqual(packet[IP].src, self.snat_addr)
1341                 self.assertEqual(packet[IP].dst, server1.ip4)
1342                 if packet.haslayer(TCP):
1343                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1344                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1345                     self.tcp_port_out = packet[TCP].sport
1346                     self.check_tcp_checksum(packet)
1347                 elif packet.haslayer(UDP):
1348                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1349                     self.assertEqual(packet[UDP].dport, server_udp_port)
1350                     self.udp_port_out = packet[UDP].sport
1351                 else:
1352                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1353                     self.icmp_id_out = packet[ICMP].id
1354             except:
1355                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1356                 raise
1357
1358         # server1 to host
1359         pkts = []
1360         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1361              IP(src=server1.ip4, dst=self.snat_addr) /
1362              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1363         pkts.append(p)
1364         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1365              IP(src=server1.ip4, dst=self.snat_addr) /
1366              UDP(sport=server_udp_port, dport=self.udp_port_out))
1367         pkts.append(p)
1368         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1369              IP(src=server1.ip4, dst=self.snat_addr) /
1370              ICMP(id=self.icmp_id_out, type='echo-reply'))
1371         pkts.append(p)
1372         self.pg0.add_stream(pkts)
1373         self.pg_enable_capture(self.pg_interfaces)
1374         self.pg_start()
1375         capture = self.pg0.get_capture(len(pkts))
1376         for packet in capture:
1377             try:
1378                 self.assertEqual(packet[IP].src, server1_nat_ip)
1379                 self.assertEqual(packet[IP].dst, host.ip4)
1380                 if packet.haslayer(TCP):
1381                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1382                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1383                     self.check_tcp_checksum(packet)
1384                 elif packet.haslayer(UDP):
1385                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1386                     self.assertEqual(packet[UDP].sport, server_udp_port)
1387                 else:
1388                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1389             except:
1390                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1391                 raise
1392
1393         # server2 to server1
1394         pkts = []
1395         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1396              IP(src=server2.ip4, dst=server1_nat_ip) /
1397              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1398         pkts.append(p)
1399         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1400              IP(src=server2.ip4, dst=server1_nat_ip) /
1401              UDP(sport=self.udp_port_in, dport=server_udp_port))
1402         pkts.append(p)
1403         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1404              IP(src=server2.ip4, dst=server1_nat_ip) /
1405              ICMP(id=self.icmp_id_in, type='echo-request'))
1406         pkts.append(p)
1407         self.pg0.add_stream(pkts)
1408         self.pg_enable_capture(self.pg_interfaces)
1409         self.pg_start()
1410         capture = self.pg0.get_capture(len(pkts))
1411         for packet in capture:
1412             try:
1413                 self.assertEqual(packet[IP].src, server2_nat_ip)
1414                 self.assertEqual(packet[IP].dst, server1.ip4)
1415                 if packet.haslayer(TCP):
1416                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1417                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1418                     self.tcp_port_out = packet[TCP].sport
1419                     self.check_tcp_checksum(packet)
1420                 elif packet.haslayer(UDP):
1421                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1422                     self.assertEqual(packet[UDP].dport, server_udp_port)
1423                     self.udp_port_out = packet[UDP].sport
1424                 else:
1425                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1426                     self.icmp_id_out = packet[ICMP].id
1427             except:
1428                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1429                 raise
1430
1431         # server1 to server2
1432         pkts = []
1433         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1434              IP(src=server1.ip4, dst=server2_nat_ip) /
1435              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1436         pkts.append(p)
1437         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1438              IP(src=server1.ip4, dst=server2_nat_ip) /
1439              UDP(sport=server_udp_port, dport=self.udp_port_out))
1440         pkts.append(p)
1441         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1442              IP(src=server1.ip4, dst=server2_nat_ip) /
1443              ICMP(id=self.icmp_id_out, type='echo-reply'))
1444         pkts.append(p)
1445         self.pg0.add_stream(pkts)
1446         self.pg_enable_capture(self.pg_interfaces)
1447         self.pg_start()
1448         capture = self.pg0.get_capture(len(pkts))
1449         for packet in capture:
1450             try:
1451                 self.assertEqual(packet[IP].src, server1_nat_ip)
1452                 self.assertEqual(packet[IP].dst, server2.ip4)
1453                 if packet.haslayer(TCP):
1454                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1455                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1456                     self.check_tcp_checksum(packet)
1457                 elif packet.haslayer(UDP):
1458                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1459                     self.assertEqual(packet[UDP].sport, server_udp_port)
1460                 else:
1461                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1462             except:
1463                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1464                 raise
1465
1466     def test_max_translations_per_user(self):
1467         """ MAX translations per user - recycle the least recently used """
1468
1469         self.snat_add_address(self.snat_addr)
1470         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1471         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1472                                                  is_inside=0)
1473
1474         # get maximum number of translations per user
1475         snat_config = self.vapi.snat_show_config()
1476
1477         # send more than maximum number of translations per user packets
1478         pkts_num = snat_config.max_translations_per_user + 5
1479         pkts = []
1480         for port in range(0, pkts_num):
1481             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1482                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1483                  TCP(sport=1025 + port))
1484             pkts.append(p)
1485         self.pg0.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488
1489         # verify number of translated packet
1490         self.pg1.get_capture(pkts_num)
1491
1492     def test_interface_addr(self):
1493         """ Acquire SNAT addresses from interface """
1494         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1495
1496         # no address in NAT pool
1497         adresses = self.vapi.snat_address_dump()
1498         self.assertEqual(0, len(adresses))
1499
1500         # configure interface address and check NAT address pool
1501         self.pg7.config_ip4()
1502         adresses = self.vapi.snat_address_dump()
1503         self.assertEqual(1, len(adresses))
1504         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1505
1506         # remove interface address and check NAT address pool
1507         self.pg7.unconfig_ip4()
1508         adresses = self.vapi.snat_address_dump()
1509         self.assertEqual(0, len(adresses))
1510
1511     def test_interface_addr_static_mapping(self):
1512         """ Static mapping with addresses from interface """
1513         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1514         self.snat_add_static_mapping('1.2.3.4',
1515                                      external_sw_if_index=self.pg7.sw_if_index)
1516
1517         # static mappings with external interface
1518         static_mappings = self.vapi.snat_static_mapping_dump()
1519         self.assertEqual(1, len(static_mappings))
1520         self.assertEqual(self.pg7.sw_if_index,
1521                          static_mappings[0].external_sw_if_index)
1522
1523         # configure interface address and check static mappings
1524         self.pg7.config_ip4()
1525         static_mappings = self.vapi.snat_static_mapping_dump()
1526         self.assertEqual(1, len(static_mappings))
1527         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1528                          self.pg7.local_ip4n)
1529         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1530
1531         # remove interface address and check static mappings
1532         self.pg7.unconfig_ip4()
1533         static_mappings = self.vapi.snat_static_mapping_dump()
1534         self.assertEqual(0, len(static_mappings))
1535
1536     def test_ipfix_nat44_sess(self):
1537         """ S-NAT IPFIX logging NAT44 session created/delted """
1538         self.ipfix_domain_id = 10
1539         self.ipfix_src_port = 20202
1540         colector_port = 30303
1541         bind_layers(UDP, IPFIX, dport=30303)
1542         self.snat_add_address(self.snat_addr)
1543         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1544         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1545                                                  is_inside=0)
1546         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1547                                      src_address=self.pg3.local_ip4n,
1548                                      path_mtu=512,
1549                                      template_interval=10,
1550                                      collector_port=colector_port)
1551         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1552                              src_port=self.ipfix_src_port)
1553
1554         pkts = self.create_stream_in(self.pg0, self.pg1)
1555         self.pg0.add_stream(pkts)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         capture = self.pg1.get_capture(len(pkts))
1559         self.verify_capture_out(capture)
1560         self.snat_add_address(self.snat_addr, is_add=0)
1561         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1562         capture = self.pg3.get_capture(3)
1563         ipfix = IPFIXDecoder()
1564         # first load template
1565         for p in capture:
1566             self.assertTrue(p.haslayer(IPFIX))
1567             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1568             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1569             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1570             self.assertEqual(p[UDP].dport, colector_port)
1571             self.assertEqual(p[IPFIX].observationDomainID,
1572                              self.ipfix_domain_id)
1573             if p.haslayer(Template):
1574                 ipfix.add_template(p.getlayer(Template))
1575         # verify events in data set
1576         for p in capture:
1577             if p.haslayer(Data):
1578                 data = ipfix.decode_data_set(p.getlayer(Set))
1579                 self.verify_ipfix_nat44_ses(data)
1580
1581     def test_ipfix_addr_exhausted(self):
1582         """ S-NAT IPFIX logging NAT addresses exhausted """
1583         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1584         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1585                                                  is_inside=0)
1586         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1587                                      src_address=self.pg3.local_ip4n,
1588                                      path_mtu=512,
1589                                      template_interval=10)
1590         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1591                              src_port=self.ipfix_src_port)
1592
1593         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1594              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1595              TCP(sport=3025))
1596         self.pg0.add_stream(p)
1597         self.pg_enable_capture(self.pg_interfaces)
1598         self.pg_start()
1599         capture = self.pg1.get_capture(0)
1600         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1601         capture = self.pg3.get_capture(3)
1602         ipfix = IPFIXDecoder()
1603         # first load template
1604         for p in capture:
1605             self.assertTrue(p.haslayer(IPFIX))
1606             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1607             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1608             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1609             self.assertEqual(p[UDP].dport, 4739)
1610             self.assertEqual(p[IPFIX].observationDomainID,
1611                              self.ipfix_domain_id)
1612             if p.haslayer(Template):
1613                 ipfix.add_template(p.getlayer(Template))
1614         # verify events in data set
1615         for p in capture:
1616             if p.haslayer(Data):
1617                 data = ipfix.decode_data_set(p.getlayer(Set))
1618                 self.verify_ipfix_addr_exhausted(data)
1619
1620     def test_pool_addr_fib(self):
1621         """ S-NAT add pool addresses to FIB """
1622         static_addr = '10.0.0.10'
1623         self.snat_add_address(self.snat_addr)
1624         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1625         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1626                                                  is_inside=0)
1627         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1628
1629         # SNAT address
1630         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1631              ARP(op=ARP.who_has, pdst=self.snat_addr,
1632                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1633         self.pg1.add_stream(p)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         capture = self.pg1.get_capture(1)
1637         self.assertTrue(capture[0].haslayer(ARP))
1638         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1639
1640         # 1:1 NAT address
1641         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1642              ARP(op=ARP.who_has, pdst=static_addr,
1643                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1644         self.pg1.add_stream(p)
1645         self.pg_enable_capture(self.pg_interfaces)
1646         self.pg_start()
1647         capture = self.pg1.get_capture(1)
1648         self.assertTrue(capture[0].haslayer(ARP))
1649         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1650
1651         # send ARP to non-SNAT interface
1652         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1653              ARP(op=ARP.who_has, pdst=self.snat_addr,
1654                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1655         self.pg2.add_stream(p)
1656         self.pg_enable_capture(self.pg_interfaces)
1657         self.pg_start()
1658         capture = self.pg1.get_capture(0)
1659
1660         # remove addresses and verify
1661         self.snat_add_address(self.snat_addr, is_add=0)
1662         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1663                                      is_add=0)
1664
1665         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1666              ARP(op=ARP.who_has, pdst=self.snat_addr,
1667                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1668         self.pg1.add_stream(p)
1669         self.pg_enable_capture(self.pg_interfaces)
1670         self.pg_start()
1671         capture = self.pg1.get_capture(0)
1672
1673         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1674              ARP(op=ARP.who_has, pdst=static_addr,
1675                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1676         self.pg1.add_stream(p)
1677         self.pg_enable_capture(self.pg_interfaces)
1678         self.pg_start()
1679         capture = self.pg1.get_capture(0)
1680
1681     def test_vrf_mode(self):
1682         """ S-NAT tenant VRF aware address pool mode """
1683
1684         vrf_id1 = 1
1685         vrf_id2 = 2
1686         nat_ip1 = "10.0.0.10"
1687         nat_ip2 = "10.0.0.11"
1688
1689         self.pg0.unconfig_ip4()
1690         self.pg1.unconfig_ip4()
1691         self.pg0.set_table_ip4(vrf_id1)
1692         self.pg1.set_table_ip4(vrf_id2)
1693         self.pg0.config_ip4()
1694         self.pg1.config_ip4()
1695
1696         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1697         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1698         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1699         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1700         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1701                                                  is_inside=0)
1702
1703         # first VRF
1704         pkts = self.create_stream_in(self.pg0, self.pg2)
1705         self.pg0.add_stream(pkts)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg2.get_capture(len(pkts))
1709         self.verify_capture_out(capture, nat_ip1)
1710
1711         # second VRF
1712         pkts = self.create_stream_in(self.pg1, self.pg2)
1713         self.pg1.add_stream(pkts)
1714         self.pg_enable_capture(self.pg_interfaces)
1715         self.pg_start()
1716         capture = self.pg2.get_capture(len(pkts))
1717         self.verify_capture_out(capture, nat_ip2)
1718
1719     def test_vrf_feature_independent(self):
1720         """ S-NAT tenant VRF independent address pool mode """
1721
1722         nat_ip1 = "10.0.0.10"
1723         nat_ip2 = "10.0.0.11"
1724
1725         self.snat_add_address(nat_ip1)
1726         self.snat_add_address(nat_ip2)
1727         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1728         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1729         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1730                                                  is_inside=0)
1731
1732         # first VRF
1733         pkts = self.create_stream_in(self.pg0, self.pg2)
1734         self.pg0.add_stream(pkts)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         capture = self.pg2.get_capture(len(pkts))
1738         self.verify_capture_out(capture, nat_ip1)
1739
1740         # second VRF
1741         pkts = self.create_stream_in(self.pg1, self.pg2)
1742         self.pg1.add_stream(pkts)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg2.get_capture(len(pkts))
1746         self.verify_capture_out(capture, nat_ip1)
1747
1748     def test_dynamic_ipless_interfaces(self):
1749         """ SNAT interfaces without configured ip dynamic map """
1750
1751         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1752                                       self.pg7.remote_mac,
1753                                       self.pg7.remote_ip4n,
1754                                       is_static=1)
1755         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1756                                       self.pg8.remote_mac,
1757                                       self.pg8.remote_ip4n,
1758                                       is_static=1)
1759
1760         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1761                                    dst_address_length=32,
1762                                    next_hop_address=self.pg7.remote_ip4n,
1763                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1764         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1765                                    dst_address_length=32,
1766                                    next_hop_address=self.pg8.remote_ip4n,
1767                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1768
1769         self.snat_add_address(self.snat_addr)
1770         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1771         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1772                                                  is_inside=0)
1773
1774         # in2out
1775         pkts = self.create_stream_in(self.pg7, self.pg8)
1776         self.pg7.add_stream(pkts)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779         capture = self.pg8.get_capture(len(pkts))
1780         self.verify_capture_out(capture)
1781
1782         # out2in
1783         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1784         self.pg8.add_stream(pkts)
1785         self.pg_enable_capture(self.pg_interfaces)
1786         self.pg_start()
1787         capture = self.pg7.get_capture(len(pkts))
1788         self.verify_capture_in(capture, self.pg7)
1789
1790     def test_static_ipless_interfaces(self):
1791         """ SNAT 1:1 NAT interfaces without configured ip """
1792
1793         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1794                                       self.pg7.remote_mac,
1795                                       self.pg7.remote_ip4n,
1796                                       is_static=1)
1797         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1798                                       self.pg8.remote_mac,
1799                                       self.pg8.remote_ip4n,
1800                                       is_static=1)
1801
1802         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1803                                    dst_address_length=32,
1804                                    next_hop_address=self.pg7.remote_ip4n,
1805                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1806         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1807                                    dst_address_length=32,
1808                                    next_hop_address=self.pg8.remote_ip4n,
1809                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1810
1811         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1812         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1813         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1814                                                  is_inside=0)
1815
1816         # out2in
1817         pkts = self.create_stream_out(self.pg8)
1818         self.pg8.add_stream(pkts)
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.pg_start()
1821         capture = self.pg7.get_capture(len(pkts))
1822         self.verify_capture_in(capture, self.pg7)
1823
1824         # in2out
1825         pkts = self.create_stream_in(self.pg7, self.pg8)
1826         self.pg7.add_stream(pkts)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg8.get_capture(len(pkts))
1830         self.verify_capture_out(capture, self.snat_addr, True)
1831
1832     def test_static_with_port_ipless_interfaces(self):
1833         """ SNAT 1:1 NAT with port interfaces without configured ip """
1834
1835         self.tcp_port_out = 30606
1836         self.udp_port_out = 30607
1837         self.icmp_id_out = 30608
1838
1839         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1840                                       self.pg7.remote_mac,
1841                                       self.pg7.remote_ip4n,
1842                                       is_static=1)
1843         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1844                                       self.pg8.remote_mac,
1845                                       self.pg8.remote_ip4n,
1846                                       is_static=1)
1847
1848         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1849                                    dst_address_length=32,
1850                                    next_hop_address=self.pg7.remote_ip4n,
1851                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1852         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1853                                    dst_address_length=32,
1854                                    next_hop_address=self.pg8.remote_ip4n,
1855                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1856
1857         self.snat_add_address(self.snat_addr)
1858         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1859                                      self.tcp_port_in, self.tcp_port_out,
1860                                      proto=IP_PROTOS.tcp)
1861         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1862                                      self.udp_port_in, self.udp_port_out,
1863                                      proto=IP_PROTOS.udp)
1864         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1865                                      self.icmp_id_in, self.icmp_id_out,
1866                                      proto=IP_PROTOS.icmp)
1867         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1868         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1869                                                  is_inside=0)
1870
1871         # out2in
1872         pkts = self.create_stream_out(self.pg8)
1873         self.pg8.add_stream(pkts)
1874         self.pg_enable_capture(self.pg_interfaces)
1875         self.pg_start()
1876         capture = self.pg7.get_capture(len(pkts))
1877         self.verify_capture_in(capture, self.pg7)
1878
1879         # in2out
1880         pkts = self.create_stream_in(self.pg7, self.pg8)
1881         self.pg7.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg8.get_capture(len(pkts))
1885         self.verify_capture_out(capture)
1886
1887     def test_static_unknown_proto(self):
1888         """ 1:1 NAT translate packet with unknown protocol """
1889         nat_ip = "10.0.0.10"
1890         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1891         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1892         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1893                                                  is_inside=0)
1894
1895         # in2out
1896         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1897              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1898              GRE() /
1899              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1900              TCP(sport=1234, dport=1234))
1901         self.pg0.add_stream(p)
1902         self.pg_enable_capture(self.pg_interfaces)
1903         self.pg_start()
1904         p = self.pg1.get_capture(1)
1905         packet = p[0]
1906         try:
1907             self.assertEqual(packet[IP].src, nat_ip)
1908             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1909             self.assertTrue(packet.haslayer(GRE))
1910             self.check_ip_checksum(packet)
1911         except:
1912             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1913             raise
1914
1915         # out2in
1916         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1917              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1918              GRE() /
1919              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1920              TCP(sport=1234, dport=1234))
1921         self.pg1.add_stream(p)
1922         self.pg_enable_capture(self.pg_interfaces)
1923         self.pg_start()
1924         p = self.pg0.get_capture(1)
1925         packet = p[0]
1926         try:
1927             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1928             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1929             self.assertTrue(packet.haslayer(GRE))
1930             self.check_ip_checksum(packet)
1931         except:
1932             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1933             raise
1934
1935     def test_hairpinning_static_unknown_proto(self):
1936         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1937
1938         host = self.pg0.remote_hosts[0]
1939         server = self.pg0.remote_hosts[1]
1940
1941         host_nat_ip = "10.0.0.10"
1942         server_nat_ip = "10.0.0.11"
1943
1944         self.snat_add_static_mapping(host.ip4, host_nat_ip)
1945         self.snat_add_static_mapping(server.ip4, server_nat_ip)
1946         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1947         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1948                                                  is_inside=0)
1949
1950         # host to server
1951         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1952              IP(src=host.ip4, dst=server_nat_ip) /
1953              GRE() /
1954              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1955              TCP(sport=1234, dport=1234))
1956         self.pg0.add_stream(p)
1957         self.pg_enable_capture(self.pg_interfaces)
1958         self.pg_start()
1959         p = self.pg0.get_capture(1)
1960         packet = p[0]
1961         try:
1962             self.assertEqual(packet[IP].src, host_nat_ip)
1963             self.assertEqual(packet[IP].dst, server.ip4)
1964             self.assertTrue(packet.haslayer(GRE))
1965             self.check_ip_checksum(packet)
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1968             raise
1969
1970         # server to host
1971         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1972              IP(src=server.ip4, dst=host_nat_ip) /
1973              GRE() /
1974              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1975              TCP(sport=1234, dport=1234))
1976         self.pg0.add_stream(p)
1977         self.pg_enable_capture(self.pg_interfaces)
1978         self.pg_start()
1979         p = self.pg0.get_capture(1)
1980         packet = p[0]
1981         try:
1982             self.assertEqual(packet[IP].src, server_nat_ip)
1983             self.assertEqual(packet[IP].dst, host.ip4)
1984             self.assertTrue(packet.haslayer(GRE))
1985             self.check_ip_checksum(packet)
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1988             raise
1989
1990     def test_unknown_proto(self):
1991         """ SNAT translate packet with unknown protocol """
1992         self.snat_add_address(self.snat_addr)
1993         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1994         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1995                                                  is_inside=0)
1996
1997         # in2out
1998         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1999              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2000              TCP(sport=self.tcp_port_in, dport=20))
2001         self.pg0.add_stream(p)
2002         self.pg_enable_capture(self.pg_interfaces)
2003         self.pg_start()
2004         p = self.pg1.get_capture(1)
2005
2006         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2007              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2008              GRE() /
2009              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2010              TCP(sport=1234, dport=1234))
2011         self.pg0.add_stream(p)
2012         self.pg_enable_capture(self.pg_interfaces)
2013         self.pg_start()
2014         p = self.pg1.get_capture(1)
2015         packet = p[0]
2016         try:
2017             self.assertEqual(packet[IP].src, self.snat_addr)
2018             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2019             self.assertTrue(packet.haslayer(GRE))
2020             self.check_ip_checksum(packet)
2021         except:
2022             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2023             raise
2024
2025         # out2in
2026         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2027              IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2028              GRE() /
2029              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2030              TCP(sport=1234, dport=1234))
2031         self.pg1.add_stream(p)
2032         self.pg_enable_capture(self.pg_interfaces)
2033         self.pg_start()
2034         p = self.pg0.get_capture(1)
2035         packet = p[0]
2036         try:
2037             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2038             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2039             self.assertTrue(packet.haslayer(GRE))
2040             self.check_ip_checksum(packet)
2041         except:
2042             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2043             raise
2044
2045     def test_hairpinning_unknown_proto(self):
2046         """ SNAT translate packet with unknown protocol - hairpinning """
2047         host = self.pg0.remote_hosts[0]
2048         server = self.pg0.remote_hosts[1]
2049         host_in_port = 1234
2050         host_out_port = 0
2051         server_in_port = 5678
2052         server_out_port = 8765
2053         server_nat_ip = "10.0.0.11"
2054
2055         self.snat_add_address(self.snat_addr)
2056         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2057         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2058                                                  is_inside=0)
2059
2060         # add static mapping for server
2061         self.snat_add_static_mapping(server.ip4, server_nat_ip)
2062
2063         # host to server
2064         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2065              IP(src=host.ip4, dst=server_nat_ip) /
2066              TCP(sport=host_in_port, dport=server_out_port))
2067         self.pg0.add_stream(p)
2068         self.pg_enable_capture(self.pg_interfaces)
2069         self.pg_start()
2070         capture = self.pg0.get_capture(1)
2071
2072         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2073              IP(src=host.ip4, dst=server_nat_ip) /
2074              GRE() /
2075              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2076              TCP(sport=1234, dport=1234))
2077         self.pg0.add_stream(p)
2078         self.pg_enable_capture(self.pg_interfaces)
2079         self.pg_start()
2080         p = self.pg0.get_capture(1)
2081         packet = p[0]
2082         try:
2083             self.assertEqual(packet[IP].src, self.snat_addr)
2084             self.assertEqual(packet[IP].dst, server.ip4)
2085             self.assertTrue(packet.haslayer(GRE))
2086             self.check_ip_checksum(packet)
2087         except:
2088             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2089             raise
2090
2091         # server to host
2092         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2093              IP(src=server.ip4, dst=self.snat_addr) /
2094              GRE() /
2095              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2096              TCP(sport=1234, dport=1234))
2097         self.pg0.add_stream(p)
2098         self.pg_enable_capture(self.pg_interfaces)
2099         self.pg_start()
2100         p = self.pg0.get_capture(1)
2101         packet = p[0]
2102         try:
2103             self.assertEqual(packet[IP].src, server_nat_ip)
2104             self.assertEqual(packet[IP].dst, host.ip4)
2105             self.assertTrue(packet.haslayer(GRE))
2106             self.check_ip_checksum(packet)
2107         except:
2108             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2109             raise
2110
2111     def tearDown(self):
2112         super(TestSNAT, self).tearDown()
2113         if not self.vpp_dead:
2114             self.logger.info(self.vapi.cli("show snat verbose"))
2115             self.clear_snat()
2116
2117
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    @classmethod
    def setUpConstants(cls):
        """ Start VPP with the SNAT plugin in deterministic mode """
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        """ Create three pg interfaces and the port/address constants """
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_external_port = 6303
            cls.udp_port_in = 6304
            cls.udp_external_port = 6304
            cls.icmp_id_in = 6305
            cls.snat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            # two inside hosts are needed by test_multiple_users
            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

        except Exception:
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def _configure_det_nat(self, in_addr_n, in_plen, out_ip):
        """
        Add a deterministic mapping and enable SNAT on pg0 (in) / pg1 (out)

        :param in_addr_n: Inside address in packed binary form (the form
                          produced by socket.inet_aton)
        :param in_plen: Inside prefix length
        :param out_ip: Outside address as dotted-quad string, mapped as /32
        """
        self.vapi.snat_add_det_map(in_addr_n,
                                   in_plen,
                                   socket.inet_aton(out_ip),
                                   32)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

    def _send_and_capture(self, tx_if, pkts, rx_if, count=1):
        """
        Send packets on one interface and read the capture of another

        :param tx_if: Interface the packets are injected on
        :param pkts: Packet or list of packets to send
        :param rx_if: Interface the capture is read from
        :param count: Number of packets expected in the capture (Default 1)
        :returns: Captured packets
        """
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(count)

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        One TCP, one UDP and one ICMP echo-request packet are built, in that
        order.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List of the three packets
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        The packets use self.tcp_port_out, self.udp_port_out and
        self.icmp_external_id, which are only set by verify_capture_out()
        (and, for TCP, initiate_tcp_session()); one of those has to run
        before this method is called.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List of the three packets (TCP, UDP, ICMP echo-reply)
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_external_id, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
        """
        Verify captured packets on outside network

        Besides checking the translated source address, the translated
        TCP/UDP source ports and the ICMP identifier are stored on the test
        instance (tcp_port_out, udp_port_out, icmp_external_id) for later
        use by create_stream_out().

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    self.udp_port_out = packet[UDP].sport
                else:
                    # anything that is neither TCP nor UDP must be ICMP
                    self.icmp_external_id = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def initiate_tcp_session(self, in_if, out_if):
        """
        Initiates TCP session

        Performs a SYN, SYN+ACK, ACK exchange through the NAT and stores the
        translated source port of the SYN in self.tcp_port_out.

        :param in_if: Inside interface
        :param out_if: Outside interface
        """
        try:
            # SYN packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="S"))
            capture = self._send_and_capture(in_if, p, out_if)
            p = capture[0]
            self.tcp_port_out = p[TCP].sport

            # SYN + ACK packet out->in
            p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                 IP(src=out_if.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="SA"))
            self._send_and_capture(out_if, p, in_if)

            # ACK packet in->out
            p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            self._send_and_capture(in_if, p, out_if)

        except:
            self.logger.error("TCP 3 way handshake failed")
            raise

    def verify_ipfix_max_entries_per_user(self, data):
        """
        Verify IPFIX maximum entries per user exceeded event

        Exactly one record is expected; its fields are looked up by the
        numeric keys used below.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 13)
        # natQuotaExceededEvent
        self.assertEqual('\x03\x00\x00\x00', record[466])
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])

    def test_deterministic_mode(self):
        """ S-NAT run deterministic mode """
        # Maps 172.16.255.0/24 onto the single address 172.17.255.50 and
        # checks the forward/reverse lookups and the mapping dump.
        in_addr = '172.16.255.0'
        out_addr = '172.17.255.50'
        in_addr_t = '172.16.255.20'
        in_addr_n = socket.inet_aton(in_addr)
        out_addr_n = socket.inet_aton(out_addr)
        in_addr_t_n = socket.inet_aton(in_addr_t)
        in_plen = 24
        out_plen = 32

        snat_config = self.vapi.snat_show_config()
        self.assertEqual(1, snat_config.deterministic)

        self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)

        # forward lookup of one inside host, then reverse lookup of the
        # reported outside address/port must lead back to the same host
        rep1 = self.vapi.snat_det_forward(in_addr_t_n)
        self.assertEqual(rep1.out_addr[:4], out_addr_n)
        rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
        self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 1)
        dsm = deterministic_mappings[0]
        self.assertEqual(in_addr_n, dsm.in_addr[:4])
        self.assertEqual(in_plen, dsm.in_plen)
        self.assertEqual(out_addr_n, dsm.out_addr[:4])
        self.assertEqual(out_plen, dsm.out_plen)

        self.clear_snat()
        deterministic_mappings = self.vapi.snat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 0)

    def test_set_timeouts(self):
        """ Set deterministic NAT timeouts """
        timeouts_before = self.vapi.snat_det_get_timeouts()

        # raise every timeout by 10 and check that each value changed
        self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
                                        timeouts_before.tcp_established + 10,
                                        timeouts_before.tcp_transitory + 10,
                                        timeouts_before.icmp + 10)

        timeouts_after = self.vapi.snat_det_get_timeouts()

        self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
        self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
        self.assertNotEqual(timeouts_before.tcp_established,
                            timeouts_after.tcp_established)
        self.assertNotEqual(timeouts_before.tcp_transitory,
                            timeouts_after.tcp_transitory)

    def test_det_in(self):
        """ CGNAT translation test (TCP, UDP, ICMP) """

        nat_ip = "10.0.0.10"

        self._configure_det_nat(self.pg0.remote_ip4n, 32, nat_ip)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, pkts, self.pg1, len(pkts))
        self.verify_capture_out(capture, nat_ip)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        capture = self._send_and_capture(self.pg1, pkts, self.pg0, len(pkts))
        # verify_capture_in is not defined in this class; presumably it is
        # inherited from MethodHolder
        self.verify_capture_in(capture, self.pg0)

        # session dump test
        sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
        self.assertEqual(len(sessions), 3)

        # TCP session
        s = sessions[0]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.tcp_port_in)
        self.assertEqual(s.out_port, self.tcp_port_out)
        self.assertEqual(s.ext_port, self.tcp_external_port)

        # UDP session
        s = sessions[1]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.udp_port_in)
        self.assertEqual(s.out_port, self.udp_port_out)
        self.assertEqual(s.ext_port, self.udp_external_port)

        # ICMP session
        s = sessions[2]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.icmp_id_in)
        self.assertEqual(s.out_port, self.icmp_external_id)

    def test_multiple_users(self):
        """ CGNAT multiple users """
        # Two inside hosts use the same source port towards the same
        # destination; the outside ports seen on pg1 are then used to send
        # replies that must reach the right host.

        nat_ip = "10.0.0.10"
        port_in = 80
        external_port = 6303

        host0 = self.pg0.remote_hosts[0]
        host1 = self.pg0.remote_hosts[1]

        self._configure_det_nat(host0.ip4n, 24, nat_ip)

        # host0 to out
        p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
             IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        capture = self._send_and_capture(self.pg0, p, self.pg1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out0 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # host1 to out
        p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
             IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        capture = self._send_and_capture(self.pg0, p, self.pg1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out1 = tcp.sport
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(1, len(dms))
        self.assertEqual(2, dms[0].ses_num)

        # out to host0
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out0))
        capture = self._send_and_capture(self.pg1, p, self.pg0)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host0.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # out to host1
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out1))
        capture = self._send_and_capture(self.pg1, p, self.pg0)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host1.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except:
            self.logger.error(ppp("Unexpected or invalid packet", p))
            raise

        # session close api test
        # close host1's session by its outside tuple ...
        self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
                                             port_out1,
                                             self.pg1.remote_ip4n,
                                             external_port)
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 1)

        # ... and host0's session by its inside tuple
        self.vapi.snat_det_close_session_in(host0.ip4n,
                                            port_in,
                                            self.pg1.remote_ip4n,
                                            external_port)
        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 0)

    def test_tcp_session_close_detection_in(self):
        """ CGNAT TCP session close initiated from inside network """
        self._configure_det_nat(self.pg0.remote_ip4n, 32, self.snat_addr)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from inside
        try:
            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            self._send_and_capture(self.pg0, p, self.pg1)

            pkts = []

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            pkts.append(p)

            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            pkts.append(p)

            self._send_and_capture(self.pg1, pkts, self.pg0, 2)

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            self._send_and_capture(self.pg0, p, self.pg1)

            # Check if snat closed the session
            dms = self.vapi.snat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    def test_tcp_session_close_detection_out(self):
        """ CGNAT TCP session close initiated from outside network """
        self._configure_det_nat(self.pg0.remote_ip4n, 32, self.snat_addr)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from outside
        try:
            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            self._send_and_capture(self.pg1, p, self.pg0)

            pkts = []

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            pkts.append(p)

            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            pkts.append(p)

            self._send_and_capture(self.pg0, pkts, self.pg1, 2)

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            self._send_and_capture(self.pg1, p, self.pg0)

            # Check if snat closed the session
            dms = self.vapi.snat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except:
            self.logger.error("TCP session termination failed")
            raise

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ CGNAT session timeouts """
        self._configure_det_nat(self.pg0.remote_ip4n, 32, self.snat_addr)

        self.initiate_tcp_session(self.pg0, self.pg1)
        # all four timeouts are set to 5; the sleep below is three times that
        self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
        pkts = self.create_stream_in(self.pg0, self.pg1)
        # only delivery of all packets matters here, not their content
        self._send_and_capture(self.pg0, pkts, self.pg1, len(pkts))
        sleep(15)

        dms = self.vapi.snat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_limit_per_user(self):
        """ CGNAT maximum 1000 sessions per user should be created """
        self._configure_det_nat(self.pg0.remote_ip4n, 32, self.snat_addr)
        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                     src_address=self.pg2.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.snat_ipfix()

        # 1000 UDP packets with distinct ports -> 1000 sessions
        pkts = []
        for port in range(1025, 2025):
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 UDP(sport=port, dport=port))
            pkts.append(p)

        self._send_and_capture(self.pg0, pkts, self.pg1, len(pkts))

        # one more flow must not be forwarded
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=3001, dport=3002))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.assert_nothing_captured()

        # verify ICMP error packet
        capture = self.pg0.get_capture(1)
        p = capture[0]
        self.assertTrue(p.haslayer(ICMP))
        icmp = p[ICMP]
        self.assertEqual(icmp.type, 3)
        self.assertEqual(icmp.code, 1)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        self.assertEqual(inner_ip[UDPerror].sport, 3001)
        self.assertEqual(inner_ip[UDPerror].dport, 3002)

        dms = self.vapi.snat_det_map_dump()

        self.assertEqual(1000, dms[0].ses_num)

        # verify IPFIX logging
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        sleep(1)
        capture = self.pg2.get_capture(2)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_entries_per_user(data)

    def clear_snat(self):
        """
        Clear SNAT configuration.

        Disables IPFIX logging, calls snat_det_set_timeouts() without
        arguments, deletes every deterministic mapping and removes the SNAT
        feature from every interface reported by the interface dump.
        """
        self.vapi.snat_ipfix(enable=0)
        # NOTE(review): presumably the argument-less call restores the
        # default timeouts - confirm against the API wrapper.
        self.vapi.snat_det_set_timeouts()
        deterministic_mappings = self.vapi.snat_det_map_dump()
        for dsm in deterministic_mappings:
            self.vapi.snat_add_det_map(dsm.in_addr,
                                       dsm.in_plen,
                                       dsm.out_addr,
                                       dsm.out_plen,
                                       is_add=0)

        interfaces = self.vapi.snat_interface_dump()
        for intf in interfaces:
            self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                     intf.is_inside,
                                                     is_add=0)

    def tearDown(self):
        """ Log the SNAT state and remove the SNAT configuration """
        super(TestDeterministicNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat detail"))
            self.clear_snat()
2759
2760
2761 class TestNAT64(MethodHolder):
2762     """ NAT64 Test Cases """
2763
2764     @classmethod
2765     def setUpClass(cls):
2766         super(TestNAT64, cls).setUpClass()
2767
2768         try:
2769             cls.tcp_port_in = 6303
2770             cls.tcp_port_out = 6303
2771             cls.udp_port_in = 6304
2772             cls.udp_port_out = 6304
2773             cls.icmp_id_in = 6305
2774             cls.icmp_id_out = 6305
2775             cls.nat_addr = '10.0.0.3'
2776             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2777             cls.vrf1_id = 10
2778             cls.vrf1_nat_addr = '10.0.10.3'
2779             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2780                                                    cls.vrf1_nat_addr)
2781
2782             cls.create_pg_interfaces(range(3))
2783             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2784             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2785             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2786
2787             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2788
2789             cls.pg0.generate_remote_hosts(2)
2790
2791             for i in cls.ip6_interfaces:
2792                 i.admin_up()
2793                 i.config_ip6()
2794                 i.configure_ipv6_neighbors()
2795
2796             for i in cls.ip4_interfaces:
2797                 i.admin_up()
2798                 i.config_ip4()
2799                 i.resolve_arp()
2800
2801         except Exception:
2802             super(TestNAT64, cls).tearDownClass()
2803             raise
2804
2805     def test_pool(self):
2806         """ Add/delete address to NAT64 pool """
2807         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2808
2809         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2810
2811         addresses = self.vapi.nat64_pool_addr_dump()
2812         self.assertEqual(len(addresses), 1)
2813         self.assertEqual(addresses[0].address, nat_addr)
2814
2815         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2816
2817         addresses = self.vapi.nat64_pool_addr_dump()
2818         self.assertEqual(len(addresses), 0)
2819
2820     def test_interface(self):
2821         """ Enable/disable NAT64 feature on the interface """
2822         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2823         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2824
2825         interfaces = self.vapi.nat64_interface_dump()
2826         self.assertEqual(len(interfaces), 2)
2827         pg0_found = False
2828         pg1_found = False
2829         for intf in interfaces:
2830             if intf.sw_if_index == self.pg0.sw_if_index:
2831                 self.assertEqual(intf.is_inside, 1)
2832                 pg0_found = True
2833             elif intf.sw_if_index == self.pg1.sw_if_index:
2834                 self.assertEqual(intf.is_inside, 0)
2835                 pg1_found = True
2836         self.assertTrue(pg0_found)
2837         self.assertTrue(pg1_found)
2838
2839         features = self.vapi.cli("show interface features pg0")
2840         self.assertNotEqual(features.find('nat64-in2out'), -1)
2841         features = self.vapi.cli("show interface features pg1")
2842         self.assertNotEqual(features.find('nat64-out2in'), -1)
2843
2844         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2845         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2846
2847         interfaces = self.vapi.nat64_interface_dump()
2848         self.assertEqual(len(interfaces), 0)
2849
2850     def test_static_bib(self):
2851         """ Add/delete static BIB entry """
2852         in_addr = socket.inet_pton(socket.AF_INET6,
2853                                    '2001:db8:85a3::8a2e:370:7334')
2854         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2855         in_port = 1234
2856         out_port = 5678
2857         proto = IP_PROTOS.tcp
2858
2859         self.vapi.nat64_add_del_static_bib(in_addr,
2860                                            out_addr,
2861                                            in_port,
2862                                            out_port,
2863                                            proto)
2864         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2865         static_bib_num = 0
2866         for bibe in bib:
2867             if bibe.is_static:
2868                 static_bib_num += 1
2869                 self.assertEqual(bibe.i_addr, in_addr)
2870                 self.assertEqual(bibe.o_addr, out_addr)
2871                 self.assertEqual(bibe.i_port, in_port)
2872                 self.assertEqual(bibe.o_port, out_port)
2873         self.assertEqual(static_bib_num, 1)
2874
2875         self.vapi.nat64_add_del_static_bib(in_addr,
2876                                            out_addr,
2877                                            in_port,
2878                                            out_port,
2879                                            proto,
2880                                            is_add=0)
2881         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2882         static_bib_num = 0
2883         for bibe in bib:
2884             if bibe.is_static:
2885                 static_bib_num += 1
2886         self.assertEqual(static_bib_num, 0)
2887
2888     def test_set_timeouts(self):
2889         """ Set NAT64 timeouts """
2890         # verify default values
2891         timeouts = self.vapi.nat64_get_timeouts()
2892         self.assertEqual(timeouts.udp, 300)
2893         self.assertEqual(timeouts.icmp, 60)
2894         self.assertEqual(timeouts.tcp_trans, 240)
2895         self.assertEqual(timeouts.tcp_est, 7440)
2896         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2897
2898         # set and verify custom values
2899         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2900                                      tcp_est=7450, tcp_incoming_syn=10)
2901         timeouts = self.vapi.nat64_get_timeouts()
2902         self.assertEqual(timeouts.udp, 200)
2903         self.assertEqual(timeouts.icmp, 30)
2904         self.assertEqual(timeouts.tcp_trans, 250)
2905         self.assertEqual(timeouts.tcp_est, 7450)
2906         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2907
2908     def test_dynamic(self):
2909         """ NAT64 dynamic translation test """
2910         self.tcp_port_in = 6303
2911         self.udp_port_in = 6304
2912         self.icmp_id_in = 6305
2913
2914         ses_num_start = self.nat64_get_ses_num()
2915
2916         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2917                                                 self.nat_addr_n)
2918         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2919         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2920
2921         # in2out
2922         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2923         self.pg0.add_stream(pkts)
2924         self.pg_enable_capture(self.pg_interfaces)
2925         self.pg_start()
2926         capture = self.pg1.get_capture(len(pkts))
2927         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2928                                 dst_ip=self.pg1.remote_ip4)
2929
2930         # out2in
2931         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2932         self.pg1.add_stream(pkts)
2933         self.pg_enable_capture(self.pg_interfaces)
2934         self.pg_start()
2935         capture = self.pg0.get_capture(len(pkts))
2936         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2937         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2938
2939         # in2out
2940         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2941         self.pg0.add_stream(pkts)
2942         self.pg_enable_capture(self.pg_interfaces)
2943         self.pg_start()
2944         capture = self.pg1.get_capture(len(pkts))
2945         self.verify_capture_out(capture, nat_ip=self.nat_addr,
2946                                 dst_ip=self.pg1.remote_ip4)
2947
2948         # out2in
2949         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2950         self.pg1.add_stream(pkts)
2951         self.pg_enable_capture(self.pg_interfaces)
2952         self.pg_start()
2953         capture = self.pg0.get_capture(len(pkts))
2954         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2955
2956         ses_num_end = self.nat64_get_ses_num()
2957
2958         self.assertEqual(ses_num_end - ses_num_start, 3)
2959
2960         # tenant with specific VRF
2961         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
2962                                                 self.vrf1_nat_addr_n,
2963                                                 vrf_id=self.vrf1_id)
2964         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
2965
2966         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
2967         self.pg2.add_stream(pkts)
2968         self.pg_enable_capture(self.pg_interfaces)
2969         self.pg_start()
2970         capture = self.pg1.get_capture(len(pkts))
2971         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
2972                                 dst_ip=self.pg1.remote_ip4)
2973
2974         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
2975         self.pg1.add_stream(pkts)
2976         self.pg_enable_capture(self.pg_interfaces)
2977         self.pg_start()
2978         capture = self.pg2.get_capture(len(pkts))
2979         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
2980
2981     def test_static(self):
2982         """ NAT64 static translation test """
2983         self.tcp_port_in = 60303
2984         self.udp_port_in = 60304
2985         self.icmp_id_in = 60305
2986         self.tcp_port_out = 60303
2987         self.udp_port_out = 60304
2988         self.icmp_id_out = 60305
2989
2990         ses_num_start = self.nat64_get_ses_num()
2991
2992         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2993                                                 self.nat_addr_n)
2994         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2995         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2996
2997         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2998                                            self.nat_addr_n,
2999                                            self.tcp_port_in,
3000                                            self.tcp_port_out,
3001                                            IP_PROTOS.tcp)
3002         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3003                                            self.nat_addr_n,
3004                                            self.udp_port_in,
3005                                            self.udp_port_out,
3006                                            IP_PROTOS.udp)
3007         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3008                                            self.nat_addr_n,
3009                                            self.icmp_id_in,
3010                                            self.icmp_id_out,
3011                                            IP_PROTOS.icmp)
3012
3013         # in2out
3014         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3015         self.pg0.add_stream(pkts)
3016         self.pg_enable_capture(self.pg_interfaces)
3017         self.pg_start()
3018         capture = self.pg1.get_capture(len(pkts))
3019         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3020                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3021
3022         # out2in
3023         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3024         self.pg1.add_stream(pkts)
3025         self.pg_enable_capture(self.pg_interfaces)
3026         self.pg_start()
3027         capture = self.pg0.get_capture(len(pkts))
3028         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3029         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3030
3031         ses_num_end = self.nat64_get_ses_num()
3032
3033         self.assertEqual(ses_num_end - ses_num_start, 3)
3034
3035     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3036     def test_session_timeout(self):
3037         """ NAT64 session timeout """
3038         self.icmp_id_in = 1234
3039         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3040                                                 self.nat_addr_n)
3041         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3042         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3043         self.vapi.nat64_set_timeouts(icmp=5)
3044
3045         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3046         self.pg0.add_stream(pkts)
3047         self.pg_enable_capture(self.pg_interfaces)
3048         self.pg_start()
3049         capture = self.pg1.get_capture(len(pkts))
3050
3051         ses_num_before_timeout = self.nat64_get_ses_num()
3052
3053         sleep(15)
3054
3055         # ICMP session after timeout
3056         ses_num_after_timeout = self.nat64_get_ses_num()
3057         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3058
3059     def test_icmp_error(self):
3060         """ NAT64 ICMP Error message translation """
3061         self.tcp_port_in = 6303
3062         self.udp_port_in = 6304
3063         self.icmp_id_in = 6305
3064
3065         ses_num_start = self.nat64_get_ses_num()
3066
3067         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3068                                                 self.nat_addr_n)
3069         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3070         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3071
3072         # send some packets to create sessions
3073         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3074         self.pg0.add_stream(pkts)
3075         self.pg_enable_capture(self.pg_interfaces)
3076         self.pg_start()
3077         capture_ip4 = self.pg1.get_capture(len(pkts))
3078         self.verify_capture_out(capture_ip4,
3079                                 nat_ip=self.nat_addr,
3080                                 dst_ip=self.pg1.remote_ip4)
3081
3082         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3083         self.pg1.add_stream(pkts)
3084         self.pg_enable_capture(self.pg_interfaces)
3085         self.pg_start()
3086         capture_ip6 = self.pg0.get_capture(len(pkts))
3087         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3088         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3089                                    self.pg0.remote_ip6)
3090
3091         # in2out
3092         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3093                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3094                 ICMPv6DestUnreach(code=1) /
3095                 packet[IPv6] for packet in capture_ip6]
3096         self.pg0.add_stream(pkts)
3097         self.pg_enable_capture(self.pg_interfaces)
3098         self.pg_start()
3099         capture = self.pg1.get_capture(len(pkts))
3100         for packet in capture:
3101             try:
3102                 self.assertEqual(packet[IP].src, self.nat_addr)
3103                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3104                 self.assertEqual(packet[ICMP].type, 3)
3105                 self.assertEqual(packet[ICMP].code, 13)
3106                 inner = packet[IPerror]
3107                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3108                 self.assertEqual(inner.dst, self.nat_addr)
3109                 self.check_icmp_checksum(packet)
3110                 if inner.haslayer(TCPerror):
3111                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3112                 elif inner.haslayer(UDPerror):
3113                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3114                 else:
3115                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3116             except:
3117                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3118                 raise
3119
3120         # out2in
3121         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3122                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3123                 ICMP(type=3, code=13) /
3124                 packet[IP] for packet in capture_ip4]
3125         self.pg1.add_stream(pkts)
3126         self.pg_enable_capture(self.pg_interfaces)
3127         self.pg_start()
3128         capture = self.pg0.get_capture(len(pkts))
3129         for packet in capture:
3130             try:
3131                 self.assertEqual(packet[IPv6].src, ip.src)
3132                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3133                 icmp = packet[ICMPv6DestUnreach]
3134                 self.assertEqual(icmp.code, 1)
3135                 inner = icmp[IPerror6]
3136                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3137                 self.assertEqual(inner.dst, ip.src)
3138                 self.check_icmpv6_checksum(packet)
3139                 if inner.haslayer(TCPerror):
3140                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3141                 elif inner.haslayer(UDPerror):
3142                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3143                 else:
3144                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3145                                      self.icmp_id_in)
3146             except:
3147                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3148                 raise
3149
3150     def test_hairpinning(self):
3151         """ NAT64 hairpinning """
3152
3153         client = self.pg0.remote_hosts[0]
3154         server = self.pg0.remote_hosts[1]
3155         server_tcp_in_port = 22
3156         server_tcp_out_port = 4022
3157         server_udp_in_port = 23
3158         server_udp_out_port = 4023
3159         client_tcp_in_port = 1234
3160         client_udp_in_port = 1235
3161         client_tcp_out_port = 0
3162         client_udp_out_port = 0
3163         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3164         nat_addr_ip6 = ip.src
3165
3166         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3167                                                 self.nat_addr_n)
3168         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3169         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3170
3171         self.vapi.nat64_add_del_static_bib(server.ip6n,
3172                                            self.nat_addr_n,
3173                                            server_tcp_in_port,
3174                                            server_tcp_out_port,
3175                                            IP_PROTOS.tcp)
3176         self.vapi.nat64_add_del_static_bib(server.ip6n,
3177                                            self.nat_addr_n,
3178                                            server_udp_in_port,
3179                                            server_udp_out_port,
3180                                            IP_PROTOS.udp)
3181
3182         # client to server
3183         pkts = []
3184         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3185              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3186              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3187         pkts.append(p)
3188         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3189              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3190              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3191         pkts.append(p)
3192         self.pg0.add_stream(pkts)
3193         self.pg_enable_capture(self.pg_interfaces)
3194         self.pg_start()
3195         capture = self.pg0.get_capture(len(pkts))
3196         for packet in capture:
3197             try:
3198                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3199                 self.assertEqual(packet[IPv6].dst, server.ip6)
3200                 if packet.haslayer(TCP):
3201                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3202                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3203                     self.check_tcp_checksum(packet)
3204                     client_tcp_out_port = packet[TCP].sport
3205                 else:
3206                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3207                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3208                     self.check_udp_checksum(packet)
3209                     client_udp_out_port = packet[UDP].sport
3210             except:
3211                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3212                 raise
3213
3214         # server to client
3215         pkts = []
3216         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3217              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3218              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3219         pkts.append(p)
3220         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3221              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3222              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3223         pkts.append(p)
3224         self.pg0.add_stream(pkts)
3225         self.pg_enable_capture(self.pg_interfaces)
3226         self.pg_start()
3227         capture = self.pg0.get_capture(len(pkts))
3228         for packet in capture:
3229             try:
3230                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3231                 self.assertEqual(packet[IPv6].dst, client.ip6)
3232                 if packet.haslayer(TCP):
3233                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3234                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3235                     self.check_tcp_checksum(packet)
3236                 else:
3237                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3238                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3239                     self.check_udp_checksum(packet)
3240             except:
3241                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3242                 raise
3243
3244         # ICMP error
3245         pkts = []
3246         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3247                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3248                 ICMPv6DestUnreach(code=1) /
3249                 packet[IPv6] for packet in capture]
3250         self.pg0.add_stream(pkts)
3251         self.pg_enable_capture(self.pg_interfaces)
3252         self.pg_start()
3253         capture = self.pg0.get_capture(len(pkts))
3254         for packet in capture:
3255             try:
3256                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3257                 self.assertEqual(packet[IPv6].dst, server.ip6)
3258                 icmp = packet[ICMPv6DestUnreach]
3259                 self.assertEqual(icmp.code, 1)
3260                 inner = icmp[IPerror6]
3261                 self.assertEqual(inner.src, server.ip6)
3262                 self.assertEqual(inner.dst, nat_addr_ip6)
3263                 self.check_icmpv6_checksum(packet)
3264                 if inner.haslayer(TCPerror):
3265                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3266                     self.assertEqual(inner[TCPerror].dport,
3267                                      client_tcp_out_port)
3268                 else:
3269                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3270                     self.assertEqual(inner[UDPerror].dport,
3271                                      client_udp_out_port)
3272             except:
3273                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3274                 raise
3275
3276     def test_prefix(self):
3277         """ NAT64 Network-Specific Prefix """
3278
3279         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3280                                                 self.nat_addr_n)
3281         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3282         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3283         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3284                                                 self.vrf1_nat_addr_n,
3285                                                 vrf_id=self.vrf1_id)
3286         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3287
3288         # Add global prefix
3289         global_pref64 = "2001:db8::"
3290         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3291         global_pref64_len = 32
3292         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3293
3294         prefix = self.vapi.nat64_prefix_dump()
3295         self.assertEqual(len(prefix), 1)
3296         self.assertEqual(prefix[0].prefix, global_pref64_n)
3297         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3298         self.assertEqual(prefix[0].vrf_id, 0)
3299
3300         # Add tenant specific prefix
3301         vrf1_pref64 = "2001:db8:122:300::"
3302         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3303         vrf1_pref64_len = 56
3304         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3305                                        vrf1_pref64_len,
3306                                        vrf_id=self.vrf1_id)
3307         prefix = self.vapi.nat64_prefix_dump()
3308         self.assertEqual(len(prefix), 2)
3309
3310         # Global prefix
3311         pkts = self.create_stream_in_ip6(self.pg0,
3312                                          self.pg1,
3313                                          pref=global_pref64,
3314                                          plen=global_pref64_len)
3315         self.pg0.add_stream(pkts)
3316         self.pg_enable_capture(self.pg_interfaces)
3317         self.pg_start()
3318         capture = self.pg1.get_capture(len(pkts))
3319         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3320                                 dst_ip=self.pg1.remote_ip4)
3321
3322         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3323         self.pg1.add_stream(pkts)
3324         self.pg_enable_capture(self.pg_interfaces)
3325         self.pg_start()
3326         capture = self.pg0.get_capture(len(pkts))
3327         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3328                                   global_pref64,
3329                                   global_pref64_len)
3330         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3331
3332         # Tenant specific prefix
3333         pkts = self.create_stream_in_ip6(self.pg2,
3334                                          self.pg1,
3335                                          pref=vrf1_pref64,
3336                                          plen=vrf1_pref64_len)
3337         self.pg2.add_stream(pkts)
3338         self.pg_enable_capture(self.pg_interfaces)
3339         self.pg_start()
3340         capture = self.pg1.get_capture(len(pkts))
3341         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3342                                 dst_ip=self.pg1.remote_ip4)
3343
3344         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3345         self.pg1.add_stream(pkts)
3346         self.pg_enable_capture(self.pg_interfaces)
3347         self.pg_start()
3348         capture = self.pg2.get_capture(len(pkts))
3349         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3350                                   vrf1_pref64,
3351                                   vrf1_pref64_len)
3352         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3353
3354     def _test_unknown_proto(self):
3355         """ NAT64 translate packet with unknown protocol """
3356
3357         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3358                                                 self.nat_addr_n)
3359         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3360         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3361         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3362
3363         # in2out
3364         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3365              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3366              TCP(sport=self.tcp_port_in, dport=20))
3367         self.pg0.add_stream(p)
3368         self.pg_enable_capture(self.pg_interfaces)
3369         self.pg_start()
3370         p = self.pg1.get_capture(1)
3371
3372         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3373              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3374              GRE() /
3375              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3376              TCP(sport=1234, dport=1234))
3377         self.pg0.add_stream(p)
3378         self.pg_enable_capture(self.pg_interfaces)
3379         self.pg_start()
3380         p = self.pg1.get_capture(1)
3381         packet = p[0]
3382         try:
3383             self.assertEqual(packet[IP].src, self.nat_addr)
3384             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3385             self.assertTrue(packet.haslayer(GRE))
3386             self.check_ip_checksum(packet)
3387         except:
3388             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3389             raise
3390
3391         # out2in
3392         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3393              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3394              GRE() /
3395              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3396              TCP(sport=1234, dport=1234))
3397         self.pg1.add_stream(p)
3398         self.pg_enable_capture(self.pg_interfaces)
3399         self.pg_start()
3400         p = self.pg0.get_capture(1)
3401         packet = p[0]
3402         try:
3403             self.assertEqual(packet[IPv6].src, remote_ip6)
3404             self.assertEqual(packet[IPv6].dst, self.pgi0.remote_ip6)
3405             self.assertTrue(packet.haslayer(GRE))
3406         except:
3407             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3408             raise
3409
3410     def _test_hairpinning_unknown_proto(self):
3411         """ NAT64 translate packet with unknown protocol - hairpinning """
3412
3413         client = self.pg0.remote_hosts[0]
3414         server = self.pg0.remote_hosts[1]
3415         server_tcp_in_port = 22
3416         server_tcp_out_port = 4022
3417         client_tcp_in_port = 1234
3418         client_udp_in_port = 1235
3419         nat_addr_ip6 = self.compose_ip6(self.nat_addr, '64:ff9b::', 96)
3420
3421         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3422                                                 self.nat_addr_n)
3423         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3424         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3425
3426         self.vapi.nat64_add_del_static_bib(server.ip6n,
3427                                            self.nat_addr_n,
3428                                            server_tcp_in_port,
3429                                            server_tcp_out_port,
3430                                            IP_PROTOS.tcp)
3431
3432         # client to server
3433         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3434              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3435              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3436         self.pg0.add_stream(p)
3437         self.pg_enable_capture(self.pg_interfaces)
3438         self.pg_start()
3439         p = self.pg0.get_capture(1)
3440
3441         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3442              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3443              GRE() /
3444              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3445              TCP(sport=1234, dport=1234))
3446         self.pg0.add_stream(p)
3447         self.pg_enable_capture(self.pg_interfaces)
3448         self.pg_start()
3449         p = self.pg0.get_capture(1)
3450         packet = p[0]
3451         try:
3452             self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3453             self.assertEqual(packet[IPv6].dst, server.ip6)
3454             self.assertTrue(packet.haslayer(GRE))
3455         except:
3456             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3457             raise
3458
3459         # server to client
3460         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3461              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3462              GRE() /
3463              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3464              TCP(sport=1234, dport=1234))
3465         self.pg0.add_stream(p)
3466         self.pg_enable_capture(self.pg_interfaces)
3467         self.pg_start()
3468         p = self.pg0.get_capture(1)
3469         packet = p[0]
3470         try:
3471             self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3472             self.assertEqual(packet[IPv6].dst, client.ip6)
3473             self.assertTrue(packet.haslayer(GRE))
3474         except:
3475             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3476             raise
3477
3478     def nat64_get_ses_num(self):
3479         """
3480         Return number of active NAT64 sessions.
3481         """
3482         ses_num = 0
3483         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
3484         ses_num += len(st)
3485         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
3486         ses_num += len(st)
3487         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
3488         ses_num += len(st)
3489         return ses_num
3490
3491     def clear_nat64(self):
3492         """
3493         Clear NAT64 configuration.
3494         """
3495         self.vapi.nat64_set_timeouts()
3496
3497         interfaces = self.vapi.nat64_interface_dump()
3498         for intf in interfaces:
3499             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3500                                               intf.is_inside,
3501                                               is_add=0)
3502
3503         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3504         for bibe in bib:
3505             if bibe.is_static:
3506                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3507                                                    bibe.o_addr,
3508                                                    bibe.i_port,
3509                                                    bibe.o_port,
3510                                                    bibe.proto,
3511                                                    bibe.vrf_id,
3512                                                    is_add=0)
3513
3514         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3515         for bibe in bib:
3516             if bibe.is_static:
3517                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3518                                                    bibe.o_addr,
3519                                                    bibe.i_port,
3520                                                    bibe.o_port,
3521                                                    bibe.proto,
3522                                                    bibe.vrf_id,
3523                                                    is_add=0)
3524
3525         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3526         for bibe in bib:
3527             if bibe.is_static:
3528                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3529                                                    bibe.o_addr,
3530                                                    bibe.i_port,
3531                                                    bibe.o_port,
3532                                                    bibe.proto,
3533                                                    bibe.vrf_id,
3534                                                    is_add=0)
3535
3536         adresses = self.vapi.nat64_pool_addr_dump()
3537         for addr in adresses:
3538             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3539                                                     addr.address,
3540                                                     vrf_id=addr.vrf_id,
3541                                                     is_add=0)
3542
3543         prefixes = self.vapi.nat64_prefix_dump()
3544         for prefix in prefixes:
3545             self.vapi.nat64_add_del_prefix(prefix.prefix,
3546                                            prefix.prefix_len,
3547                                            vrf_id=prefix.vrf_id,
3548                                            is_add=0)
3549
3550     def tearDown(self):
3551         super(TestNAT64, self).tearDown()
3552         if not self.vpp_dead:
3553             self.logger.info(self.vapi.cli("show nat64 pool"))
3554             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3555             self.logger.info(self.vapi.cli("show nat64 prefix"))
3556             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
3557             self.logger.info(self.vapi.cli("show nat64 bib udp"))
3558             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
3559             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
3560             self.logger.info(self.vapi.cli("show nat64 session table udp"))
3561             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
3562             self.clear_nat64()
3563
# When this file is executed directly, run its tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)