P2P Ethernet - API
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.l2 import Ether, ARP
12 from scapy.data import IP_PROTOS
13 from scapy.packet import bind_layers
14 from util import ppp
15 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
16 from time import sleep
17
18
19 class MethodHolder(VppTestCase):
20     """ SNAT create capture and verify method holder """
21
22     @classmethod
23     def setUpClass(cls):
24         super(MethodHolder, cls).setUpClass()
25
26     def tearDown(self):
27         super(MethodHolder, self).tearDown()
28
29     def create_stream_in(self, in_if, out_if, ttl=64):
30         """
31         Create packet stream for inside network
32
33         :param in_if: Inside interface
34         :param out_if: Outside interface
35         :param ttl: TTL of generated packets
36         """
37         pkts = []
38         # TCP
39         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
40              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
41              TCP(sport=self.tcp_port_in, dport=20))
42         pkts.append(p)
43
44         # UDP
45         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
46              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
47              UDP(sport=self.udp_port_in, dport=20))
48         pkts.append(p)
49
50         # ICMP
51         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
52              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
53              ICMP(id=self.icmp_id_in, type='echo-request'))
54         pkts.append(p)
55
56         return pkts
57
58     def create_stream_in_ip6(self, in_if, out_if, hlim=64):
59         """
60         Create IPv6 packet stream for inside network
61
62         :param in_if: Inside interface
63         :param out_if: Outside interface
64         :param ttl: Hop Limit of generated packets
65         """
66         pkts = []
67         dst = ''.join(['64:ff9b::', out_if.remote_ip4])
68         # TCP
69         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
70              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
71              TCP(sport=self.tcp_port_in, dport=20))
72         pkts.append(p)
73
74         # UDP
75         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
76              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
77              UDP(sport=self.udp_port_in, dport=20))
78         pkts.append(p)
79
80         # ICMP
81         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
82              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
83              ICMPv6EchoRequest(id=self.icmp_id_in))
84         pkts.append(p)
85
86         return pkts
87
88     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
89         """
90         Create packet stream for outside network
91
92         :param out_if: Outside interface
93         :param dst_ip: Destination IP address (Default use global SNAT address)
94         :param ttl: TTL of generated packets
95         """
96         if dst_ip is None:
97             dst_ip = self.snat_addr
98         pkts = []
99         # TCP
100         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
101              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
102              TCP(dport=self.tcp_port_out, sport=20))
103         pkts.append(p)
104
105         # UDP
106         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
107              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
108              UDP(dport=self.udp_port_out, sport=20))
109         pkts.append(p)
110
111         # ICMP
112         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
113              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
114              ICMP(id=self.icmp_id_out, type='echo-reply'))
115         pkts.append(p)
116
117         return pkts
118
119     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
120                            packet_num=3, dst_ip=None):
121         """
122         Verify captured packets on outside network
123
124         :param capture: Captured packets
125         :param nat_ip: Translated IP address (Default use global SNAT address)
126         :param same_port: Sorce port number is not translated (Default False)
127         :param packet_num: Expected number of packets (Default 3)
128         :param dst_ip: Destination IP address (Default do not verify)
129         """
130         if nat_ip is None:
131             nat_ip = self.snat_addr
132         self.assertEqual(packet_num, len(capture))
133         for packet in capture:
134             try:
135                 self.assertEqual(packet[IP].src, nat_ip)
136                 if dst_ip is not None:
137                     self.assertEqual(packet[IP].dst, dst_ip)
138                 if packet.haslayer(TCP):
139                     if same_port:
140                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
141                     else:
142                         self.assertNotEqual(
143                             packet[TCP].sport, self.tcp_port_in)
144                     self.tcp_port_out = packet[TCP].sport
145                 elif packet.haslayer(UDP):
146                     if same_port:
147                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
148                     else:
149                         self.assertNotEqual(
150                             packet[UDP].sport, self.udp_port_in)
151                     self.udp_port_out = packet[UDP].sport
152                 else:
153                     if same_port:
154                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
155                     else:
156                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
157                     self.icmp_id_out = packet[ICMP].id
158             except:
159                 self.logger.error(ppp("Unexpected or invalid packet "
160                                       "(outside network):", packet))
161                 raise
162
163     def verify_capture_in(self, capture, in_if, packet_num=3):
164         """
165         Verify captured packets on inside network
166
167         :param capture: Captured packets
168         :param in_if: Inside interface
169         :param packet_num: Expected number of packets (Default 3)
170         """
171         self.assertEqual(packet_num, len(capture))
172         for packet in capture:
173             try:
174                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
175                 if packet.haslayer(TCP):
176                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
177                 elif packet.haslayer(UDP):
178                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
179                 else:
180                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
181             except:
182                 self.logger.error(ppp("Unexpected or invalid packet "
183                                       "(inside network):", packet))
184                 raise
185
186     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
187         """
188         Verify captured IPv6 packets on inside network
189
190         :param capture: Captured packets
191         :param src_ip: Source IP
192         :param dst_ip: Destination IP address
193         :param packet_num: Expected number of packets (Default 3)
194         """
195         self.assertEqual(packet_num, len(capture))
196         for packet in capture:
197             try:
198                 self.assertEqual(packet[IPv6].src, src_ip)
199                 self.assertEqual(packet[IPv6].dst, dst_ip)
200                 if packet.haslayer(TCP):
201                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
202                 elif packet.haslayer(UDP):
203                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
204                 else:
205                     self.assertEqual(packet[ICMPv6EchoReply].id,
206                                      self.icmp_id_in)
207             except:
208                 self.logger.error(ppp("Unexpected or invalid packet "
209                                       "(inside network):", packet))
210                 raise
211
212     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
213         """
214         Verify captured packet that don't have to be translated
215
216         :param capture: Captured packets
217         :param ingress_if: Ingress interface
218         :param egress_if: Egress interface
219         """
220         for packet in capture:
221             try:
222                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
223                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
224                 if packet.haslayer(TCP):
225                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
226                 elif packet.haslayer(UDP):
227                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
228                 else:
229                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
230             except:
231                 self.logger.error(ppp("Unexpected or invalid packet "
232                                       "(inside network):", packet))
233                 raise
234
235     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
236                                             packet_num=3, icmp_type=11):
237         """
238         Verify captured packets with ICMP errors on outside network
239
240         :param capture: Captured packets
241         :param src_ip: Translated IP address or IP address of VPP
242                        (Default use global SNAT address)
243         :param packet_num: Expected number of packets (Default 3)
244         :param icmp_type: Type of error ICMP packet
245                           we are expecting (Default 11)
246         """
247         if src_ip is None:
248             src_ip = self.snat_addr
249         self.assertEqual(packet_num, len(capture))
250         for packet in capture:
251             try:
252                 self.assertEqual(packet[IP].src, src_ip)
253                 self.assertTrue(packet.haslayer(ICMP))
254                 icmp = packet[ICMP]
255                 self.assertEqual(icmp.type, icmp_type)
256                 self.assertTrue(icmp.haslayer(IPerror))
257                 inner_ip = icmp[IPerror]
258                 if inner_ip.haslayer(TCPerror):
259                     self.assertEqual(inner_ip[TCPerror].dport,
260                                      self.tcp_port_out)
261                 elif inner_ip.haslayer(UDPerror):
262                     self.assertEqual(inner_ip[UDPerror].dport,
263                                      self.udp_port_out)
264                 else:
265                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
266             except:
267                 self.logger.error(ppp("Unexpected or invalid packet "
268                                       "(outside network):", packet))
269                 raise
270
271     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
272                                            icmp_type=11):
273         """
274         Verify captured packets with ICMP errors on inside network
275
276         :param capture: Captured packets
277         :param in_if: Inside interface
278         :param packet_num: Expected number of packets (Default 3)
279         :param icmp_type: Type of error ICMP packet
280                           we are expecting (Default 11)
281         """
282         self.assertEqual(packet_num, len(capture))
283         for packet in capture:
284             try:
285                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
286                 self.assertTrue(packet.haslayer(ICMP))
287                 icmp = packet[ICMP]
288                 self.assertEqual(icmp.type, icmp_type)
289                 self.assertTrue(icmp.haslayer(IPerror))
290                 inner_ip = icmp[IPerror]
291                 if inner_ip.haslayer(TCPerror):
292                     self.assertEqual(inner_ip[TCPerror].sport,
293                                      self.tcp_port_in)
294                 elif inner_ip.haslayer(UDPerror):
295                     self.assertEqual(inner_ip[UDPerror].sport,
296                                      self.udp_port_in)
297                 else:
298                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
299             except:
300                 self.logger.error(ppp("Unexpected or invalid packet "
301                                       "(inside network):", packet))
302                 raise
303
304     def verify_ipfix_nat44_ses(self, data):
305         """
306         Verify IPFIX NAT44 session create/delete event
307
308         :param data: Decoded IPFIX data records
309         """
310         nat44_ses_create_num = 0
311         nat44_ses_delete_num = 0
312         self.assertEqual(6, len(data))
313         for record in data:
314             # natEvent
315             self.assertIn(ord(record[230]), [4, 5])
316             if ord(record[230]) == 4:
317                 nat44_ses_create_num += 1
318             else:
319                 nat44_ses_delete_num += 1
320             # sourceIPv4Address
321             self.assertEqual(self.pg0.remote_ip4n, record[8])
322             # postNATSourceIPv4Address
323             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
324                              record[225])
325             # ingressVRFID
326             self.assertEqual(struct.pack("!I", 0), record[234])
327             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
328             if IP_PROTOS.icmp == ord(record[4]):
329                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
330                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
331                                  record[227])
332             elif IP_PROTOS.tcp == ord(record[4]):
333                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
334                                  record[7])
335                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
336                                  record[227])
337             elif IP_PROTOS.udp == ord(record[4]):
338                 self.assertEqual(struct.pack("!H", self.udp_port_in),
339                                  record[7])
340                 self.assertEqual(struct.pack("!H", self.udp_port_out),
341                                  record[227])
342             else:
343                 self.fail("Invalid protocol")
344         self.assertEqual(3, nat44_ses_create_num)
345         self.assertEqual(3, nat44_ses_delete_num)
346
347     def verify_ipfix_addr_exhausted(self, data):
348         """
349         Verify IPFIX NAT addresses event
350
351         :param data: Decoded IPFIX data records
352         """
353         self.assertEqual(1, len(data))
354         record = data[0]
355         # natEvent
356         self.assertEqual(ord(record[230]), 3)
357         # natPoolID
358         self.assertEqual(struct.pack("!I", 0), record[283])
359
360
361 class TestSNAT(MethodHolder):
362     """ SNAT Test Cases """
363
364     @classmethod
365     def setUpClass(cls):
366         super(TestSNAT, cls).setUpClass()
367
368         try:
369             cls.tcp_port_in = 6303
370             cls.tcp_port_out = 6303
371             cls.udp_port_in = 6304
372             cls.udp_port_out = 6304
373             cls.icmp_id_in = 6305
374             cls.icmp_id_out = 6305
375             cls.snat_addr = '10.0.0.3'
376             cls.ipfix_src_port = 4739
377             cls.ipfix_domain_id = 1
378
379             cls.create_pg_interfaces(range(9))
380             cls.interfaces = list(cls.pg_interfaces[0:4])
381
382             for i in cls.interfaces:
383                 i.admin_up()
384                 i.config_ip4()
385                 i.resolve_arp()
386
387             cls.pg0.generate_remote_hosts(3)
388             cls.pg0.configure_ipv4_neighbors()
389
390             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
391
392             cls.pg4._local_ip4 = "172.16.255.1"
393             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
394             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
395             cls.pg4.set_table_ip4(10)
396             cls.pg5._local_ip4 = "172.16.255.3"
397             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
398             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
399             cls.pg5.set_table_ip4(10)
400             cls.pg6._local_ip4 = "172.16.255.1"
401             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
402             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
403             cls.pg6.set_table_ip4(20)
404             for i in cls.overlapping_interfaces:
405                 i.config_ip4()
406                 i.admin_up()
407                 i.resolve_arp()
408
409             cls.pg7.admin_up()
410             cls.pg8.admin_up()
411
412         except Exception:
413             super(TestSNAT, cls).tearDownClass()
414             raise
415
416     def clear_snat(self):
417         """
418         Clear SNAT configuration.
419         """
420         # I found no elegant way to do this
421         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
422                                    dst_address_length=32,
423                                    next_hop_address=self.pg7.remote_ip4n,
424                                    next_hop_sw_if_index=self.pg7.sw_if_index,
425                                    is_add=0)
426         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
427                                    dst_address_length=32,
428                                    next_hop_address=self.pg8.remote_ip4n,
429                                    next_hop_sw_if_index=self.pg8.sw_if_index,
430                                    is_add=0)
431
432         for intf in [self.pg7, self.pg8]:
433             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
434             for n in neighbors:
435                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
436                                               n.mac_address,
437                                               n.ip_address,
438                                               is_add=0)
439
440         if self.pg7.has_ip4_config:
441             self.pg7.unconfig_ip4()
442
443         interfaces = self.vapi.snat_interface_addr_dump()
444         for intf in interfaces:
445             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
446
447         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
448                              domain_id=self.ipfix_domain_id)
449         self.ipfix_src_port = 4739
450         self.ipfix_domain_id = 1
451
452         interfaces = self.vapi.snat_interface_dump()
453         for intf in interfaces:
454             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
455                                                      intf.is_inside,
456                                                      is_add=0)
457
458         static_mappings = self.vapi.snat_static_mapping_dump()
459         for sm in static_mappings:
460             self.vapi.snat_add_static_mapping(sm.local_ip_address,
461                                               sm.external_ip_address,
462                                               local_port=sm.local_port,
463                                               external_port=sm.external_port,
464                                               addr_only=sm.addr_only,
465                                               vrf_id=sm.vrf_id,
466                                               protocol=sm.protocol,
467                                               is_add=0)
468
469         adresses = self.vapi.snat_address_dump()
470         for addr in adresses:
471             self.vapi.snat_add_address_range(addr.ip_address,
472                                              addr.ip_address,
473                                              is_add=0)
474
475     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
476                                 local_port=0, external_port=0, vrf_id=0,
477                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
478                                 proto=0):
479         """
480         Add/delete S-NAT static mapping
481
482         :param local_ip: Local IP address
483         :param external_ip: External IP address
484         :param local_port: Local port number (Optional)
485         :param external_port: External port number (Optional)
486         :param vrf_id: VRF ID (Default 0)
487         :param is_add: 1 if add, 0 if delete (Default add)
488         :param external_sw_if_index: External interface instead of IP address
489         :param proto: IP protocol (Mandatory if port specified)
490         """
491         addr_only = 1
492         if local_port and external_port:
493             addr_only = 0
494         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
495         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
496         self.vapi.snat_add_static_mapping(
497             l_ip,
498             e_ip,
499             external_sw_if_index,
500             local_port,
501             external_port,
502             addr_only,
503             vrf_id,
504             proto,
505             is_add)
506
507     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
508         """
509         Add/delete S-NAT address
510
511         :param ip: IP address
512         :param is_add: 1 if add, 0 if delete (Default add)
513         """
514         snat_addr = socket.inet_pton(socket.AF_INET, ip)
515         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
516                                          vrf_id=vrf_id)
517
518     def test_dynamic(self):
519         """ SNAT dynamic translation test """
520
521         self.snat_add_address(self.snat_addr)
522         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
523         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
524                                                  is_inside=0)
525
526         # in2out
527         pkts = self.create_stream_in(self.pg0, self.pg1)
528         self.pg0.add_stream(pkts)
529         self.pg_enable_capture(self.pg_interfaces)
530         self.pg_start()
531         capture = self.pg1.get_capture(len(pkts))
532         self.verify_capture_out(capture)
533
534         # out2in
535         pkts = self.create_stream_out(self.pg1)
536         self.pg1.add_stream(pkts)
537         self.pg_enable_capture(self.pg_interfaces)
538         self.pg_start()
539         capture = self.pg0.get_capture(len(pkts))
540         self.verify_capture_in(capture, self.pg0)
541
542     def test_dynamic_icmp_errors_in2out_ttl_1(self):
543         """ SNAT handling of client packets with TTL=1 """
544
545         self.snat_add_address(self.snat_addr)
546         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
547         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
548                                                  is_inside=0)
549
550         # Client side - generate traffic
551         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
552         self.pg0.add_stream(pkts)
553         self.pg_enable_capture(self.pg_interfaces)
554         self.pg_start()
555
556         # Client side - verify ICMP type 11 packets
557         capture = self.pg0.get_capture(len(pkts))
558         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
559
560     def test_dynamic_icmp_errors_out2in_ttl_1(self):
561         """ SNAT handling of server packets with TTL=1 """
562
563         self.snat_add_address(self.snat_addr)
564         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
565         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
566                                                  is_inside=0)
567
568         # Client side - create sessions
569         pkts = self.create_stream_in(self.pg0, self.pg1)
570         self.pg0.add_stream(pkts)
571         self.pg_enable_capture(self.pg_interfaces)
572         self.pg_start()
573
574         # Server side - generate traffic
575         capture = self.pg1.get_capture(len(pkts))
576         self.verify_capture_out(capture)
577         pkts = self.create_stream_out(self.pg1, ttl=1)
578         self.pg1.add_stream(pkts)
579         self.pg_enable_capture(self.pg_interfaces)
580         self.pg_start()
581
582         # Server side - verify ICMP type 11 packets
583         capture = self.pg1.get_capture(len(pkts))
584         self.verify_capture_out_with_icmp_errors(capture,
585                                                  src_ip=self.pg1.local_ip4)
586
587     def test_dynamic_icmp_errors_in2out_ttl_2(self):
588         """ SNAT handling of error responses to client packets with TTL=2 """
589
590         self.snat_add_address(self.snat_addr)
591         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
592         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
593                                                  is_inside=0)
594
595         # Client side - generate traffic
596         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
597         self.pg0.add_stream(pkts)
598         self.pg_enable_capture(self.pg_interfaces)
599         self.pg_start()
600
601         # Server side - simulate ICMP type 11 response
602         capture = self.pg1.get_capture(len(pkts))
603         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
604                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
605                 ICMP(type=11) / packet[IP] for packet in capture]
606         self.pg1.add_stream(pkts)
607         self.pg_enable_capture(self.pg_interfaces)
608         self.pg_start()
609
610         # Client side - verify ICMP type 11 packets
611         capture = self.pg0.get_capture(len(pkts))
612         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
613
614     def test_dynamic_icmp_errors_out2in_ttl_2(self):
615         """ SNAT handling of error responses to server packets with TTL=2 """
616
617         self.snat_add_address(self.snat_addr)
618         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
619         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
620                                                  is_inside=0)
621
622         # Client side - create sessions
623         pkts = self.create_stream_in(self.pg0, self.pg1)
624         self.pg0.add_stream(pkts)
625         self.pg_enable_capture(self.pg_interfaces)
626         self.pg_start()
627
628         # Server side - generate traffic
629         capture = self.pg1.get_capture(len(pkts))
630         self.verify_capture_out(capture)
631         pkts = self.create_stream_out(self.pg1, ttl=2)
632         self.pg1.add_stream(pkts)
633         self.pg_enable_capture(self.pg_interfaces)
634         self.pg_start()
635
636         # Client side - simulate ICMP type 11 response
637         capture = self.pg0.get_capture(len(pkts))
638         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
639                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
640                 ICMP(type=11) / packet[IP] for packet in capture]
641         self.pg0.add_stream(pkts)
642         self.pg_enable_capture(self.pg_interfaces)
643         self.pg_start()
644
645         # Server side - verify ICMP type 11 packets
646         capture = self.pg1.get_capture(len(pkts))
647         self.verify_capture_out_with_icmp_errors(capture)
648
649     def test_ping_out_interface_from_outside(self):
650         """ Ping SNAT out interface from outside network """
651
652         self.snat_add_address(self.snat_addr)
653         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
654         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
655                                                  is_inside=0)
656
657         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
658              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
659              ICMP(id=self.icmp_id_out, type='echo-request'))
660         pkts = [p]
661         self.pg1.add_stream(pkts)
662         self.pg_enable_capture(self.pg_interfaces)
663         self.pg_start()
664         capture = self.pg1.get_capture(len(pkts))
665         self.assertEqual(1, len(capture))
666         packet = capture[0]
667         try:
668             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
669             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
670             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
671             self.assertEqual(packet[ICMP].type, 0)  # echo reply
672         except:
673             self.logger.error(ppp("Unexpected or invalid packet "
674                                   "(outside network):", packet))
675             raise
676
677     def test_ping_internal_host_from_outside(self):
678         """ Ping internal host from outside network """
679
680         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
681         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
682         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
683                                                  is_inside=0)
684
685         # out2in
686         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
687                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
688                ICMP(id=self.icmp_id_out, type='echo-request'))
689         self.pg1.add_stream(pkt)
690         self.pg_enable_capture(self.pg_interfaces)
691         self.pg_start()
692         capture = self.pg0.get_capture(1)
693         self.verify_capture_in(capture, self.pg0, packet_num=1)
694         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
695
696         # in2out
697         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
698                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
699                ICMP(id=self.icmp_id_in, type='echo-reply'))
700         self.pg0.add_stream(pkt)
701         self.pg_enable_capture(self.pg_interfaces)
702         self.pg_start()
703         capture = self.pg1.get_capture(1)
704         self.verify_capture_out(capture, same_port=True, packet_num=1)
705         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
706
707     def test_static_in(self):
708         """ SNAT 1:1 NAT initialized from inside network """
709
710         nat_ip = "10.0.0.10"
711         self.tcp_port_out = 6303
712         self.udp_port_out = 6304
713         self.icmp_id_out = 6305
714
715         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
716         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
717         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
718                                                  is_inside=0)
719
720         # in2out
721         pkts = self.create_stream_in(self.pg0, self.pg1)
722         self.pg0.add_stream(pkts)
723         self.pg_enable_capture(self.pg_interfaces)
724         self.pg_start()
725         capture = self.pg1.get_capture(len(pkts))
726         self.verify_capture_out(capture, nat_ip, True)
727
728         # out2in
729         pkts = self.create_stream_out(self.pg1, nat_ip)
730         self.pg1.add_stream(pkts)
731         self.pg_enable_capture(self.pg_interfaces)
732         self.pg_start()
733         capture = self.pg0.get_capture(len(pkts))
734         self.verify_capture_in(capture, self.pg0)
735
736     def test_static_out(self):
737         """ SNAT 1:1 NAT initialized from outside network """
738
739         nat_ip = "10.0.0.20"
740         self.tcp_port_out = 6303
741         self.udp_port_out = 6304
742         self.icmp_id_out = 6305
743
744         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
745         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
746         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
747                                                  is_inside=0)
748
749         # out2in
750         pkts = self.create_stream_out(self.pg1, nat_ip)
751         self.pg1.add_stream(pkts)
752         self.pg_enable_capture(self.pg_interfaces)
753         self.pg_start()
754         capture = self.pg0.get_capture(len(pkts))
755         self.verify_capture_in(capture, self.pg0)
756
757         # in2out
758         pkts = self.create_stream_in(self.pg0, self.pg1)
759         self.pg0.add_stream(pkts)
760         self.pg_enable_capture(self.pg_interfaces)
761         self.pg_start()
762         capture = self.pg1.get_capture(len(pkts))
763         self.verify_capture_out(capture, nat_ip, True)
764
765     def test_static_with_port_in(self):
766         """ SNAT 1:1 NAT with port initialized from inside network """
767
768         self.tcp_port_out = 3606
769         self.udp_port_out = 3607
770         self.icmp_id_out = 3608
771
772         self.snat_add_address(self.snat_addr)
773         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
774                                      self.tcp_port_in, self.tcp_port_out,
775                                      proto=IP_PROTOS.tcp)
776         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
777                                      self.udp_port_in, self.udp_port_out,
778                                      proto=IP_PROTOS.udp)
779         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
780                                      self.icmp_id_in, self.icmp_id_out,
781                                      proto=IP_PROTOS.icmp)
782         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
783         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
784                                                  is_inside=0)
785
786         # in2out
787         pkts = self.create_stream_in(self.pg0, self.pg1)
788         self.pg0.add_stream(pkts)
789         self.pg_enable_capture(self.pg_interfaces)
790         self.pg_start()
791         capture = self.pg1.get_capture(len(pkts))
792         self.verify_capture_out(capture)
793
794         # out2in
795         pkts = self.create_stream_out(self.pg1)
796         self.pg1.add_stream(pkts)
797         self.pg_enable_capture(self.pg_interfaces)
798         self.pg_start()
799         capture = self.pg0.get_capture(len(pkts))
800         self.verify_capture_in(capture, self.pg0)
801
802     def test_static_with_port_out(self):
803         """ SNAT 1:1 NAT with port initialized from outside network """
804
805         self.tcp_port_out = 30606
806         self.udp_port_out = 30607
807         self.icmp_id_out = 30608
808
809         self.snat_add_address(self.snat_addr)
810         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
811                                      self.tcp_port_in, self.tcp_port_out,
812                                      proto=IP_PROTOS.tcp)
813         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
814                                      self.udp_port_in, self.udp_port_out,
815                                      proto=IP_PROTOS.udp)
816         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
817                                      self.icmp_id_in, self.icmp_id_out,
818                                      proto=IP_PROTOS.icmp)
819         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
820         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
821                                                  is_inside=0)
822
823         # out2in
824         pkts = self.create_stream_out(self.pg1)
825         self.pg1.add_stream(pkts)
826         self.pg_enable_capture(self.pg_interfaces)
827         self.pg_start()
828         capture = self.pg0.get_capture(len(pkts))
829         self.verify_capture_in(capture, self.pg0)
830
831         # in2out
832         pkts = self.create_stream_in(self.pg0, self.pg1)
833         self.pg0.add_stream(pkts)
834         self.pg_enable_capture(self.pg_interfaces)
835         self.pg_start()
836         capture = self.pg1.get_capture(len(pkts))
837         self.verify_capture_out(capture)
838
839     def test_static_vrf_aware(self):
840         """ SNAT 1:1 NAT VRF awareness """
841
842         nat_ip1 = "10.0.0.30"
843         nat_ip2 = "10.0.0.40"
844         self.tcp_port_out = 6303
845         self.udp_port_out = 6304
846         self.icmp_id_out = 6305
847
848         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
849                                      vrf_id=10)
850         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
851                                      vrf_id=10)
852         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
853                                                  is_inside=0)
854         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
855         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
856
857         # inside interface VRF match SNAT static mapping VRF
858         pkts = self.create_stream_in(self.pg4, self.pg3)
859         self.pg4.add_stream(pkts)
860         self.pg_enable_capture(self.pg_interfaces)
861         self.pg_start()
862         capture = self.pg3.get_capture(len(pkts))
863         self.verify_capture_out(capture, nat_ip1, True)
864
865         # inside interface VRF don't match SNAT static mapping VRF (packets
866         # are dropped)
867         pkts = self.create_stream_in(self.pg0, self.pg3)
868         self.pg0.add_stream(pkts)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         self.pg3.assert_nothing_captured()
872
873     def test_multiple_inside_interfaces(self):
874         """ SNAT multiple inside interfaces (non-overlapping address space) """
875
876         self.snat_add_address(self.snat_addr)
877         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
878         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
879         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
880                                                  is_inside=0)
881
882         # between two S-NAT inside interfaces (no translation)
883         pkts = self.create_stream_in(self.pg0, self.pg1)
884         self.pg0.add_stream(pkts)
885         self.pg_enable_capture(self.pg_interfaces)
886         self.pg_start()
887         capture = self.pg1.get_capture(len(pkts))
888         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
889
890         # from S-NAT inside to interface without S-NAT feature (no translation)
891         pkts = self.create_stream_in(self.pg0, self.pg2)
892         self.pg0.add_stream(pkts)
893         self.pg_enable_capture(self.pg_interfaces)
894         self.pg_start()
895         capture = self.pg2.get_capture(len(pkts))
896         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
897
898         # in2out 1st interface
899         pkts = self.create_stream_in(self.pg0, self.pg3)
900         self.pg0.add_stream(pkts)
901         self.pg_enable_capture(self.pg_interfaces)
902         self.pg_start()
903         capture = self.pg3.get_capture(len(pkts))
904         self.verify_capture_out(capture)
905
906         # out2in 1st interface
907         pkts = self.create_stream_out(self.pg3)
908         self.pg3.add_stream(pkts)
909         self.pg_enable_capture(self.pg_interfaces)
910         self.pg_start()
911         capture = self.pg0.get_capture(len(pkts))
912         self.verify_capture_in(capture, self.pg0)
913
914         # in2out 2nd interface
915         pkts = self.create_stream_in(self.pg1, self.pg3)
916         self.pg1.add_stream(pkts)
917         self.pg_enable_capture(self.pg_interfaces)
918         self.pg_start()
919         capture = self.pg3.get_capture(len(pkts))
920         self.verify_capture_out(capture)
921
922         # out2in 2nd interface
923         pkts = self.create_stream_out(self.pg3)
924         self.pg3.add_stream(pkts)
925         self.pg_enable_capture(self.pg_interfaces)
926         self.pg_start()
927         capture = self.pg1.get_capture(len(pkts))
928         self.verify_capture_in(capture, self.pg1)
929
930     def test_inside_overlapping_interfaces(self):
931         """ SNAT multiple inside interfaces with overlapping address space """
932
933         static_nat_ip = "10.0.0.10"
934         self.snat_add_address(self.snat_addr)
935         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
936                                                  is_inside=0)
937         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
938         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
939         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
940         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
941                                      vrf_id=20)
942
943         # between S-NAT inside interfaces with same VRF (no translation)
944         pkts = self.create_stream_in(self.pg4, self.pg5)
945         self.pg4.add_stream(pkts)
946         self.pg_enable_capture(self.pg_interfaces)
947         self.pg_start()
948         capture = self.pg5.get_capture(len(pkts))
949         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
950
951         # between S-NAT inside interfaces with different VRF (hairpinning)
952         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
953              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
954              TCP(sport=1234, dport=5678))
955         self.pg4.add_stream(p)
956         self.pg_enable_capture(self.pg_interfaces)
957         self.pg_start()
958         capture = self.pg6.get_capture(1)
959         p = capture[0]
960         try:
961             ip = p[IP]
962             tcp = p[TCP]
963             self.assertEqual(ip.src, self.snat_addr)
964             self.assertEqual(ip.dst, self.pg6.remote_ip4)
965             self.assertNotEqual(tcp.sport, 1234)
966             self.assertEqual(tcp.dport, 5678)
967         except:
968             self.logger.error(ppp("Unexpected or invalid packet:", p))
969             raise
970
971         # in2out 1st interface
972         pkts = self.create_stream_in(self.pg4, self.pg3)
973         self.pg4.add_stream(pkts)
974         self.pg_enable_capture(self.pg_interfaces)
975         self.pg_start()
976         capture = self.pg3.get_capture(len(pkts))
977         self.verify_capture_out(capture)
978
979         # out2in 1st interface
980         pkts = self.create_stream_out(self.pg3)
981         self.pg3.add_stream(pkts)
982         self.pg_enable_capture(self.pg_interfaces)
983         self.pg_start()
984         capture = self.pg4.get_capture(len(pkts))
985         self.verify_capture_in(capture, self.pg4)
986
987         # in2out 2nd interface
988         pkts = self.create_stream_in(self.pg5, self.pg3)
989         self.pg5.add_stream(pkts)
990         self.pg_enable_capture(self.pg_interfaces)
991         self.pg_start()
992         capture = self.pg3.get_capture(len(pkts))
993         self.verify_capture_out(capture)
994
995         # out2in 2nd interface
996         pkts = self.create_stream_out(self.pg3)
997         self.pg3.add_stream(pkts)
998         self.pg_enable_capture(self.pg_interfaces)
999         self.pg_start()
1000         capture = self.pg5.get_capture(len(pkts))
1001         self.verify_capture_in(capture, self.pg5)
1002
1003         # pg5 session dump
1004         addresses = self.vapi.snat_address_dump()
1005         self.assertEqual(len(addresses), 1)
1006         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
1007         self.assertEqual(len(sessions), 3)
1008         for session in sessions:
1009             self.assertFalse(session.is_static)
1010             self.assertEqual(session.inside_ip_address[0:4],
1011                              self.pg5.remote_ip4n)
1012             self.assertEqual(session.outside_ip_address,
1013                              addresses[0].ip_address)
1014         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1015         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1016         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1017         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1018         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1019         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1020         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1021         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1022         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1023
1024         # in2out 3rd interface
1025         pkts = self.create_stream_in(self.pg6, self.pg3)
1026         self.pg6.add_stream(pkts)
1027         self.pg_enable_capture(self.pg_interfaces)
1028         self.pg_start()
1029         capture = self.pg3.get_capture(len(pkts))
1030         self.verify_capture_out(capture, static_nat_ip, True)
1031
1032         # out2in 3rd interface
1033         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1034         self.pg3.add_stream(pkts)
1035         self.pg_enable_capture(self.pg_interfaces)
1036         self.pg_start()
1037         capture = self.pg6.get_capture(len(pkts))
1038         self.verify_capture_in(capture, self.pg6)
1039
1040         # general user and session dump verifications
1041         users = self.vapi.snat_user_dump()
1042         self.assertTrue(len(users) >= 3)
1043         addresses = self.vapi.snat_address_dump()
1044         self.assertEqual(len(addresses), 1)
1045         for user in users:
1046             sessions = self.vapi.snat_user_session_dump(user.ip_address,
1047                                                         user.vrf_id)
1048             for session in sessions:
1049                 self.assertEqual(user.ip_address, session.inside_ip_address)
1050                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1051                 self.assertTrue(session.protocol in
1052                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1053                                  IP_PROTOS.icmp])
1054
1055         # pg4 session dump
1056         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
1057         self.assertTrue(len(sessions) >= 4)
1058         for session in sessions:
1059             self.assertFalse(session.is_static)
1060             self.assertEqual(session.inside_ip_address[0:4],
1061                              self.pg4.remote_ip4n)
1062             self.assertEqual(session.outside_ip_address,
1063                              addresses[0].ip_address)
1064
1065         # pg6 session dump
1066         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1067         self.assertTrue(len(sessions) >= 3)
1068         for session in sessions:
1069             self.assertTrue(session.is_static)
1070             self.assertEqual(session.inside_ip_address[0:4],
1071                              self.pg6.remote_ip4n)
1072             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1073                              map(int, static_nat_ip.split('.')))
1074             self.assertTrue(session.inside_port in
1075                             [self.tcp_port_in, self.udp_port_in,
1076                              self.icmp_id_in])
1077
1078     def test_hairpinning(self):
1079         """ SNAT hairpinning - 1:1 NAT with port"""
1080
1081         host = self.pg0.remote_hosts[0]
1082         server = self.pg0.remote_hosts[1]
1083         host_in_port = 1234
1084         host_out_port = 0
1085         server_in_port = 5678
1086         server_out_port = 8765
1087
1088         self.snat_add_address(self.snat_addr)
1089         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1090         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1091                                                  is_inside=0)
1092         # add static mapping for server
1093         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1094                                      server_in_port, server_out_port,
1095                                      proto=IP_PROTOS.tcp)
1096
1097         # send packet from host to server
1098         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1099              IP(src=host.ip4, dst=self.snat_addr) /
1100              TCP(sport=host_in_port, dport=server_out_port))
1101         self.pg0.add_stream(p)
1102         self.pg_enable_capture(self.pg_interfaces)
1103         self.pg_start()
1104         capture = self.pg0.get_capture(1)
1105         p = capture[0]
1106         try:
1107             ip = p[IP]
1108             tcp = p[TCP]
1109             self.assertEqual(ip.src, self.snat_addr)
1110             self.assertEqual(ip.dst, server.ip4)
1111             self.assertNotEqual(tcp.sport, host_in_port)
1112             self.assertEqual(tcp.dport, server_in_port)
1113             host_out_port = tcp.sport
1114         except:
1115             self.logger.error(ppp("Unexpected or invalid packet:", p))
1116             raise
1117
1118         # send reply from server to host
1119         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1120              IP(src=server.ip4, dst=self.snat_addr) /
1121              TCP(sport=server_in_port, dport=host_out_port))
1122         self.pg0.add_stream(p)
1123         self.pg_enable_capture(self.pg_interfaces)
1124         self.pg_start()
1125         capture = self.pg0.get_capture(1)
1126         p = capture[0]
1127         try:
1128             ip = p[IP]
1129             tcp = p[TCP]
1130             self.assertEqual(ip.src, self.snat_addr)
1131             self.assertEqual(ip.dst, host.ip4)
1132             self.assertEqual(tcp.sport, server_out_port)
1133             self.assertEqual(tcp.dport, host_in_port)
1134         except:
1135             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1136             raise
1137
1138     def test_hairpinning2(self):
1139         """ SNAT hairpinning - 1:1 NAT"""
1140
1141         server1_nat_ip = "10.0.0.10"
1142         server2_nat_ip = "10.0.0.11"
1143         host = self.pg0.remote_hosts[0]
1144         server1 = self.pg0.remote_hosts[1]
1145         server2 = self.pg0.remote_hosts[2]
1146         server_tcp_port = 22
1147         server_udp_port = 20
1148
1149         self.snat_add_address(self.snat_addr)
1150         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                  is_inside=0)
1153
1154         # add static mapping for servers
1155         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1156         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1157
1158         # host to server1
1159         pkts = []
1160         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1161              IP(src=host.ip4, dst=server1_nat_ip) /
1162              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1163         pkts.append(p)
1164         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1165              IP(src=host.ip4, dst=server1_nat_ip) /
1166              UDP(sport=self.udp_port_in, dport=server_udp_port))
1167         pkts.append(p)
1168         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1169              IP(src=host.ip4, dst=server1_nat_ip) /
1170              ICMP(id=self.icmp_id_in, type='echo-request'))
1171         pkts.append(p)
1172         self.pg0.add_stream(pkts)
1173         self.pg_enable_capture(self.pg_interfaces)
1174         self.pg_start()
1175         capture = self.pg0.get_capture(len(pkts))
1176         for packet in capture:
1177             try:
1178                 self.assertEqual(packet[IP].src, self.snat_addr)
1179                 self.assertEqual(packet[IP].dst, server1.ip4)
1180                 if packet.haslayer(TCP):
1181                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1182                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1183                     self.tcp_port_out = packet[TCP].sport
1184                 elif packet.haslayer(UDP):
1185                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1186                     self.assertEqual(packet[UDP].dport, server_udp_port)
1187                     self.udp_port_out = packet[UDP].sport
1188                 else:
1189                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1190                     self.icmp_id_out = packet[ICMP].id
1191             except:
1192                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1193                 raise
1194
1195         # server1 to host
1196         pkts = []
1197         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1198              IP(src=server1.ip4, dst=self.snat_addr) /
1199              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1200         pkts.append(p)
1201         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1202              IP(src=server1.ip4, dst=self.snat_addr) /
1203              UDP(sport=server_udp_port, dport=self.udp_port_out))
1204         pkts.append(p)
1205         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1206              IP(src=server1.ip4, dst=self.snat_addr) /
1207              ICMP(id=self.icmp_id_out, type='echo-reply'))
1208         pkts.append(p)
1209         self.pg0.add_stream(pkts)
1210         self.pg_enable_capture(self.pg_interfaces)
1211         self.pg_start()
1212         capture = self.pg0.get_capture(len(pkts))
1213         for packet in capture:
1214             try:
1215                 self.assertEqual(packet[IP].src, server1_nat_ip)
1216                 self.assertEqual(packet[IP].dst, host.ip4)
1217                 if packet.haslayer(TCP):
1218                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1219                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1220                 elif packet.haslayer(UDP):
1221                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1222                     self.assertEqual(packet[UDP].sport, server_udp_port)
1223                 else:
1224                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1225             except:
1226                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1227                 raise
1228
1229         # server2 to server1
1230         pkts = []
1231         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1232              IP(src=server2.ip4, dst=server1_nat_ip) /
1233              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1234         pkts.append(p)
1235         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1236              IP(src=server2.ip4, dst=server1_nat_ip) /
1237              UDP(sport=self.udp_port_in, dport=server_udp_port))
1238         pkts.append(p)
1239         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1240              IP(src=server2.ip4, dst=server1_nat_ip) /
1241              ICMP(id=self.icmp_id_in, type='echo-request'))
1242         pkts.append(p)
1243         self.pg0.add_stream(pkts)
1244         self.pg_enable_capture(self.pg_interfaces)
1245         self.pg_start()
1246         capture = self.pg0.get_capture(len(pkts))
1247         for packet in capture:
1248             try:
1249                 self.assertEqual(packet[IP].src, server2_nat_ip)
1250                 self.assertEqual(packet[IP].dst, server1.ip4)
1251                 if packet.haslayer(TCP):
1252                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1253                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1254                     self.tcp_port_out = packet[TCP].sport
1255                 elif packet.haslayer(UDP):
1256                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1257                     self.assertEqual(packet[UDP].dport, server_udp_port)
1258                     self.udp_port_out = packet[UDP].sport
1259                 else:
1260                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1261                     self.icmp_id_out = packet[ICMP].id
1262             except:
1263                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1264                 raise
1265
1266         # server1 to server2
1267         pkts = []
1268         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1269              IP(src=server1.ip4, dst=server2_nat_ip) /
1270              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1271         pkts.append(p)
1272         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1273              IP(src=server1.ip4, dst=server2_nat_ip) /
1274              UDP(sport=server_udp_port, dport=self.udp_port_out))
1275         pkts.append(p)
1276         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1277              IP(src=server1.ip4, dst=server2_nat_ip) /
1278              ICMP(id=self.icmp_id_out, type='echo-reply'))
1279         pkts.append(p)
1280         self.pg0.add_stream(pkts)
1281         self.pg_enable_capture(self.pg_interfaces)
1282         self.pg_start()
1283         capture = self.pg0.get_capture(len(pkts))
1284         for packet in capture:
1285             try:
1286                 self.assertEqual(packet[IP].src, server1_nat_ip)
1287                 self.assertEqual(packet[IP].dst, server2.ip4)
1288                 if packet.haslayer(TCP):
1289                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1290                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1291                 elif packet.haslayer(UDP):
1292                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1293                     self.assertEqual(packet[UDP].sport, server_udp_port)
1294                 else:
1295                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1296             except:
1297                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1298                 raise
1299
1300     def test_max_translations_per_user(self):
1301         """ MAX translations per user - recycle the least recently used """
1302
1303         self.snat_add_address(self.snat_addr)
1304         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1305         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1306                                                  is_inside=0)
1307
1308         # get maximum number of translations per user
1309         snat_config = self.vapi.snat_show_config()
1310
1311         # send more than maximum number of translations per user packets
1312         pkts_num = snat_config.max_translations_per_user + 5
1313         pkts = []
1314         for port in range(0, pkts_num):
1315             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1316                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1317                  TCP(sport=1025 + port))
1318             pkts.append(p)
1319         self.pg0.add_stream(pkts)
1320         self.pg_enable_capture(self.pg_interfaces)
1321         self.pg_start()
1322
1323         # verify number of translated packet
1324         self.pg1.get_capture(pkts_num)
1325
1326     def test_interface_addr(self):
1327         """ Acquire SNAT addresses from interface """
1328         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1329
1330         # no address in NAT pool
1331         adresses = self.vapi.snat_address_dump()
1332         self.assertEqual(0, len(adresses))
1333
1334         # configure interface address and check NAT address pool
1335         self.pg7.config_ip4()
1336         adresses = self.vapi.snat_address_dump()
1337         self.assertEqual(1, len(adresses))
1338         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1339
1340         # remove interface address and check NAT address pool
1341         self.pg7.unconfig_ip4()
1342         adresses = self.vapi.snat_address_dump()
1343         self.assertEqual(0, len(adresses))
1344
1345     def test_interface_addr_static_mapping(self):
1346         """ Static mapping with addresses from interface """
1347         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1348         self.snat_add_static_mapping('1.2.3.4',
1349                                      external_sw_if_index=self.pg7.sw_if_index)
1350
1351         # static mappings with external interface
1352         static_mappings = self.vapi.snat_static_mapping_dump()
1353         self.assertEqual(1, len(static_mappings))
1354         self.assertEqual(self.pg7.sw_if_index,
1355                          static_mappings[0].external_sw_if_index)
1356
1357         # configure interface address and check static mappings
1358         self.pg7.config_ip4()
1359         static_mappings = self.vapi.snat_static_mapping_dump()
1360         self.assertEqual(1, len(static_mappings))
1361         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1362                          self.pg7.local_ip4n)
1363         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1364
1365         # remove interface address and check static mappings
1366         self.pg7.unconfig_ip4()
1367         static_mappings = self.vapi.snat_static_mapping_dump()
1368         self.assertEqual(0, len(static_mappings))
1369
1370     def test_ipfix_nat44_sess(self):
1371         """ S-NAT IPFIX logging NAT44 session created/delted """
1372         self.ipfix_domain_id = 10
1373         self.ipfix_src_port = 20202
1374         colector_port = 30303
1375         bind_layers(UDP, IPFIX, dport=30303)
1376         self.snat_add_address(self.snat_addr)
1377         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1378         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1379                                                  is_inside=0)
1380         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1381                                      src_address=self.pg3.local_ip4n,
1382                                      path_mtu=512,
1383                                      template_interval=10,
1384                                      collector_port=colector_port)
1385         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1386                              src_port=self.ipfix_src_port)
1387
1388         pkts = self.create_stream_in(self.pg0, self.pg1)
1389         self.pg0.add_stream(pkts)
1390         self.pg_enable_capture(self.pg_interfaces)
1391         self.pg_start()
1392         capture = self.pg1.get_capture(len(pkts))
1393         self.verify_capture_out(capture)
1394         self.snat_add_address(self.snat_addr, is_add=0)
1395         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1396         capture = self.pg3.get_capture(3)
1397         ipfix = IPFIXDecoder()
1398         # first load template
1399         for p in capture:
1400             self.assertTrue(p.haslayer(IPFIX))
1401             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1402             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1403             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1404             self.assertEqual(p[UDP].dport, colector_port)
1405             self.assertEqual(p[IPFIX].observationDomainID,
1406                              self.ipfix_domain_id)
1407             if p.haslayer(Template):
1408                 ipfix.add_template(p.getlayer(Template))
1409         # verify events in data set
1410         for p in capture:
1411             if p.haslayer(Data):
1412                 data = ipfix.decode_data_set(p.getlayer(Set))
1413                 self.verify_ipfix_nat44_ses(data)
1414
1415     def test_ipfix_addr_exhausted(self):
1416         """ S-NAT IPFIX logging NAT addresses exhausted """
1417         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1418         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1419                                                  is_inside=0)
1420         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1421                                      src_address=self.pg3.local_ip4n,
1422                                      path_mtu=512,
1423                                      template_interval=10)
1424         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1425                              src_port=self.ipfix_src_port)
1426
1427         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1428              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1429              TCP(sport=3025))
1430         self.pg0.add_stream(p)
1431         self.pg_enable_capture(self.pg_interfaces)
1432         self.pg_start()
1433         capture = self.pg1.get_capture(0)
1434         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1435         capture = self.pg3.get_capture(3)
1436         ipfix = IPFIXDecoder()
1437         # first load template
1438         for p in capture:
1439             self.assertTrue(p.haslayer(IPFIX))
1440             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1441             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1442             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1443             self.assertEqual(p[UDP].dport, 4739)
1444             self.assertEqual(p[IPFIX].observationDomainID,
1445                              self.ipfix_domain_id)
1446             if p.haslayer(Template):
1447                 ipfix.add_template(p.getlayer(Template))
1448         # verify events in data set
1449         for p in capture:
1450             if p.haslayer(Data):
1451                 data = ipfix.decode_data_set(p.getlayer(Set))
1452                 self.verify_ipfix_addr_exhausted(data)
1453
1454     def test_pool_addr_fib(self):
1455         """ S-NAT add pool addresses to FIB """
1456         static_addr = '10.0.0.10'
1457         self.snat_add_address(self.snat_addr)
1458         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1459         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1460                                                  is_inside=0)
1461         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1462
1463         # SNAT address
1464         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1465              ARP(op=ARP.who_has, pdst=self.snat_addr,
1466                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1467         self.pg1.add_stream(p)
1468         self.pg_enable_capture(self.pg_interfaces)
1469         self.pg_start()
1470         capture = self.pg1.get_capture(1)
1471         self.assertTrue(capture[0].haslayer(ARP))
1472         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1473
1474         # 1:1 NAT address
1475         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1476              ARP(op=ARP.who_has, pdst=static_addr,
1477                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1478         self.pg1.add_stream(p)
1479         self.pg_enable_capture(self.pg_interfaces)
1480         self.pg_start()
1481         capture = self.pg1.get_capture(1)
1482         self.assertTrue(capture[0].haslayer(ARP))
1483         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1484
1485         # send ARP to non-SNAT interface
1486         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1487              ARP(op=ARP.who_has, pdst=self.snat_addr,
1488                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1489         self.pg2.add_stream(p)
1490         self.pg_enable_capture(self.pg_interfaces)
1491         self.pg_start()
1492         capture = self.pg1.get_capture(0)
1493
1494         # remove addresses and verify
1495         self.snat_add_address(self.snat_addr, is_add=0)
1496         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1497                                      is_add=0)
1498
1499         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1500              ARP(op=ARP.who_has, pdst=self.snat_addr,
1501                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1502         self.pg1.add_stream(p)
1503         self.pg_enable_capture(self.pg_interfaces)
1504         self.pg_start()
1505         capture = self.pg1.get_capture(0)
1506
1507         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1508              ARP(op=ARP.who_has, pdst=static_addr,
1509                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1510         self.pg1.add_stream(p)
1511         self.pg_enable_capture(self.pg_interfaces)
1512         self.pg_start()
1513         capture = self.pg1.get_capture(0)
1514
1515     def test_vrf_mode(self):
1516         """ S-NAT tenant VRF aware address pool mode """
1517
1518         vrf_id1 = 1
1519         vrf_id2 = 2
1520         nat_ip1 = "10.0.0.10"
1521         nat_ip2 = "10.0.0.11"
1522
1523         self.pg0.unconfig_ip4()
1524         self.pg1.unconfig_ip4()
1525         self.pg0.set_table_ip4(vrf_id1)
1526         self.pg1.set_table_ip4(vrf_id2)
1527         self.pg0.config_ip4()
1528         self.pg1.config_ip4()
1529
1530         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1531         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1532         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1533         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1534         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1535                                                  is_inside=0)
1536
1537         # first VRF
1538         pkts = self.create_stream_in(self.pg0, self.pg2)
1539         self.pg0.add_stream(pkts)
1540         self.pg_enable_capture(self.pg_interfaces)
1541         self.pg_start()
1542         capture = self.pg2.get_capture(len(pkts))
1543         self.verify_capture_out(capture, nat_ip1)
1544
1545         # second VRF
1546         pkts = self.create_stream_in(self.pg1, self.pg2)
1547         self.pg1.add_stream(pkts)
1548         self.pg_enable_capture(self.pg_interfaces)
1549         self.pg_start()
1550         capture = self.pg2.get_capture(len(pkts))
1551         self.verify_capture_out(capture, nat_ip2)
1552
1553     def test_vrf_feature_independent(self):
1554         """ S-NAT tenant VRF independent address pool mode """
1555
1556         nat_ip1 = "10.0.0.10"
1557         nat_ip2 = "10.0.0.11"
1558
1559         self.snat_add_address(nat_ip1)
1560         self.snat_add_address(nat_ip2)
1561         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1562         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1563         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1564                                                  is_inside=0)
1565
1566         # first VRF
1567         pkts = self.create_stream_in(self.pg0, self.pg2)
1568         self.pg0.add_stream(pkts)
1569         self.pg_enable_capture(self.pg_interfaces)
1570         self.pg_start()
1571         capture = self.pg2.get_capture(len(pkts))
1572         self.verify_capture_out(capture, nat_ip1)
1573
1574         # second VRF
1575         pkts = self.create_stream_in(self.pg1, self.pg2)
1576         self.pg1.add_stream(pkts)
1577         self.pg_enable_capture(self.pg_interfaces)
1578         self.pg_start()
1579         capture = self.pg2.get_capture(len(pkts))
1580         self.verify_capture_out(capture, nat_ip1)
1581
1582     def test_dynamic_ipless_interfaces(self):
1583         """ SNAT interfaces without configured ip dynamic map """
1584
1585         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1586                                       self.pg7.remote_mac,
1587                                       self.pg7.remote_ip4n,
1588                                       is_static=1)
1589         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1590                                       self.pg8.remote_mac,
1591                                       self.pg8.remote_ip4n,
1592                                       is_static=1)
1593
1594         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1595                                    dst_address_length=32,
1596                                    next_hop_address=self.pg7.remote_ip4n,
1597                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1598         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1599                                    dst_address_length=32,
1600                                    next_hop_address=self.pg8.remote_ip4n,
1601                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1602
1603         self.snat_add_address(self.snat_addr)
1604         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1605         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1606                                                  is_inside=0)
1607
1608         # in2out
1609         pkts = self.create_stream_in(self.pg7, self.pg8)
1610         self.pg7.add_stream(pkts)
1611         self.pg_enable_capture(self.pg_interfaces)
1612         self.pg_start()
1613         capture = self.pg8.get_capture(len(pkts))
1614         self.verify_capture_out(capture)
1615
1616         # out2in
1617         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1618         self.pg8.add_stream(pkts)
1619         self.pg_enable_capture(self.pg_interfaces)
1620         self.pg_start()
1621         capture = self.pg7.get_capture(len(pkts))
1622         self.verify_capture_in(capture, self.pg7)
1623
1624     def test_static_ipless_interfaces(self):
1625         """ SNAT 1:1 NAT interfaces without configured ip """
1626
1627         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1628                                       self.pg7.remote_mac,
1629                                       self.pg7.remote_ip4n,
1630                                       is_static=1)
1631         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1632                                       self.pg8.remote_mac,
1633                                       self.pg8.remote_ip4n,
1634                                       is_static=1)
1635
1636         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1637                                    dst_address_length=32,
1638                                    next_hop_address=self.pg7.remote_ip4n,
1639                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1640         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1641                                    dst_address_length=32,
1642                                    next_hop_address=self.pg8.remote_ip4n,
1643                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1644
1645         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1646         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1647         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1648                                                  is_inside=0)
1649
1650         # out2in
1651         pkts = self.create_stream_out(self.pg8)
1652         self.pg8.add_stream(pkts)
1653         self.pg_enable_capture(self.pg_interfaces)
1654         self.pg_start()
1655         capture = self.pg7.get_capture(len(pkts))
1656         self.verify_capture_in(capture, self.pg7)
1657
1658         # in2out
1659         pkts = self.create_stream_in(self.pg7, self.pg8)
1660         self.pg7.add_stream(pkts)
1661         self.pg_enable_capture(self.pg_interfaces)
1662         self.pg_start()
1663         capture = self.pg8.get_capture(len(pkts))
1664         self.verify_capture_out(capture, self.snat_addr, True)
1665
1666     def test_static_with_port_ipless_interfaces(self):
1667         """ SNAT 1:1 NAT with port interfaces without configured ip """
1668
1669         self.tcp_port_out = 30606
1670         self.udp_port_out = 30607
1671         self.icmp_id_out = 30608
1672
1673         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1674                                       self.pg7.remote_mac,
1675                                       self.pg7.remote_ip4n,
1676                                       is_static=1)
1677         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1678                                       self.pg8.remote_mac,
1679                                       self.pg8.remote_ip4n,
1680                                       is_static=1)
1681
1682         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1683                                    dst_address_length=32,
1684                                    next_hop_address=self.pg7.remote_ip4n,
1685                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1686         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1687                                    dst_address_length=32,
1688                                    next_hop_address=self.pg8.remote_ip4n,
1689                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1690
1691         self.snat_add_address(self.snat_addr)
1692         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1693                                      self.tcp_port_in, self.tcp_port_out,
1694                                      proto=IP_PROTOS.tcp)
1695         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1696                                      self.udp_port_in, self.udp_port_out,
1697                                      proto=IP_PROTOS.udp)
1698         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1699                                      self.icmp_id_in, self.icmp_id_out,
1700                                      proto=IP_PROTOS.icmp)
1701         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1702         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1703                                                  is_inside=0)
1704
1705         # out2in
1706         pkts = self.create_stream_out(self.pg8)
1707         self.pg8.add_stream(pkts)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg7.get_capture(len(pkts))
1711         self.verify_capture_in(capture, self.pg7)
1712
1713         # in2out
1714         pkts = self.create_stream_in(self.pg7, self.pg8)
1715         self.pg7.add_stream(pkts)
1716         self.pg_enable_capture(self.pg_interfaces)
1717         self.pg_start()
1718         capture = self.pg8.get_capture(len(pkts))
1719         self.verify_capture_out(capture)
1720
1721     def tearDown(self):
1722         super(TestSNAT, self).tearDown()
1723         if not self.vpp_dead:
1724             self.logger.info(self.vapi.cli("show snat verbose"))
1725             self.clear_snat()
1726
1727
1728 class TestDeterministicNAT(MethodHolder):
1729     """ Deterministic NAT Test Cases """
1730
1731     @classmethod
1732     def setUpConstants(cls):
1733         super(TestDeterministicNAT, cls).setUpConstants()
1734         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1735
1736     @classmethod
1737     def setUpClass(cls):
1738         super(TestDeterministicNAT, cls).setUpClass()
1739
1740         try:
1741             cls.tcp_port_in = 6303
1742             cls.tcp_external_port = 6303
1743             cls.udp_port_in = 6304
1744             cls.udp_external_port = 6304
1745             cls.icmp_id_in = 6305
1746             cls.snat_addr = '10.0.0.3'
1747
1748             cls.create_pg_interfaces(range(3))
1749             cls.interfaces = list(cls.pg_interfaces)
1750
1751             for i in cls.interfaces:
1752                 i.admin_up()
1753                 i.config_ip4()
1754                 i.resolve_arp()
1755
1756             cls.pg0.generate_remote_hosts(2)
1757             cls.pg0.configure_ipv4_neighbors()
1758
1759         except Exception:
1760             super(TestDeterministicNAT, cls).tearDownClass()
1761             raise
1762
1763     def create_stream_in(self, in_if, out_if, ttl=64):
1764         """
1765         Create packet stream for inside network
1766
1767         :param in_if: Inside interface
1768         :param out_if: Outside interface
1769         :param ttl: TTL of generated packets
1770         """
1771         pkts = []
1772         # TCP
1773         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1774              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1775              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1776         pkts.append(p)
1777
1778         # UDP
1779         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1780              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1781              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1782         pkts.append(p)
1783
1784         # ICMP
1785         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1786              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1787              ICMP(id=self.icmp_id_in, type='echo-request'))
1788         pkts.append(p)
1789
1790         return pkts
1791
1792     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1793         """
1794         Create packet stream for outside network
1795
1796         :param out_if: Outside interface
1797         :param dst_ip: Destination IP address (Default use global SNAT address)
1798         :param ttl: TTL of generated packets
1799         """
1800         if dst_ip is None:
1801             dst_ip = self.snat_addr
1802         pkts = []
1803         # TCP
1804         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1805              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1806              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1807         pkts.append(p)
1808
1809         # UDP
1810         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1811              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1812              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1813         pkts.append(p)
1814
1815         # ICMP
1816         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1817              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1818              ICMP(id=self.icmp_external_id, type='echo-reply'))
1819         pkts.append(p)
1820
1821         return pkts
1822
1823     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1824         """
1825         Verify captured packets on outside network
1826
1827         :param capture: Captured packets
1828         :param nat_ip: Translated IP address (Default use global SNAT address)
1829         :param same_port: Sorce port number is not translated (Default False)
1830         :param packet_num: Expected number of packets (Default 3)
1831         """
1832         if nat_ip is None:
1833             nat_ip = self.snat_addr
1834         self.assertEqual(packet_num, len(capture))
1835         for packet in capture:
1836             try:
1837                 self.assertEqual(packet[IP].src, nat_ip)
1838                 if packet.haslayer(TCP):
1839                     self.tcp_port_out = packet[TCP].sport
1840                 elif packet.haslayer(UDP):
1841                     self.udp_port_out = packet[UDP].sport
1842                 else:
1843                     self.icmp_external_id = packet[ICMP].id
1844             except:
1845                 self.logger.error(ppp("Unexpected or invalid packet "
1846                                       "(outside network):", packet))
1847                 raise
1848
1849     def initiate_tcp_session(self, in_if, out_if):
1850         """
1851         Initiates TCP session
1852
1853         :param in_if: Inside interface
1854         :param out_if: Outside interface
1855         """
1856         try:
1857             # SYN packet in->out
1858             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1859                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1860                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1861                      flags="S"))
1862             in_if.add_stream(p)
1863             self.pg_enable_capture(self.pg_interfaces)
1864             self.pg_start()
1865             capture = out_if.get_capture(1)
1866             p = capture[0]
1867             self.tcp_port_out = p[TCP].sport
1868
1869             # SYN + ACK packet out->in
1870             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1871                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1872                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1873                      flags="SA"))
1874             out_if.add_stream(p)
1875             self.pg_enable_capture(self.pg_interfaces)
1876             self.pg_start()
1877             in_if.get_capture(1)
1878
1879             # ACK packet in->out
1880             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1881                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1882                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1883                      flags="A"))
1884             in_if.add_stream(p)
1885             self.pg_enable_capture(self.pg_interfaces)
1886             self.pg_start()
1887             out_if.get_capture(1)
1888
1889         except:
1890             self.logger.error("TCP 3 way handshake failed")
1891             raise
1892
1893     def verify_ipfix_max_entries_per_user(self, data):
1894         """
1895         Verify IPFIX maximum entries per user exceeded event
1896
1897         :param data: Decoded IPFIX data records
1898         """
1899         self.assertEqual(1, len(data))
1900         record = data[0]
1901         # natEvent
1902         self.assertEqual(ord(record[230]), 13)
1903         # natQuotaExceededEvent
1904         self.assertEqual('\x03\x00\x00\x00', record[466])
1905         # sourceIPv4Address
1906         self.assertEqual(self.pg0.remote_ip4n, record[8])
1907
1908     def test_deterministic_mode(self):
1909         """ S-NAT run deterministic mode """
1910         in_addr = '172.16.255.0'
1911         out_addr = '172.17.255.50'
1912         in_addr_t = '172.16.255.20'
1913         in_addr_n = socket.inet_aton(in_addr)
1914         out_addr_n = socket.inet_aton(out_addr)
1915         in_addr_t_n = socket.inet_aton(in_addr_t)
1916         in_plen = 24
1917         out_plen = 32
1918
1919         snat_config = self.vapi.snat_show_config()
1920         self.assertEqual(1, snat_config.deterministic)
1921
1922         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1923
1924         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1925         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1926         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1927         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1928
1929         deterministic_mappings = self.vapi.snat_det_map_dump()
1930         self.assertEqual(len(deterministic_mappings), 1)
1931         dsm = deterministic_mappings[0]
1932         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1933         self.assertEqual(in_plen, dsm.in_plen)
1934         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1935         self.assertEqual(out_plen, dsm.out_plen)
1936
1937         self.clear_snat()
1938         deterministic_mappings = self.vapi.snat_det_map_dump()
1939         self.assertEqual(len(deterministic_mappings), 0)
1940
1941     def test_set_timeouts(self):
1942         """ Set deterministic NAT timeouts """
1943         timeouts_before = self.vapi.snat_det_get_timeouts()
1944
1945         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1946                                         timeouts_before.tcp_established + 10,
1947                                         timeouts_before.tcp_transitory + 10,
1948                                         timeouts_before.icmp + 10)
1949
1950         timeouts_after = self.vapi.snat_det_get_timeouts()
1951
1952         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1953         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1954         self.assertNotEqual(timeouts_before.tcp_established,
1955                             timeouts_after.tcp_established)
1956         self.assertNotEqual(timeouts_before.tcp_transitory,
1957                             timeouts_after.tcp_transitory)
1958
1959     def test_det_in(self):
1960         """ CGNAT translation test (TCP, UDP, ICMP) """
1961
1962         nat_ip = "10.0.0.10"
1963
1964         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1965                                    32,
1966                                    socket.inet_aton(nat_ip),
1967                                    32)
1968         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1969         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1970                                                  is_inside=0)
1971
1972         # in2out
1973         pkts = self.create_stream_in(self.pg0, self.pg1)
1974         self.pg0.add_stream(pkts)
1975         self.pg_enable_capture(self.pg_interfaces)
1976         self.pg_start()
1977         capture = self.pg1.get_capture(len(pkts))
1978         self.verify_capture_out(capture, nat_ip)
1979
1980         # out2in
1981         pkts = self.create_stream_out(self.pg1, nat_ip)
1982         self.pg1.add_stream(pkts)
1983         self.pg_enable_capture(self.pg_interfaces)
1984         self.pg_start()
1985         capture = self.pg0.get_capture(len(pkts))
1986         self.verify_capture_in(capture, self.pg0)
1987
1988         # session dump test
1989         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1990         self.assertEqual(len(sessions), 3)
1991
1992         # TCP session
1993         s = sessions[0]
1994         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1995         self.assertEqual(s.in_port, self.tcp_port_in)
1996         self.assertEqual(s.out_port, self.tcp_port_out)
1997         self.assertEqual(s.ext_port, self.tcp_external_port)
1998
1999         # UDP session
2000         s = sessions[1]
2001         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2002         self.assertEqual(s.in_port, self.udp_port_in)
2003         self.assertEqual(s.out_port, self.udp_port_out)
2004         self.assertEqual(s.ext_port, self.udp_external_port)
2005
2006         # ICMP session
2007         s = sessions[2]
2008         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2009         self.assertEqual(s.in_port, self.icmp_id_in)
2010         self.assertEqual(s.out_port, self.icmp_external_id)
2011
2012     def test_multiple_users(self):
2013         """ CGNAT multiple users """
2014
2015         nat_ip = "10.0.0.10"
2016         port_in = 80
2017         external_port = 6303
2018
2019         host0 = self.pg0.remote_hosts[0]
2020         host1 = self.pg0.remote_hosts[1]
2021
2022         self.vapi.snat_add_det_map(host0.ip4n,
2023                                    24,
2024                                    socket.inet_aton(nat_ip),
2025                                    32)
2026         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2027         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2028                                                  is_inside=0)
2029
2030         # host0 to out
2031         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2032              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2033              TCP(sport=port_in, dport=external_port))
2034         self.pg0.add_stream(p)
2035         self.pg_enable_capture(self.pg_interfaces)
2036         self.pg_start()
2037         capture = self.pg1.get_capture(1)
2038         p = capture[0]
2039         try:
2040             ip = p[IP]
2041             tcp = p[TCP]
2042             self.assertEqual(ip.src, nat_ip)
2043             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2044             self.assertEqual(tcp.dport, external_port)
2045             port_out0 = tcp.sport
2046         except:
2047             self.logger.error(ppp("Unexpected or invalid packet:", p))
2048             raise
2049
2050         # host1 to out
2051         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2052              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2053              TCP(sport=port_in, dport=external_port))
2054         self.pg0.add_stream(p)
2055         self.pg_enable_capture(self.pg_interfaces)
2056         self.pg_start()
2057         capture = self.pg1.get_capture(1)
2058         p = capture[0]
2059         try:
2060             ip = p[IP]
2061             tcp = p[TCP]
2062             self.assertEqual(ip.src, nat_ip)
2063             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2064             self.assertEqual(tcp.dport, external_port)
2065             port_out1 = tcp.sport
2066         except:
2067             self.logger.error(ppp("Unexpected or invalid packet:", p))
2068             raise
2069
2070         dms = self.vapi.snat_det_map_dump()
2071         self.assertEqual(1, len(dms))
2072         self.assertEqual(2, dms[0].ses_num)
2073
2074         # out to host0
2075         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2076              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2077              TCP(sport=external_port, dport=port_out0))
2078         self.pg1.add_stream(p)
2079         self.pg_enable_capture(self.pg_interfaces)
2080         self.pg_start()
2081         capture = self.pg0.get_capture(1)
2082         p = capture[0]
2083         try:
2084             ip = p[IP]
2085             tcp = p[TCP]
2086             self.assertEqual(ip.src, self.pg1.remote_ip4)
2087             self.assertEqual(ip.dst, host0.ip4)
2088             self.assertEqual(tcp.dport, port_in)
2089             self.assertEqual(tcp.sport, external_port)
2090         except:
2091             self.logger.error(ppp("Unexpected or invalid packet:", p))
2092             raise
2093
2094         # out to host1
2095         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2096              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2097              TCP(sport=external_port, dport=port_out1))
2098         self.pg1.add_stream(p)
2099         self.pg_enable_capture(self.pg_interfaces)
2100         self.pg_start()
2101         capture = self.pg0.get_capture(1)
2102         p = capture[0]
2103         try:
2104             ip = p[IP]
2105             tcp = p[TCP]
2106             self.assertEqual(ip.src, self.pg1.remote_ip4)
2107             self.assertEqual(ip.dst, host1.ip4)
2108             self.assertEqual(tcp.dport, port_in)
2109             self.assertEqual(tcp.sport, external_port)
2110         except:
2111             self.logger.error(ppp("Unexpected or invalid packet", p))
2112             raise
2113
2114         # session close api test
2115         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2116                                              port_out1,
2117                                              self.pg1.remote_ip4n,
2118                                              external_port)
2119         dms = self.vapi.snat_det_map_dump()
2120         self.assertEqual(dms[0].ses_num, 1)
2121
2122         self.vapi.snat_det_close_session_in(host0.ip4n,
2123                                             port_in,
2124                                             self.pg1.remote_ip4n,
2125                                             external_port)
2126         dms = self.vapi.snat_det_map_dump()
2127         self.assertEqual(dms[0].ses_num, 0)
2128
2129     def test_tcp_session_close_detection_in(self):
2130         """ CGNAT TCP session close initiated from inside network """
2131         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2132                                    32,
2133                                    socket.inet_aton(self.snat_addr),
2134                                    32)
2135         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2136         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2137                                                  is_inside=0)
2138
2139         self.initiate_tcp_session(self.pg0, self.pg1)
2140
2141         # close the session from inside
2142         try:
2143             # FIN packet in -> out
2144             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2145                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2146                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2147                      flags="F"))
2148             self.pg0.add_stream(p)
2149             self.pg_enable_capture(self.pg_interfaces)
2150             self.pg_start()
2151             self.pg1.get_capture(1)
2152
2153             pkts = []
2154
2155             # ACK packet out -> in
2156             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2157                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2158                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2159                      flags="A"))
2160             pkts.append(p)
2161
2162             # FIN packet out -> in
2163             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2164                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2165                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2166                      flags="F"))
2167             pkts.append(p)
2168
2169             self.pg1.add_stream(pkts)
2170             self.pg_enable_capture(self.pg_interfaces)
2171             self.pg_start()
2172             self.pg0.get_capture(2)
2173
2174             # ACK packet in -> out
2175             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2176                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2177                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2178                      flags="A"))
2179             self.pg0.add_stream(p)
2180             self.pg_enable_capture(self.pg_interfaces)
2181             self.pg_start()
2182             self.pg1.get_capture(1)
2183
2184             # Check if snat closed the session
2185             dms = self.vapi.snat_det_map_dump()
2186             self.assertEqual(0, dms[0].ses_num)
2187         except:
2188             self.logger.error("TCP session termination failed")
2189             raise
2190
2191     def test_tcp_session_close_detection_out(self):
2192         """ CGNAT TCP session close initiated from outside network """
2193         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2194                                    32,
2195                                    socket.inet_aton(self.snat_addr),
2196                                    32)
2197         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2198         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2199                                                  is_inside=0)
2200
2201         self.initiate_tcp_session(self.pg0, self.pg1)
2202
2203         # close the session from outside
2204         try:
2205             # FIN packet out -> in
2206             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2207                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2208                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2209                      flags="F"))
2210             self.pg1.add_stream(p)
2211             self.pg_enable_capture(self.pg_interfaces)
2212             self.pg_start()
2213             self.pg0.get_capture(1)
2214
2215             pkts = []
2216
2217             # ACK packet in -> out
2218             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2219                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2220                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2221                      flags="A"))
2222             pkts.append(p)
2223
2224             # ACK packet in -> out
2225             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2226                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2227                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2228                      flags="F"))
2229             pkts.append(p)
2230
2231             self.pg0.add_stream(pkts)
2232             self.pg_enable_capture(self.pg_interfaces)
2233             self.pg_start()
2234             self.pg1.get_capture(2)
2235
2236             # ACK packet out -> in
2237             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2238                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2239                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2240                      flags="A"))
2241             self.pg1.add_stream(p)
2242             self.pg_enable_capture(self.pg_interfaces)
2243             self.pg_start()
2244             self.pg0.get_capture(1)
2245
2246             # Check if snat closed the session
2247             dms = self.vapi.snat_det_map_dump()
2248             self.assertEqual(0, dms[0].ses_num)
2249         except:
2250             self.logger.error("TCP session termination failed")
2251             raise
2252
2253     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2254     def test_session_timeout(self):
2255         """ CGNAT session timeouts """
2256         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2257                                    32,
2258                                    socket.inet_aton(self.snat_addr),
2259                                    32)
2260         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2261         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2262                                                  is_inside=0)
2263
2264         self.initiate_tcp_session(self.pg0, self.pg1)
2265         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2266         pkts = self.create_stream_in(self.pg0, self.pg1)
2267         self.pg0.add_stream(pkts)
2268         self.pg_enable_capture(self.pg_interfaces)
2269         self.pg_start()
2270         capture = self.pg1.get_capture(len(pkts))
2271         sleep(15)
2272
2273         dms = self.vapi.snat_det_map_dump()
2274         self.assertEqual(0, dms[0].ses_num)
2275
2276     def test_session_limit_per_user(self):
2277         """ CGNAT maximum 1000 sessions per user should be created """
2278         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2279                                    32,
2280                                    socket.inet_aton(self.snat_addr),
2281                                    32)
2282         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2283         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2284                                                  is_inside=0)
2285         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2286                                      src_address=self.pg2.local_ip4n,
2287                                      path_mtu=512,
2288                                      template_interval=10)
2289         self.vapi.snat_ipfix()
2290
2291         pkts = []
2292         for port in range(1025, 2025):
2293             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2294                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2295                  UDP(sport=port, dport=port))
2296             pkts.append(p)
2297
2298         self.pg0.add_stream(pkts)
2299         self.pg_enable_capture(self.pg_interfaces)
2300         self.pg_start()
2301         capture = self.pg1.get_capture(len(pkts))
2302
2303         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2304              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2305              UDP(sport=3001, dport=3002))
2306         self.pg0.add_stream(p)
2307         self.pg_enable_capture(self.pg_interfaces)
2308         self.pg_start()
2309         capture = self.pg1.assert_nothing_captured()
2310
2311         # verify ICMP error packet
2312         capture = self.pg0.get_capture(1)
2313         p = capture[0]
2314         self.assertTrue(p.haslayer(ICMP))
2315         icmp = p[ICMP]
2316         self.assertEqual(icmp.type, 3)
2317         self.assertEqual(icmp.code, 1)
2318         self.assertTrue(icmp.haslayer(IPerror))
2319         inner_ip = icmp[IPerror]
2320         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2321         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2322
2323         dms = self.vapi.snat_det_map_dump()
2324
2325         self.assertEqual(1000, dms[0].ses_num)
2326
2327         # verify IPFIX logging
2328         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2329         capture = self.pg2.get_capture(2)
2330         ipfix = IPFIXDecoder()
2331         # first load template
2332         for p in capture:
2333             self.assertTrue(p.haslayer(IPFIX))
2334             if p.haslayer(Template):
2335                 ipfix.add_template(p.getlayer(Template))
2336         # verify events in data set
2337         for p in capture:
2338             if p.haslayer(Data):
2339                 data = ipfix.decode_data_set(p.getlayer(Set))
2340                 self.verify_ipfix_max_entries_per_user(data)
2341
2342     def clear_snat(self):
2343         """
2344         Clear SNAT configuration.
2345         """
2346         self.vapi.snat_ipfix(enable=0)
2347         self.vapi.snat_det_set_timeouts()
2348         deterministic_mappings = self.vapi.snat_det_map_dump()
2349         for dsm in deterministic_mappings:
2350             self.vapi.snat_add_det_map(dsm.in_addr,
2351                                        dsm.in_plen,
2352                                        dsm.out_addr,
2353                                        dsm.out_plen,
2354                                        is_add=0)
2355
2356         interfaces = self.vapi.snat_interface_dump()
2357         for intf in interfaces:
2358             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2359                                                      intf.is_inside,
2360                                                      is_add=0)
2361
2362     def tearDown(self):
2363         super(TestDeterministicNAT, self).tearDown()
2364         if not self.vpp_dead:
2365             self.logger.info(self.vapi.cli("show snat detail"))
2366             self.clear_snat()
2367
2368
2369 class TestNAT64(MethodHolder):
2370     """ NAT64 Test Cases """
2371
2372     @classmethod
2373     def setUpClass(cls):
2374         super(TestNAT64, cls).setUpClass()
2375
2376         try:
2377             cls.tcp_port_in = 6303
2378             cls.tcp_port_out = 6303
2379             cls.udp_port_in = 6304
2380             cls.udp_port_out = 6304
2381             cls.icmp_id_in = 6305
2382             cls.icmp_id_out = 6305
2383             cls.nat_addr = '10.0.0.3'
2384             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2385
2386             cls.create_pg_interfaces(range(2))
2387             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2388             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2389
2390             for i in cls.ip6_interfaces:
2391                 i.admin_up()
2392                 i.config_ip6()
2393                 i.resolve_ndp()
2394
2395             for i in cls.ip4_interfaces:
2396                 i.admin_up()
2397                 i.config_ip4()
2398                 i.resolve_arp()
2399
2400         except Exception:
2401             super(TestNAT64, cls).tearDownClass()
2402             raise
2403
2404     def test_pool(self):
2405         """ Add/delete address to NAT64 pool """
2406         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2407
2408         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2409
2410         addresses = self.vapi.nat64_pool_addr_dump()
2411         self.assertEqual(len(addresses), 1)
2412         self.assertEqual(addresses[0].address, nat_addr)
2413
2414         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2415
2416         addresses = self.vapi.nat64_pool_addr_dump()
2417         self.assertEqual(len(addresses), 0)
2418
2419     def test_interface(self):
2420         """ Enable/disable NAT64 feature on the interface """
2421         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2422         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2423
2424         interfaces = self.vapi.nat64_interface_dump()
2425         self.assertEqual(len(interfaces), 2)
2426         pg0_found = False
2427         pg1_found = False
2428         for intf in interfaces:
2429             if intf.sw_if_index == self.pg0.sw_if_index:
2430                 self.assertEqual(intf.is_inside, 1)
2431                 pg0_found = True
2432             elif intf.sw_if_index == self.pg1.sw_if_index:
2433                 self.assertEqual(intf.is_inside, 0)
2434                 pg1_found = True
2435         self.assertTrue(pg0_found)
2436         self.assertTrue(pg1_found)
2437
2438         features = self.vapi.cli("show interface features pg0")
2439         self.assertNotEqual(features.find('nat64-in2out'), -1)
2440         features = self.vapi.cli("show interface features pg1")
2441         self.assertNotEqual(features.find('nat64-out2in'), -1)
2442
2443         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2444         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2445
2446         interfaces = self.vapi.nat64_interface_dump()
2447         self.assertEqual(len(interfaces), 0)
2448
2449     def test_static_bib(self):
2450         """ Add/delete static BIB entry """
2451         in_addr = socket.inet_pton(socket.AF_INET6,
2452                                    '2001:db8:85a3::8a2e:370:7334')
2453         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
2454         in_port = 1234
2455         out_port = 5678
2456         proto = IP_PROTOS.tcp
2457
2458         self.vapi.nat64_add_del_static_bib(in_addr,
2459                                            out_addr,
2460                                            in_port,
2461                                            out_port,
2462                                            proto)
2463         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2464         static_bib_num = 0
2465         for bibe in bib:
2466             if bibe.is_static:
2467                 static_bib_num += 1
2468                 self.assertEqual(bibe.i_addr, in_addr)
2469                 self.assertEqual(bibe.o_addr, out_addr)
2470                 self.assertEqual(bibe.i_port, in_port)
2471                 self.assertEqual(bibe.o_port, out_port)
2472         self.assertEqual(static_bib_num, 1)
2473
2474         self.vapi.nat64_add_del_static_bib(in_addr,
2475                                            out_addr,
2476                                            in_port,
2477                                            out_port,
2478                                            proto,
2479                                            is_add=0)
2480         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2481         static_bib_num = 0
2482         for bibe in bib:
2483             if bibe.is_static:
2484                 static_bib_num += 1
2485         self.assertEqual(static_bib_num, 0)
2486
2487     def test_set_timeouts(self):
2488         """ Set NAT64 timeouts """
2489         # verify default values
2490         timeouts = self.vapi.nat64_get_timeouts()
2491         self.assertEqual(timeouts.udp, 300)
2492         self.assertEqual(timeouts.icmp, 60)
2493         self.assertEqual(timeouts.tcp_trans, 240)
2494         self.assertEqual(timeouts.tcp_est, 7440)
2495         self.assertEqual(timeouts.tcp_incoming_syn, 6)
2496
2497         # set and verify custom values
2498         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
2499                                      tcp_est=7450, tcp_incoming_syn=10)
2500         timeouts = self.vapi.nat64_get_timeouts()
2501         self.assertEqual(timeouts.udp, 200)
2502         self.assertEqual(timeouts.icmp, 30)
2503         self.assertEqual(timeouts.tcp_trans, 250)
2504         self.assertEqual(timeouts.tcp_est, 7450)
2505         self.assertEqual(timeouts.tcp_incoming_syn, 10)
2506
2507     def test_dynamic(self):
2508         """ NAT64 dynamic translation test """
2509         self.tcp_port_in = 6303
2510         self.udp_port_in = 6304
2511         self.icmp_id_in = 6305
2512
2513         ses_num_start = self.nat64_get_ses_num()
2514
2515         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2516                                                 self.nat_addr_n)
2517         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2518         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2519
2520         # in2out
2521         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2522         self.pg0.add_stream(pkts)
2523         self.pg_enable_capture(self.pg_interfaces)
2524         self.pg_start()
2525         capture = self.pg1.get_capture(3)
2526         self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2527                                 dst_ip=self.pg1.remote_ip4)
2528
2529         # out2in
2530         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2531         self.pg1.add_stream(pkts)
2532         self.pg_enable_capture(self.pg_interfaces)
2533         self.pg_start()
2534         capture = self.pg0.get_capture(3)
2535         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2536         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2537
2538         # in2out
2539         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2540         self.pg0.add_stream(pkts)
2541         self.pg_enable_capture(self.pg_interfaces)
2542         self.pg_start()
2543         capture = self.pg1.get_capture(3)
2544         self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2545                                 dst_ip=self.pg1.remote_ip4)
2546
2547         # out2in
2548         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2549         self.pg1.add_stream(pkts)
2550         self.pg_enable_capture(self.pg_interfaces)
2551         self.pg_start()
2552         capture = self.pg0.get_capture(3)
2553         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2554         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2555
2556         ses_num_end = self.nat64_get_ses_num()
2557
2558         self.assertEqual(ses_num_end - ses_num_start, 3)
2559
2560     def test_static(self):
2561         """ NAT64 static translation test """
2562         self.tcp_port_in = 60303
2563         self.udp_port_in = 60304
2564         self.icmp_id_in = 60305
2565         self.tcp_port_out = 60303
2566         self.udp_port_out = 60304
2567         self.icmp_id_out = 60305
2568
2569         ses_num_start = self.nat64_get_ses_num()
2570
2571         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2572                                                 self.nat_addr_n)
2573         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2574         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2575
2576         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2577                                            self.nat_addr_n,
2578                                            self.tcp_port_in,
2579                                            self.tcp_port_out,
2580                                            IP_PROTOS.tcp)
2581         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2582                                            self.nat_addr_n,
2583                                            self.udp_port_in,
2584                                            self.udp_port_out,
2585                                            IP_PROTOS.udp)
2586         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
2587                                            self.nat_addr_n,
2588                                            self.icmp_id_in,
2589                                            self.icmp_id_out,
2590                                            IP_PROTOS.icmp)
2591
2592         # in2out
2593         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2594         self.pg0.add_stream(pkts)
2595         self.pg_enable_capture(self.pg_interfaces)
2596         self.pg_start()
2597         capture = self.pg1.get_capture(3)
2598         self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
2599                                 dst_ip=self.pg1.remote_ip4, same_port=True)
2600
2601         # out2in
2602         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
2603         self.pg1.add_stream(pkts)
2604         self.pg_enable_capture(self.pg_interfaces)
2605         self.pg_start()
2606         capture = self.pg0.get_capture(3)
2607         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
2608         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
2609
2610         ses_num_end = self.nat64_get_ses_num()
2611
2612         self.assertEqual(ses_num_end - ses_num_start, 3)
2613
2614     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2615     def test_session_timeout(self):
2616         """ NAT64 session timeout """
2617         self.icmp_id_in = 1234
2618         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
2619                                                 self.nat_addr_n)
2620         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2621         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2622         self.vapi.nat64_set_timeouts(icmp=5)
2623
2624         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
2625         self.pg0.add_stream(pkts)
2626         self.pg_enable_capture(self.pg_interfaces)
2627         self.pg_start()
2628         capture = self.pg1.get_capture(3)
2629
2630         ses_num_before_timeout = self.nat64_get_ses_num()
2631
2632         sleep(15)
2633
2634         # ICMP session after timeout
2635         ses_num_after_timeout = self.nat64_get_ses_num()
2636         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
2637
2638     def nat64_get_ses_num(self):
2639         """
2640         Return number of active NAT64 sessions.
2641         """
2642         ses_num = 0
2643         st = self.vapi.nat64_st_dump(IP_PROTOS.tcp)
2644         ses_num += len(st)
2645         st = self.vapi.nat64_st_dump(IP_PROTOS.udp)
2646         ses_num += len(st)
2647         st = self.vapi.nat64_st_dump(IP_PROTOS.icmp)
2648         ses_num += len(st)
2649         return ses_num
2650
2651     def clear_nat64(self):
2652         """
2653         Clear NAT64 configuration.
2654         """
2655         self.vapi.nat64_set_timeouts()
2656
2657         interfaces = self.vapi.nat64_interface_dump()
2658         for intf in interfaces:
2659             self.vapi.nat64_add_del_interface(intf.sw_if_index,
2660                                               intf.is_inside,
2661                                               is_add=0)
2662
2663         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
2664         for bibe in bib:
2665             if bibe.is_static:
2666                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2667                                                    bibe.o_addr,
2668                                                    bibe.i_port,
2669                                                    bibe.o_port,
2670                                                    bibe.proto,
2671                                                    bibe.vrf_id,
2672                                                    is_add=0)
2673
2674         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
2675         for bibe in bib:
2676             if bibe.is_static:
2677                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2678                                                    bibe.o_addr,
2679                                                    bibe.i_port,
2680                                                    bibe.o_port,
2681                                                    bibe.proto,
2682                                                    bibe.vrf_id,
2683                                                    is_add=0)
2684
2685         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
2686         for bibe in bib:
2687             if bibe.is_static:
2688                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
2689                                                    bibe.o_addr,
2690                                                    bibe.i_port,
2691                                                    bibe.o_port,
2692                                                    bibe.proto,
2693                                                    bibe.vrf_id,
2694                                                    is_add=0)
2695
2696         adresses = self.vapi.nat64_pool_addr_dump()
2697         for addr in adresses:
2698             self.vapi.nat64_add_del_pool_addr_range(addr.address,
2699                                                     addr.address,
2700                                                     is_add=0)
2701
2702     def tearDown(self):
2703         super(TestNAT64, self).tearDown()
2704         if not self.vpp_dead:
2705             self.logger.info(self.vapi.cli("show nat64 pool"))
2706             self.logger.info(self.vapi.cli("show nat64 interfaces"))
2707             self.logger.info(self.vapi.cli("show nat64 bib tcp"))
2708             self.logger.info(self.vapi.cli("show nat64 bib udp"))
2709             self.logger.info(self.vapi.cli("show nat64 bib icmp"))
2710             self.logger.info(self.vapi.cli("show nat64 session table tcp"))
2711             self.logger.info(self.vapi.cli("show nat64 session table udp"))
2712             self.logger.info(self.vapi.cli("show nat64 session table icmp"))
2713             self.clear_nat64()
2714
2715 if __name__ == '__main__':
2716     unittest.main(testRunner=VppTestRunner)