CGN: additional tests
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from util import ppp
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from time import sleep
15
16
17 class MethodHolder(VppTestCase):
18     """ SNAT create capture and verify method holder """
19
20     @classmethod
21     def setUpClass(cls):
22         super(MethodHolder, cls).setUpClass()
23
24     def tearDown(self):
25         super(MethodHolder, self).tearDown()
26
27     def create_stream_in(self, in_if, out_if, ttl=64):
28         """
29         Create packet stream for inside network
30
31         :param in_if: Inside interface
32         :param out_if: Outside interface
33         :param ttl: TTL of generated packets
34         """
35         pkts = []
36         # TCP
37         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
38              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
39              TCP(sport=self.tcp_port_in))
40         pkts.append(p)
41
42         # UDP
43         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
44              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
45              UDP(sport=self.udp_port_in))
46         pkts.append(p)
47
48         # ICMP
49         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
50              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
51              ICMP(id=self.icmp_id_in, type='echo-request'))
52         pkts.append(p)
53
54         return pkts
55
56     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
57         """
58         Create packet stream for outside network
59
60         :param out_if: Outside interface
61         :param dst_ip: Destination IP address (Default use global SNAT address)
62         :param ttl: TTL of generated packets
63         """
64         if dst_ip is None:
65             dst_ip = self.snat_addr
66         pkts = []
67         # TCP
68         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
69              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
70              TCP(dport=self.tcp_port_out))
71         pkts.append(p)
72
73         # UDP
74         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
75              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
76              UDP(dport=self.udp_port_out))
77         pkts.append(p)
78
79         # ICMP
80         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
81              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
82              ICMP(id=self.icmp_id_out, type='echo-reply'))
83         pkts.append(p)
84
85         return pkts
86
87     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
88                            packet_num=3):
89         """
90         Verify captured packets on outside network
91
92         :param capture: Captured packets
93         :param nat_ip: Translated IP address (Default use global SNAT address)
94         :param same_port: Sorce port number is not translated (Default False)
95         :param packet_num: Expected number of packets (Default 3)
96         """
97         if nat_ip is None:
98             nat_ip = self.snat_addr
99         self.assertEqual(packet_num, len(capture))
100         for packet in capture:
101             try:
102                 self.assertEqual(packet[IP].src, nat_ip)
103                 if packet.haslayer(TCP):
104                     if same_port:
105                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
106                     else:
107                         self.assertNotEqual(
108                             packet[TCP].sport, self.tcp_port_in)
109                     self.tcp_port_out = packet[TCP].sport
110                 elif packet.haslayer(UDP):
111                     if same_port:
112                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
113                     else:
114                         self.assertNotEqual(
115                             packet[UDP].sport, self.udp_port_in)
116                     self.udp_port_out = packet[UDP].sport
117                 else:
118                     if same_port:
119                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
120                     else:
121                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
122                     self.icmp_id_out = packet[ICMP].id
123             except:
124                 self.logger.error(ppp("Unexpected or invalid packet "
125                                       "(outside network):", packet))
126                 raise
127
128     def verify_capture_in(self, capture, in_if, packet_num=3):
129         """
130         Verify captured packets on inside network
131
132         :param capture: Captured packets
133         :param in_if: Inside interface
134         :param packet_num: Expected number of packets (Default 3)
135         """
136         self.assertEqual(packet_num, len(capture))
137         for packet in capture:
138             try:
139                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
140                 if packet.haslayer(TCP):
141                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
142                 elif packet.haslayer(UDP):
143                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
144                 else:
145                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
146             except:
147                 self.logger.error(ppp("Unexpected or invalid packet "
148                                       "(inside network):", packet))
149                 raise
150
151     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
152         """
153         Verify captured packet that don't have to be translated
154
155         :param capture: Captured packets
156         :param ingress_if: Ingress interface
157         :param egress_if: Egress interface
158         """
159         for packet in capture:
160             try:
161                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
162                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
163                 if packet.haslayer(TCP):
164                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
165                 elif packet.haslayer(UDP):
166                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
167                 else:
168                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
169             except:
170                 self.logger.error(ppp("Unexpected or invalid packet "
171                                       "(inside network):", packet))
172                 raise
173
174     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
175                                             packet_num=3, icmp_type=11):
176         """
177         Verify captured packets with ICMP errors on outside network
178
179         :param capture: Captured packets
180         :param src_ip: Translated IP address or IP address of VPP
181                        (Default use global SNAT address)
182         :param packet_num: Expected number of packets (Default 3)
183         :param icmp_type: Type of error ICMP packet
184                           we are expecting (Default 11)
185         """
186         if src_ip is None:
187             src_ip = self.snat_addr
188         self.assertEqual(packet_num, len(capture))
189         for packet in capture:
190             try:
191                 self.assertEqual(packet[IP].src, src_ip)
192                 self.assertTrue(packet.haslayer(ICMP))
193                 icmp = packet[ICMP]
194                 self.assertEqual(icmp.type, icmp_type)
195                 self.assertTrue(icmp.haslayer(IPerror))
196                 inner_ip = icmp[IPerror]
197                 if inner_ip.haslayer(TCPerror):
198                     self.assertEqual(inner_ip[TCPerror].dport,
199                                      self.tcp_port_out)
200                 elif inner_ip.haslayer(UDPerror):
201                     self.assertEqual(inner_ip[UDPerror].dport,
202                                      self.udp_port_out)
203                 else:
204                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
205             except:
206                 self.logger.error(ppp("Unexpected or invalid packet "
207                                       "(outside network):", packet))
208                 raise
209
210     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
211                                            icmp_type=11):
212         """
213         Verify captured packets with ICMP errors on inside network
214
215         :param capture: Captured packets
216         :param in_if: Inside interface
217         :param packet_num: Expected number of packets (Default 3)
218         :param icmp_type: Type of error ICMP packet
219                           we are expecting (Default 11)
220         """
221         self.assertEqual(packet_num, len(capture))
222         for packet in capture:
223             try:
224                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
225                 self.assertTrue(packet.haslayer(ICMP))
226                 icmp = packet[ICMP]
227                 self.assertEqual(icmp.type, icmp_type)
228                 self.assertTrue(icmp.haslayer(IPerror))
229                 inner_ip = icmp[IPerror]
230                 if inner_ip.haslayer(TCPerror):
231                     self.assertEqual(inner_ip[TCPerror].sport,
232                                      self.tcp_port_in)
233                 elif inner_ip.haslayer(UDPerror):
234                     self.assertEqual(inner_ip[UDPerror].sport,
235                                      self.udp_port_in)
236                 else:
237                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
238             except:
239                 self.logger.error(ppp("Unexpected or invalid packet "
240                                       "(inside network):", packet))
241                 raise
242
243     def verify_ipfix_nat44_ses(self, data):
244         """
245         Verify IPFIX NAT44 session create/delete event
246
247         :param data: Decoded IPFIX data records
248         """
249         nat44_ses_create_num = 0
250         nat44_ses_delete_num = 0
251         self.assertEqual(6, len(data))
252         for record in data:
253             # natEvent
254             self.assertIn(ord(record[230]), [4, 5])
255             if ord(record[230]) == 4:
256                 nat44_ses_create_num += 1
257             else:
258                 nat44_ses_delete_num += 1
259             # sourceIPv4Address
260             self.assertEqual(self.pg0.remote_ip4n, record[8])
261             # postNATSourceIPv4Address
262             self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
263                              record[225])
264             # ingressVRFID
265             self.assertEqual(struct.pack("!I", 0), record[234])
266             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
267             if IP_PROTOS.icmp == ord(record[4]):
268                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
269                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
270                                  record[227])
271             elif IP_PROTOS.tcp == ord(record[4]):
272                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
273                                  record[7])
274                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
275                                  record[227])
276             elif IP_PROTOS.udp == ord(record[4]):
277                 self.assertEqual(struct.pack("!H", self.udp_port_in),
278                                  record[7])
279                 self.assertEqual(struct.pack("!H", self.udp_port_out),
280                                  record[227])
281             else:
282                 self.fail("Invalid protocol")
283         self.assertEqual(3, nat44_ses_create_num)
284         self.assertEqual(3, nat44_ses_delete_num)
285
286     def verify_ipfix_addr_exhausted(self, data):
287         """
288         Verify IPFIX NAT addresses event
289
290         :param data: Decoded IPFIX data records
291         """
292         self.assertEqual(1, len(data))
293         record = data[0]
294         # natEvent
295         self.assertEqual(ord(record[230]), 3)
296         # natPoolID
297         self.assertEqual(struct.pack("!I", 0), record[283])
298
299
300 class TestSNAT(MethodHolder):
301     """ SNAT Test Cases """
302
303     @classmethod
304     def setUpClass(cls):
305         super(TestSNAT, cls).setUpClass()
306
307         try:
308             cls.tcp_port_in = 6303
309             cls.tcp_port_out = 6303
310             cls.udp_port_in = 6304
311             cls.udp_port_out = 6304
312             cls.icmp_id_in = 6305
313             cls.icmp_id_out = 6305
314             cls.snat_addr = '10.0.0.3'
315
316             cls.create_pg_interfaces(range(8))
317             cls.interfaces = list(cls.pg_interfaces[0:4])
318
319             for i in cls.interfaces:
320                 i.admin_up()
321                 i.config_ip4()
322                 i.resolve_arp()
323
324             cls.pg0.generate_remote_hosts(2)
325             cls.pg0.configure_ipv4_neighbors()
326
327             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
328
329             cls.pg4._local_ip4 = "172.16.255.1"
330             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
331             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
332             cls.pg4.set_table_ip4(10)
333             cls.pg5._local_ip4 = "172.16.255.3"
334             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
335             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
336             cls.pg5.set_table_ip4(10)
337             cls.pg6._local_ip4 = "172.16.255.1"
338             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
339             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
340             cls.pg6.set_table_ip4(20)
341             for i in cls.overlapping_interfaces:
342                 i.config_ip4()
343                 i.admin_up()
344                 i.resolve_arp()
345
346             cls.pg7.admin_up()
347
348         except Exception:
349             super(TestSNAT, cls).tearDownClass()
350             raise
351
352     def clear_snat(self):
353         """
354         Clear SNAT configuration.
355         """
356         if self.pg7.has_ip4_config:
357             self.pg7.unconfig_ip4()
358
359         interfaces = self.vapi.snat_interface_addr_dump()
360         for intf in interfaces:
361             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
362
363         self.vapi.snat_ipfix(enable=0)
364
365         interfaces = self.vapi.snat_interface_dump()
366         for intf in interfaces:
367             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
368                                                      intf.is_inside,
369                                                      is_add=0)
370
371         static_mappings = self.vapi.snat_static_mapping_dump()
372         for sm in static_mappings:
373             self.vapi.snat_add_static_mapping(sm.local_ip_address,
374                                               sm.external_ip_address,
375                                               local_port=sm.local_port,
376                                               external_port=sm.external_port,
377                                               addr_only=sm.addr_only,
378                                               vrf_id=sm.vrf_id,
379                                               protocol=sm.protocol,
380                                               is_add=0)
381
382         adresses = self.vapi.snat_address_dump()
383         for addr in adresses:
384             self.vapi.snat_add_address_range(addr.ip_address,
385                                              addr.ip_address,
386                                              is_add=0)
387
388     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
389                                 local_port=0, external_port=0, vrf_id=0,
390                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
391                                 proto=0):
392         """
393         Add/delete S-NAT static mapping
394
395         :param local_ip: Local IP address
396         :param external_ip: External IP address
397         :param local_port: Local port number (Optional)
398         :param external_port: External port number (Optional)
399         :param vrf_id: VRF ID (Default 0)
400         :param is_add: 1 if add, 0 if delete (Default add)
401         :param external_sw_if_index: External interface instead of IP address
402         :param proto: IP protocol (Mandatory if port specified)
403         """
404         addr_only = 1
405         if local_port and external_port:
406             addr_only = 0
407         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
408         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
409         self.vapi.snat_add_static_mapping(
410             l_ip,
411             e_ip,
412             external_sw_if_index,
413             local_port,
414             external_port,
415             addr_only,
416             vrf_id,
417             proto,
418             is_add)
419
420     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
421         """
422         Add/delete S-NAT address
423
424         :param ip: IP address
425         :param is_add: 1 if add, 0 if delete (Default add)
426         """
427         snat_addr = socket.inet_pton(socket.AF_INET, ip)
428         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
429                                          vrf_id=vrf_id)
430
431     def test_dynamic(self):
432         """ SNAT dynamic translation test """
433
434         self.snat_add_address(self.snat_addr)
435         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
436         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
437                                                  is_inside=0)
438
439         # in2out
440         pkts = self.create_stream_in(self.pg0, self.pg1)
441         self.pg0.add_stream(pkts)
442         self.pg_enable_capture(self.pg_interfaces)
443         self.pg_start()
444         capture = self.pg1.get_capture(len(pkts))
445         self.verify_capture_out(capture)
446
447         # out2in
448         pkts = self.create_stream_out(self.pg1)
449         self.pg1.add_stream(pkts)
450         self.pg_enable_capture(self.pg_interfaces)
451         self.pg_start()
452         capture = self.pg0.get_capture(len(pkts))
453         self.verify_capture_in(capture, self.pg0)
454
455     def test_dynamic_icmp_errors_in2out_ttl_1(self):
456         """ SNAT handling of client packets with TTL=1 """
457
458         self.snat_add_address(self.snat_addr)
459         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
460         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
461                                                  is_inside=0)
462
463         # Client side - generate traffic
464         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
465         self.pg0.add_stream(pkts)
466         self.pg_enable_capture(self.pg_interfaces)
467         self.pg_start()
468
469         # Client side - verify ICMP type 11 packets
470         capture = self.pg0.get_capture(len(pkts))
471         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
472
473     def test_dynamic_icmp_errors_out2in_ttl_1(self):
474         """ SNAT handling of server packets with TTL=1 """
475
476         self.snat_add_address(self.snat_addr)
477         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
478         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
479                                                  is_inside=0)
480
481         # Client side - create sessions
482         pkts = self.create_stream_in(self.pg0, self.pg1)
483         self.pg0.add_stream(pkts)
484         self.pg_enable_capture(self.pg_interfaces)
485         self.pg_start()
486
487         # Server side - generate traffic
488         capture = self.pg1.get_capture(len(pkts))
489         self.verify_capture_out(capture)
490         pkts = self.create_stream_out(self.pg1, ttl=1)
491         self.pg1.add_stream(pkts)
492         self.pg_enable_capture(self.pg_interfaces)
493         self.pg_start()
494
495         # Server side - verify ICMP type 11 packets
496         capture = self.pg1.get_capture(len(pkts))
497         self.verify_capture_out_with_icmp_errors(capture,
498                                                  src_ip=self.pg1.local_ip4)
499
500     def test_dynamic_icmp_errors_in2out_ttl_2(self):
501         """ SNAT handling of error responses to client packets with TTL=2 """
502
503         self.snat_add_address(self.snat_addr)
504         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
505         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
506                                                  is_inside=0)
507
508         # Client side - generate traffic
509         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
510         self.pg0.add_stream(pkts)
511         self.pg_enable_capture(self.pg_interfaces)
512         self.pg_start()
513
514         # Server side - simulate ICMP type 11 response
515         capture = self.pg1.get_capture(len(pkts))
516         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
517                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
518                 ICMP(type=11) / packet[IP] for packet in capture]
519         self.pg1.add_stream(pkts)
520         self.pg_enable_capture(self.pg_interfaces)
521         self.pg_start()
522
523         # Client side - verify ICMP type 11 packets
524         capture = self.pg0.get_capture(len(pkts))
525         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
526
527     def test_dynamic_icmp_errors_out2in_ttl_2(self):
528         """ SNAT handling of error responses to server packets with TTL=2 """
529
530         self.snat_add_address(self.snat_addr)
531         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
532         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
533                                                  is_inside=0)
534
535         # Client side - create sessions
536         pkts = self.create_stream_in(self.pg0, self.pg1)
537         self.pg0.add_stream(pkts)
538         self.pg_enable_capture(self.pg_interfaces)
539         self.pg_start()
540
541         # Server side - generate traffic
542         capture = self.pg1.get_capture(len(pkts))
543         self.verify_capture_out(capture)
544         pkts = self.create_stream_out(self.pg1, ttl=2)
545         self.pg1.add_stream(pkts)
546         self.pg_enable_capture(self.pg_interfaces)
547         self.pg_start()
548
549         # Client side - simulate ICMP type 11 response
550         capture = self.pg0.get_capture(len(pkts))
551         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
552                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
553                 ICMP(type=11) / packet[IP] for packet in capture]
554         self.pg0.add_stream(pkts)
555         self.pg_enable_capture(self.pg_interfaces)
556         self.pg_start()
557
558         # Server side - verify ICMP type 11 packets
559         capture = self.pg1.get_capture(len(pkts))
560         self.verify_capture_out_with_icmp_errors(capture)
561
562     def test_ping_out_interface_from_outside(self):
563         """ Ping SNAT out interface from outside """
564
565         self.snat_add_address(self.snat_addr)
566         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
567         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
568                                                  is_inside=0)
569
570         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
571              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
572              ICMP(id=self.icmp_id_out, type='echo-request'))
573         pkts = [p]
574         self.pg1.add_stream(pkts)
575         self.pg_enable_capture(self.pg_interfaces)
576         self.pg_start()
577         capture = self.pg1.get_capture(len(pkts))
578         self.assertEqual(1, len(capture))
579         packet = capture[0]
580         try:
581             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
582             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
583             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
584             self.assertEqual(packet[ICMP].type, 0)  # echo reply
585         except:
586             self.logger.error(ppp("Unexpected or invalid packet "
587                                   "(outside network):", packet))
588             raise
589
590     def test_static_in(self):
591         """ SNAT 1:1 NAT initialized from inside network """
592
593         nat_ip = "10.0.0.10"
594         self.tcp_port_out = 6303
595         self.udp_port_out = 6304
596         self.icmp_id_out = 6305
597
598         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
599         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
600         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
601                                                  is_inside=0)
602
603         # in2out
604         pkts = self.create_stream_in(self.pg0, self.pg1)
605         self.pg0.add_stream(pkts)
606         self.pg_enable_capture(self.pg_interfaces)
607         self.pg_start()
608         capture = self.pg1.get_capture(len(pkts))
609         self.verify_capture_out(capture, nat_ip, True)
610
611         # out2in
612         pkts = self.create_stream_out(self.pg1, nat_ip)
613         self.pg1.add_stream(pkts)
614         self.pg_enable_capture(self.pg_interfaces)
615         self.pg_start()
616         capture = self.pg0.get_capture(len(pkts))
617         self.verify_capture_in(capture, self.pg0)
618
619     def test_static_out(self):
620         """ SNAT 1:1 NAT initialized from outside network """
621
622         nat_ip = "10.0.0.20"
623         self.tcp_port_out = 6303
624         self.udp_port_out = 6304
625         self.icmp_id_out = 6305
626
627         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
628         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
629         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
630                                                  is_inside=0)
631
632         # out2in
633         pkts = self.create_stream_out(self.pg1, nat_ip)
634         self.pg1.add_stream(pkts)
635         self.pg_enable_capture(self.pg_interfaces)
636         self.pg_start()
637         capture = self.pg0.get_capture(len(pkts))
638         self.verify_capture_in(capture, self.pg0)
639
640         # in2out
641         pkts = self.create_stream_in(self.pg0, self.pg1)
642         self.pg0.add_stream(pkts)
643         self.pg_enable_capture(self.pg_interfaces)
644         self.pg_start()
645         capture = self.pg1.get_capture(len(pkts))
646         self.verify_capture_out(capture, nat_ip, True)
647
648     def test_static_with_port_in(self):
649         """ SNAT 1:1 NAT with port initialized from inside network """
650
651         self.tcp_port_out = 3606
652         self.udp_port_out = 3607
653         self.icmp_id_out = 3608
654
655         self.snat_add_address(self.snat_addr)
656         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
657                                      self.tcp_port_in, self.tcp_port_out,
658                                      proto=IP_PROTOS.tcp)
659         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
660                                      self.udp_port_in, self.udp_port_out,
661                                      proto=IP_PROTOS.udp)
662         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
663                                      self.icmp_id_in, self.icmp_id_out,
664                                      proto=IP_PROTOS.icmp)
665         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
666         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
667                                                  is_inside=0)
668
669         # in2out
670         pkts = self.create_stream_in(self.pg0, self.pg1)
671         self.pg0.add_stream(pkts)
672         self.pg_enable_capture(self.pg_interfaces)
673         self.pg_start()
674         capture = self.pg1.get_capture(len(pkts))
675         self.verify_capture_out(capture)
676
677         # out2in
678         pkts = self.create_stream_out(self.pg1)
679         self.pg1.add_stream(pkts)
680         self.pg_enable_capture(self.pg_interfaces)
681         self.pg_start()
682         capture = self.pg0.get_capture(len(pkts))
683         self.verify_capture_in(capture, self.pg0)
684
685     def test_static_with_port_out(self):
686         """ SNAT 1:1 NAT with port initialized from outside network """
687
688         self.tcp_port_out = 30606
689         self.udp_port_out = 30607
690         self.icmp_id_out = 30608
691
692         self.snat_add_address(self.snat_addr)
693         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
694                                      self.tcp_port_in, self.tcp_port_out,
695                                      proto=IP_PROTOS.tcp)
696         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
697                                      self.udp_port_in, self.udp_port_out,
698                                      proto=IP_PROTOS.udp)
699         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
700                                      self.icmp_id_in, self.icmp_id_out,
701                                      proto=IP_PROTOS.icmp)
702         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
703         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
704                                                  is_inside=0)
705
706         # out2in
707         pkts = self.create_stream_out(self.pg1)
708         self.pg1.add_stream(pkts)
709         self.pg_enable_capture(self.pg_interfaces)
710         self.pg_start()
711         capture = self.pg0.get_capture(len(pkts))
712         self.verify_capture_in(capture, self.pg0)
713
714         # in2out
715         pkts = self.create_stream_in(self.pg0, self.pg1)
716         self.pg0.add_stream(pkts)
717         self.pg_enable_capture(self.pg_interfaces)
718         self.pg_start()
719         capture = self.pg1.get_capture(len(pkts))
720         self.verify_capture_out(capture)
721
722     def test_static_vrf_aware(self):
723         """ SNAT 1:1 NAT VRF awareness """
724
725         nat_ip1 = "10.0.0.30"
726         nat_ip2 = "10.0.0.40"
727         self.tcp_port_out = 6303
728         self.udp_port_out = 6304
729         self.icmp_id_out = 6305
730
731         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
732                                      vrf_id=10)
733         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
734                                      vrf_id=10)
735         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
736                                                  is_inside=0)
737         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
738         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
739
740         # inside interface VRF match SNAT static mapping VRF
741         pkts = self.create_stream_in(self.pg4, self.pg3)
742         self.pg4.add_stream(pkts)
743         self.pg_enable_capture(self.pg_interfaces)
744         self.pg_start()
745         capture = self.pg3.get_capture(len(pkts))
746         self.verify_capture_out(capture, nat_ip1, True)
747
748         # inside interface VRF don't match SNAT static mapping VRF (packets
749         # are dropped)
750         pkts = self.create_stream_in(self.pg0, self.pg3)
751         self.pg0.add_stream(pkts)
752         self.pg_enable_capture(self.pg_interfaces)
753         self.pg_start()
754         self.pg3.assert_nothing_captured()
755
756     def test_multiple_inside_interfaces(self):
757         """ SNAT multiple inside interfaces (non-overlapping address space) """
758
759         self.snat_add_address(self.snat_addr)
760         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
761         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
762         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
763                                                  is_inside=0)
764
765         # between two S-NAT inside interfaces (no translation)
766         pkts = self.create_stream_in(self.pg0, self.pg1)
767         self.pg0.add_stream(pkts)
768         self.pg_enable_capture(self.pg_interfaces)
769         self.pg_start()
770         capture = self.pg1.get_capture(len(pkts))
771         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
772
773         # from S-NAT inside to interface without S-NAT feature (no translation)
774         pkts = self.create_stream_in(self.pg0, self.pg2)
775         self.pg0.add_stream(pkts)
776         self.pg_enable_capture(self.pg_interfaces)
777         self.pg_start()
778         capture = self.pg2.get_capture(len(pkts))
779         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
780
781         # in2out 1st interface
782         pkts = self.create_stream_in(self.pg0, self.pg3)
783         self.pg0.add_stream(pkts)
784         self.pg_enable_capture(self.pg_interfaces)
785         self.pg_start()
786         capture = self.pg3.get_capture(len(pkts))
787         self.verify_capture_out(capture)
788
789         # out2in 1st interface
790         pkts = self.create_stream_out(self.pg3)
791         self.pg3.add_stream(pkts)
792         self.pg_enable_capture(self.pg_interfaces)
793         self.pg_start()
794         capture = self.pg0.get_capture(len(pkts))
795         self.verify_capture_in(capture, self.pg0)
796
797         # in2out 2nd interface
798         pkts = self.create_stream_in(self.pg1, self.pg3)
799         self.pg1.add_stream(pkts)
800         self.pg_enable_capture(self.pg_interfaces)
801         self.pg_start()
802         capture = self.pg3.get_capture(len(pkts))
803         self.verify_capture_out(capture)
804
805         # out2in 2nd interface
806         pkts = self.create_stream_out(self.pg3)
807         self.pg3.add_stream(pkts)
808         self.pg_enable_capture(self.pg_interfaces)
809         self.pg_start()
810         capture = self.pg1.get_capture(len(pkts))
811         self.verify_capture_in(capture, self.pg1)
812
813     def test_inside_overlapping_interfaces(self):
814         """ SNAT multiple inside interfaces with overlapping address space """
815
816         static_nat_ip = "10.0.0.10"
817         self.snat_add_address(self.snat_addr)
818         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
819                                                  is_inside=0)
820         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
821         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
822         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
823         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
824                                      vrf_id=20)
825
826         # between S-NAT inside interfaces with same VRF (no translation)
827         pkts = self.create_stream_in(self.pg4, self.pg5)
828         self.pg4.add_stream(pkts)
829         self.pg_enable_capture(self.pg_interfaces)
830         self.pg_start()
831         capture = self.pg5.get_capture(len(pkts))
832         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
833
834         # between S-NAT inside interfaces with different VRF (hairpinning)
835         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
836              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
837              TCP(sport=1234, dport=5678))
838         self.pg4.add_stream(p)
839         self.pg_enable_capture(self.pg_interfaces)
840         self.pg_start()
841         capture = self.pg6.get_capture(1)
842         p = capture[0]
843         try:
844             ip = p[IP]
845             tcp = p[TCP]
846             self.assertEqual(ip.src, self.snat_addr)
847             self.assertEqual(ip.dst, self.pg6.remote_ip4)
848             self.assertNotEqual(tcp.sport, 1234)
849             self.assertEqual(tcp.dport, 5678)
850         except:
851             self.logger.error(ppp("Unexpected or invalid packet:", p))
852             raise
853
854         # in2out 1st interface
855         pkts = self.create_stream_in(self.pg4, self.pg3)
856         self.pg4.add_stream(pkts)
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pg_start()
859         capture = self.pg3.get_capture(len(pkts))
860         self.verify_capture_out(capture)
861
862         # out2in 1st interface
863         pkts = self.create_stream_out(self.pg3)
864         self.pg3.add_stream(pkts)
865         self.pg_enable_capture(self.pg_interfaces)
866         self.pg_start()
867         capture = self.pg4.get_capture(len(pkts))
868         self.verify_capture_in(capture, self.pg4)
869
870         # in2out 2nd interface
871         pkts = self.create_stream_in(self.pg5, self.pg3)
872         self.pg5.add_stream(pkts)
873         self.pg_enable_capture(self.pg_interfaces)
874         self.pg_start()
875         capture = self.pg3.get_capture(len(pkts))
876         self.verify_capture_out(capture)
877
878         # out2in 2nd interface
879         pkts = self.create_stream_out(self.pg3)
880         self.pg3.add_stream(pkts)
881         self.pg_enable_capture(self.pg_interfaces)
882         self.pg_start()
883         capture = self.pg5.get_capture(len(pkts))
884         self.verify_capture_in(capture, self.pg5)
885
886         # pg5 session dump
887         addresses = self.vapi.snat_address_dump()
888         self.assertEqual(len(addresses), 1)
889         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
890         self.assertEqual(len(sessions), 3)
891         for session in sessions:
892             self.assertFalse(session.is_static)
893             self.assertEqual(session.inside_ip_address[0:4],
894                              self.pg5.remote_ip4n)
895             self.assertEqual(session.outside_ip_address,
896                              addresses[0].ip_address)
897         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
898         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
899         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
900         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
901         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
902         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
903         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
904         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
905         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
906
907         # in2out 3rd interface
908         pkts = self.create_stream_in(self.pg6, self.pg3)
909         self.pg6.add_stream(pkts)
910         self.pg_enable_capture(self.pg_interfaces)
911         self.pg_start()
912         capture = self.pg3.get_capture(len(pkts))
913         self.verify_capture_out(capture, static_nat_ip, True)
914
915         # out2in 3rd interface
916         pkts = self.create_stream_out(self.pg3, static_nat_ip)
917         self.pg3.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920         capture = self.pg6.get_capture(len(pkts))
921         self.verify_capture_in(capture, self.pg6)
922
923         # general user and session dump verifications
924         users = self.vapi.snat_user_dump()
925         self.assertTrue(len(users) >= 3)
926         addresses = self.vapi.snat_address_dump()
927         self.assertEqual(len(addresses), 1)
928         for user in users:
929             sessions = self.vapi.snat_user_session_dump(user.ip_address,
930                                                         user.vrf_id)
931             for session in sessions:
932                 self.assertEqual(user.ip_address, session.inside_ip_address)
933                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
934                 self.assertTrue(session.protocol in
935                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
936                                  IP_PROTOS.icmp])
937
938         # pg4 session dump
939         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
940         self.assertTrue(len(sessions) >= 4)
941         for session in sessions:
942             self.assertFalse(session.is_static)
943             self.assertEqual(session.inside_ip_address[0:4],
944                              self.pg4.remote_ip4n)
945             self.assertEqual(session.outside_ip_address,
946                              addresses[0].ip_address)
947
948         # pg6 session dump
949         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
950         self.assertTrue(len(sessions) >= 3)
951         for session in sessions:
952             self.assertTrue(session.is_static)
953             self.assertEqual(session.inside_ip_address[0:4],
954                              self.pg6.remote_ip4n)
955             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
956                              map(int, static_nat_ip.split('.')))
957             self.assertTrue(session.inside_port in
958                             [self.tcp_port_in, self.udp_port_in,
959                              self.icmp_id_in])
960
961     def test_hairpinning(self):
962         """ SNAT hairpinning """
963
964         host = self.pg0.remote_hosts[0]
965         server = self.pg0.remote_hosts[1]
966         host_in_port = 1234
967         host_out_port = 0
968         server_in_port = 5678
969         server_out_port = 8765
970
971         self.snat_add_address(self.snat_addr)
972         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
973         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
974                                                  is_inside=0)
975         # add static mapping for server
976         self.snat_add_static_mapping(server.ip4, self.snat_addr,
977                                      server_in_port, server_out_port,
978                                      proto=IP_PROTOS.tcp)
979
980         # send packet from host to server
981         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
982              IP(src=host.ip4, dst=self.snat_addr) /
983              TCP(sport=host_in_port, dport=server_out_port))
984         self.pg0.add_stream(p)
985         self.pg_enable_capture(self.pg_interfaces)
986         self.pg_start()
987         capture = self.pg0.get_capture(1)
988         p = capture[0]
989         try:
990             ip = p[IP]
991             tcp = p[TCP]
992             self.assertEqual(ip.src, self.snat_addr)
993             self.assertEqual(ip.dst, server.ip4)
994             self.assertNotEqual(tcp.sport, host_in_port)
995             self.assertEqual(tcp.dport, server_in_port)
996             host_out_port = tcp.sport
997         except:
998             self.logger.error(ppp("Unexpected or invalid packet:", p))
999             raise
1000
1001         # send reply from server to host
1002         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1003              IP(src=server.ip4, dst=self.snat_addr) /
1004              TCP(sport=server_in_port, dport=host_out_port))
1005         self.pg0.add_stream(p)
1006         self.pg_enable_capture(self.pg_interfaces)
1007         self.pg_start()
1008         capture = self.pg0.get_capture(1)
1009         p = capture[0]
1010         try:
1011             ip = p[IP]
1012             tcp = p[TCP]
1013             self.assertEqual(ip.src, self.snat_addr)
1014             self.assertEqual(ip.dst, host.ip4)
1015             self.assertEqual(tcp.sport, server_out_port)
1016             self.assertEqual(tcp.dport, host_in_port)
1017         except:
1018             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1019             raise
1020
1021     def test_max_translations_per_user(self):
1022         """ MAX translations per user - recycle the least recently used """
1023
1024         self.snat_add_address(self.snat_addr)
1025         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1026         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1027                                                  is_inside=0)
1028
1029         # get maximum number of translations per user
1030         snat_config = self.vapi.snat_show_config()
1031
1032         # send more than maximum number of translations per user packets
1033         pkts_num = snat_config.max_translations_per_user + 5
1034         pkts = []
1035         for port in range(0, pkts_num):
1036             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1037                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1038                  TCP(sport=1025 + port))
1039             pkts.append(p)
1040         self.pg0.add_stream(pkts)
1041         self.pg_enable_capture(self.pg_interfaces)
1042         self.pg_start()
1043
1044         # verify number of translated packet
1045         self.pg1.get_capture(pkts_num)
1046
1047     def test_interface_addr(self):
1048         """ Acquire SNAT addresses from interface """
1049         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1050
1051         # no address in NAT pool
1052         adresses = self.vapi.snat_address_dump()
1053         self.assertEqual(0, len(adresses))
1054
1055         # configure interface address and check NAT address pool
1056         self.pg7.config_ip4()
1057         adresses = self.vapi.snat_address_dump()
1058         self.assertEqual(1, len(adresses))
1059         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1060
1061         # remove interface address and check NAT address pool
1062         self.pg7.unconfig_ip4()
1063         adresses = self.vapi.snat_address_dump()
1064         self.assertEqual(0, len(adresses))
1065
1066     def test_interface_addr_static_mapping(self):
1067         """ Static mapping with addresses from interface """
1068         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1069         self.snat_add_static_mapping('1.2.3.4',
1070                                      external_sw_if_index=self.pg7.sw_if_index)
1071
1072         # static mappings with external interface
1073         static_mappings = self.vapi.snat_static_mapping_dump()
1074         self.assertEqual(1, len(static_mappings))
1075         self.assertEqual(self.pg7.sw_if_index,
1076                          static_mappings[0].external_sw_if_index)
1077
1078         # configure interface address and check static mappings
1079         self.pg7.config_ip4()
1080         static_mappings = self.vapi.snat_static_mapping_dump()
1081         self.assertEqual(1, len(static_mappings))
1082         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1083                          self.pg7.local_ip4n)
1084         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1085
1086         # remove interface address and check static mappings
1087         self.pg7.unconfig_ip4()
1088         static_mappings = self.vapi.snat_static_mapping_dump()
1089         self.assertEqual(0, len(static_mappings))
1090
1091     def test_ipfix_nat44_sess(self):
1092         """ S-NAT IPFIX logging NAT44 session created/delted """
1093         self.snat_add_address(self.snat_addr)
1094         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1095         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1096                                                  is_inside=0)
1097         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1098                                      src_address=self.pg3.local_ip4n,
1099                                      path_mtu=512,
1100                                      template_interval=10)
1101         self.vapi.snat_ipfix()
1102
1103         pkts = self.create_stream_in(self.pg0, self.pg1)
1104         self.pg0.add_stream(pkts)
1105         self.pg_enable_capture(self.pg_interfaces)
1106         self.pg_start()
1107         capture = self.pg1.get_capture(len(pkts))
1108         self.verify_capture_out(capture)
1109         self.snat_add_address(self.snat_addr, is_add=0)
1110         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1111         capture = self.pg3.get_capture(3)
1112         ipfix = IPFIXDecoder()
1113         # first load template
1114         for p in capture:
1115             self.assertTrue(p.haslayer(IPFIX))
1116             if p.haslayer(Template):
1117                 ipfix.add_template(p.getlayer(Template))
1118         # verify events in data set
1119         for p in capture:
1120             if p.haslayer(Data):
1121                 data = ipfix.decode_data_set(p.getlayer(Set))
1122                 self.verify_ipfix_nat44_ses(data)
1123
1124     def test_ipfix_addr_exhausted(self):
1125         """ S-NAT IPFIX logging NAT addresses exhausted """
1126         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1127         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1128                                                  is_inside=0)
1129         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1130                                      src_address=self.pg3.local_ip4n,
1131                                      path_mtu=512,
1132                                      template_interval=10)
1133         self.vapi.snat_ipfix()
1134
1135         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1136              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1137              TCP(sport=3025))
1138         self.pg0.add_stream(p)
1139         self.pg_enable_capture(self.pg_interfaces)
1140         self.pg_start()
1141         capture = self.pg1.get_capture(0)
1142         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1143         capture = self.pg3.get_capture(3)
1144         ipfix = IPFIXDecoder()
1145         # first load template
1146         for p in capture:
1147             self.assertTrue(p.haslayer(IPFIX))
1148             if p.haslayer(Template):
1149                 ipfix.add_template(p.getlayer(Template))
1150         # verify events in data set
1151         for p in capture:
1152             if p.haslayer(Data):
1153                 data = ipfix.decode_data_set(p.getlayer(Set))
1154                 self.verify_ipfix_addr_exhausted(data)
1155
1156     def test_pool_addr_fib(self):
1157         """ S-NAT add pool addresses to FIB """
1158         static_addr = '10.0.0.10'
1159         self.snat_add_address(self.snat_addr)
1160         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1161         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1162                                                  is_inside=0)
1163         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1164
1165         # SNAT address
1166         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1167              ARP(op=ARP.who_has, pdst=self.snat_addr,
1168                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1169         self.pg1.add_stream(p)
1170         self.pg_enable_capture(self.pg_interfaces)
1171         self.pg_start()
1172         capture = self.pg1.get_capture(1)
1173         self.assertTrue(capture[0].haslayer(ARP))
1174         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1175
1176         # 1:1 NAT address
1177         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1178              ARP(op=ARP.who_has, pdst=static_addr,
1179                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1180         self.pg1.add_stream(p)
1181         self.pg_enable_capture(self.pg_interfaces)
1182         self.pg_start()
1183         capture = self.pg1.get_capture(1)
1184         self.assertTrue(capture[0].haslayer(ARP))
1185         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1186
1187         # send ARP to non-SNAT interface
1188         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1189              ARP(op=ARP.who_has, pdst=self.snat_addr,
1190                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1191         self.pg2.add_stream(p)
1192         self.pg_enable_capture(self.pg_interfaces)
1193         self.pg_start()
1194         capture = self.pg1.get_capture(0)
1195
1196         # remove addresses and verify
1197         self.snat_add_address(self.snat_addr, is_add=0)
1198         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1199                                      is_add=0)
1200
1201         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1202              ARP(op=ARP.who_has, pdst=self.snat_addr,
1203                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1204         self.pg1.add_stream(p)
1205         self.pg_enable_capture(self.pg_interfaces)
1206         self.pg_start()
1207         capture = self.pg1.get_capture(0)
1208
1209         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1210              ARP(op=ARP.who_has, pdst=static_addr,
1211                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1212         self.pg1.add_stream(p)
1213         self.pg_enable_capture(self.pg_interfaces)
1214         self.pg_start()
1215         capture = self.pg1.get_capture(0)
1216
1217     def test_vrf_mode(self):
1218         """ S-NAT tenant VRF aware address pool mode """
1219
1220         vrf_id1 = 1
1221         vrf_id2 = 2
1222         nat_ip1 = "10.0.0.10"
1223         nat_ip2 = "10.0.0.11"
1224
1225         self.pg0.unconfig_ip4()
1226         self.pg1.unconfig_ip4()
1227         self.pg0.set_table_ip4(vrf_id1)
1228         self.pg1.set_table_ip4(vrf_id2)
1229         self.pg0.config_ip4()
1230         self.pg1.config_ip4()
1231
1232         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1233         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1234         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1235         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1236         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1237                                                  is_inside=0)
1238
1239         # first VRF
1240         pkts = self.create_stream_in(self.pg0, self.pg2)
1241         self.pg0.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244         capture = self.pg2.get_capture(len(pkts))
1245         self.verify_capture_out(capture, nat_ip1)
1246
1247         # second VRF
1248         pkts = self.create_stream_in(self.pg1, self.pg2)
1249         self.pg1.add_stream(pkts)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg2.get_capture(len(pkts))
1253         self.verify_capture_out(capture, nat_ip2)
1254
1255     def test_vrf_feature_independent(self):
1256         """ S-NAT tenant VRF independent address pool mode """
1257
1258         nat_ip1 = "10.0.0.10"
1259         nat_ip2 = "10.0.0.11"
1260
1261         self.snat_add_address(nat_ip1)
1262         self.snat_add_address(nat_ip2)
1263         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1264         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1265         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1266                                                  is_inside=0)
1267
1268         # first VRF
1269         pkts = self.create_stream_in(self.pg0, self.pg2)
1270         self.pg0.add_stream(pkts)
1271         self.pg_enable_capture(self.pg_interfaces)
1272         self.pg_start()
1273         capture = self.pg2.get_capture(len(pkts))
1274         self.verify_capture_out(capture, nat_ip1)
1275
1276         # second VRF
1277         pkts = self.create_stream_in(self.pg1, self.pg2)
1278         self.pg1.add_stream(pkts)
1279         self.pg_enable_capture(self.pg_interfaces)
1280         self.pg_start()
1281         capture = self.pg2.get_capture(len(pkts))
1282         self.verify_capture_out(capture, nat_ip1)
1283
1284     def tearDown(self):
1285         super(TestSNAT, self).tearDown()
1286         if not self.vpp_dead:
1287             self.logger.info(self.vapi.cli("show snat verbose"))
1288             self.clear_snat()
1289
1290
1291 class TestDeterministicNAT(MethodHolder):
1292     """ Deterministic NAT Test Cases """
1293
1294     @classmethod
1295     def setUpConstants(cls):
1296         super(TestDeterministicNAT, cls).setUpConstants()
1297         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1298
1299     @classmethod
1300     def setUpClass(cls):
1301         super(TestDeterministicNAT, cls).setUpClass()
1302
1303         try:
1304             cls.tcp_port_in = 6303
1305             cls.tcp_port_out = 6303
1306             cls.udp_port_in = 6304
1307             cls.udp_port_out = 6304
1308             cls.icmp_id_in = 6305
1309             cls.snat_addr = '10.0.0.3'
1310
1311             cls.create_pg_interfaces(range(2))
1312             cls.interfaces = list(cls.pg_interfaces)
1313
1314             for i in cls.interfaces:
1315                 i.admin_up()
1316                 i.config_ip4()
1317                 i.resolve_arp()
1318
1319             cls.pg0.generate_remote_hosts(2)
1320             cls.pg0.configure_ipv4_neighbors()
1321
1322         except Exception:
1323             super(TestDeterministicNAT, cls).tearDownClass()
1324             raise
1325
1326     def create_stream_in(self, in_if, out_if, ttl=64):
1327         """
1328         Create packet stream for inside network
1329
1330         :param in_if: Inside interface
1331         :param out_if: Outside interface
1332         :param ttl: TTL of generated packets
1333         """
1334         pkts = []
1335         # TCP
1336         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1337              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1338              TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
1339         pkts.append(p)
1340
1341         # UDP
1342         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1343              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1344              UDP(sport=self.udp_port_in, dport=self.udp_port_out))
1345         pkts.append(p)
1346
1347         # ICMP
1348         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1349              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1350              ICMP(id=self.icmp_id_in, type='echo-request'))
1351         pkts.append(p)
1352
1353         return pkts
1354
1355     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1356         """
1357         Create packet stream for outside network
1358
1359         :param out_if: Outside interface
1360         :param dst_ip: Destination IP address (Default use global SNAT address)
1361         :param ttl: TTL of generated packets
1362         """
1363         if dst_ip is None:
1364             dst_ip = self.snat_addr
1365         pkts = []
1366         # TCP
1367         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1368              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1369              TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
1370         pkts.append(p)
1371
1372         # UDP
1373         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1374              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1375              UDP(dport=self.udp_external_port, sport=self.udp_port_out))
1376         pkts.append(p)
1377
1378         # ICMP
1379         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1380              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1381              ICMP(id=self.icmp_external_id, type='echo-reply'))
1382         pkts.append(p)
1383
1384         return pkts
1385
1386     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1387         """
1388         Verify captured packets on outside network
1389
1390         :param capture: Captured packets
1391         :param nat_ip: Translated IP address (Default use global SNAT address)
1392         :param same_port: Sorce port number is not translated (Default False)
1393         :param packet_num: Expected number of packets (Default 3)
1394         """
1395         if nat_ip is None:
1396             nat_ip = self.snat_addr
1397         self.assertEqual(packet_num, len(capture))
1398         for packet in capture:
1399             try:
1400                 self.assertEqual(packet[IP].src, nat_ip)
1401                 if packet.haslayer(TCP):
1402                     self.tcp_external_port = packet[TCP].sport
1403                 elif packet.haslayer(UDP):
1404                     self.udp_external_port = packet[UDP].sport
1405                 else:
1406                     self.icmp_external_id = packet[ICMP].id
1407             except:
1408                 self.logger.error(ppp("Unexpected or invalid packet "
1409                                       "(outside network):", packet))
1410                 raise
1411
1412     def initiate_tcp_session(self, in_if, out_if):
1413         """
1414         Initiates TCP session
1415
1416         :param in_if: Inside interface
1417         :param out_if: Outside interface
1418         """
1419         try:
1420             # SYN packet in->out
1421             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1422                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1423                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1424                      flags="S"))
1425             in_if.add_stream(p)
1426             self.pg_enable_capture(self.pg_interfaces)
1427             self.pg_start()
1428             capture = out_if.get_capture(1)
1429             p = capture[0]
1430             self.tcp_external_port = p[TCP].sport
1431
1432             # SYN + ACK packet out->in
1433             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1434                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1435                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1436                      flags="SA"))
1437             out_if.add_stream(p)
1438             self.pg_enable_capture(self.pg_interfaces)
1439             self.pg_start()
1440             in_if.get_capture(1)
1441
1442             # ACK packet in->out
1443             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1444                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1445                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1446                      flags="A"))
1447             in_if.add_stream(p)
1448             self.pg_enable_capture(self.pg_interfaces)
1449             self.pg_start()
1450             out_if.get_capture(1)
1451
1452         except:
1453             self.logger.error("TCP 3 way handshake failed")
1454             raise
1455
1456     def test_deterministic_mode(self):
1457         """ S-NAT run deterministic mode """
1458         in_addr = '172.16.255.0'
1459         out_addr = '172.17.255.50'
1460         in_addr_t = '172.16.255.20'
1461         in_addr_n = socket.inet_aton(in_addr)
1462         out_addr_n = socket.inet_aton(out_addr)
1463         in_addr_t_n = socket.inet_aton(in_addr_t)
1464         in_plen = 24
1465         out_plen = 32
1466
1467         snat_config = self.vapi.snat_show_config()
1468         self.assertEqual(1, snat_config.deterministic)
1469
1470         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1471
1472         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1473         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1474         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1475         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1476
1477         deterministic_mappings = self.vapi.snat_det_map_dump()
1478         self.assertEqual(len(deterministic_mappings), 1)
1479         dsm = deterministic_mappings[0]
1480         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1481         self.assertEqual(in_plen, dsm.in_plen)
1482         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1483         self.assertEqual(out_plen, dsm.out_plen)
1484
1485         self.clear_snat()
1486         deterministic_mappings = self.vapi.snat_det_map_dump()
1487         self.assertEqual(len(deterministic_mappings), 0)
1488
1489     def test_set_timeouts(self):
1490         """ Set deterministic NAT timeouts """
1491         timeouts_before = self.vapi.snat_det_get_timeouts()
1492
1493         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1494                                         timeouts_before.tcp_established + 10,
1495                                         timeouts_before.tcp_transitory + 10,
1496                                         timeouts_before.icmp + 10)
1497
1498         timeouts_after = self.vapi.snat_det_get_timeouts()
1499
1500         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1501         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1502         self.assertNotEqual(timeouts_before.tcp_established,
1503                             timeouts_after.tcp_established)
1504         self.assertNotEqual(timeouts_before.tcp_transitory,
1505                             timeouts_after.tcp_transitory)
1506
1507     def test_det_in(self):
1508         """ CGNAT translation test (TCP, UDP, ICMP) """
1509
1510         nat_ip = "10.0.0.10"
1511         self.tcp_port_out = 6303
1512         self.udp_port_out = 6304
1513         self.icmp_id_in = 6305
1514         self.tcp_external_port = 7303
1515         self.udp_external_port = 7304
1516         self.icmp_id_out = 7305
1517
1518         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1519                                    32,
1520                                    socket.inet_aton(nat_ip),
1521                                    32)
1522         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1523         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1524                                                  is_inside=0)
1525
1526         # in2out
1527         pkts = self.create_stream_in(self.pg0, self.pg1)
1528         self.pg0.add_stream(pkts)
1529         self.pg_enable_capture(self.pg_interfaces)
1530         self.pg_start()
1531         capture = self.pg1.get_capture(len(pkts))
1532         self.verify_capture_out(capture, nat_ip)
1533
1534         # out2in
1535         pkts = self.create_stream_out(self.pg1, nat_ip)
1536         self.pg1.add_stream(pkts)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(len(pkts))
1540         self.verify_capture_in(capture, self.pg0)
1541
1542     def test_multiple_users(self):
1543         """ CGNAT multiple users """
1544
1545         nat_ip = "10.0.0.10"
1546         port_in = 80
1547         port_out = 6303
1548
1549         host0 = self.pg0.remote_hosts[0]
1550         host1 = self.pg0.remote_hosts[1]
1551
1552         self.vapi.snat_add_det_map(host0.ip4n,
1553                                    24,
1554                                    socket.inet_aton(nat_ip),
1555                                    32)
1556         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1557         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1558                                                  is_inside=0)
1559
1560         # host0 to out
1561         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1562              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1563              TCP(sport=port_in, dport=port_out))
1564         self.pg0.add_stream(p)
1565         self.pg_enable_capture(self.pg_interfaces)
1566         self.pg_start()
1567         capture = self.pg1.get_capture(1)
1568         p = capture[0]
1569         try:
1570             ip = p[IP]
1571             tcp = p[TCP]
1572             self.assertEqual(ip.src, nat_ip)
1573             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1574             self.assertEqual(tcp.dport, port_out)
1575             external_port0 = tcp.sport
1576         except:
1577             self.logger.error(ppp("Unexpected or invalid packet:", p))
1578             raise
1579
1580         # host1 to out
1581         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1582              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1583              TCP(sport=port_in, dport=port_out))
1584         self.pg0.add_stream(p)
1585         self.pg_enable_capture(self.pg_interfaces)
1586         self.pg_start()
1587         capture = self.pg1.get_capture(1)
1588         p = capture[0]
1589         try:
1590             ip = p[IP]
1591             tcp = p[TCP]
1592             self.assertEqual(ip.src, nat_ip)
1593             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1594             self.assertEqual(tcp.dport, port_out)
1595             external_port1 = tcp.sport
1596         except:
1597             self.logger.error(ppp("Unexpected or invalid packet:", p))
1598             raise
1599
1600         dms = self.vapi.snat_det_map_dump()
1601         self.assertEqual(1, len(dms))
1602         self.assertEqual(2, dms[0].ses_num)
1603
1604         # out to host0
1605         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1607              TCP(sport=port_out, dport=external_port0))
1608         self.pg1.add_stream(p)
1609         self.pg_enable_capture(self.pg_interfaces)
1610         self.pg_start()
1611         capture = self.pg0.get_capture(1)
1612         p = capture[0]
1613         try:
1614             ip = p[IP]
1615             tcp = p[TCP]
1616             self.assertEqual(ip.src, self.pg1.remote_ip4)
1617             self.assertEqual(ip.dst, host0.ip4)
1618             self.assertEqual(tcp.dport, port_in)
1619             self.assertEqual(tcp.sport, port_out)
1620         except:
1621             self.logger.error(ppp("Unexpected or invalid packet:", p))
1622             raise
1623
1624         # out to host1
1625         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1626              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1627              TCP(sport=port_out, dport=external_port1))
1628         self.pg1.add_stream(p)
1629         self.pg_enable_capture(self.pg_interfaces)
1630         self.pg_start()
1631         capture = self.pg0.get_capture(1)
1632         p = capture[0]
1633         try:
1634             ip = p[IP]
1635             tcp = p[TCP]
1636             self.assertEqual(ip.src, self.pg1.remote_ip4)
1637             self.assertEqual(ip.dst, host1.ip4)
1638             self.assertEqual(tcp.dport, port_in)
1639             self.assertEqual(tcp.sport, port_out)
1640         except:
1641             self.logger.error(ppp("Unexpected or invalid packet", p))
1642             raise
1643
1644     def test_tcp_session_close_detection_in(self):
1645         """ CGNAT TCP session close initiated from inside network """
1646         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1647                                    32,
1648                                    socket.inet_aton(self.snat_addr),
1649                                    32)
1650         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1651         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1652                                                  is_inside=0)
1653
1654         self.initiate_tcp_session(self.pg0, self.pg1)
1655
1656         # close the session from inside
1657         try:
1658             # FIN packet in -> out
1659             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1660                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1661                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1662                      flags="F"))
1663             self.pg0.add_stream(p)
1664             self.pg_enable_capture(self.pg_interfaces)
1665             self.pg_start()
1666             self.pg1.get_capture(1)
1667
1668             pkts = []
1669
1670             # ACK packet out -> in
1671             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1672                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1673                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1674                      flags="A"))
1675             pkts.append(p)
1676
1677             # FIN packet out -> in
1678             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1679                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1680                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1681                      flags="F"))
1682             pkts.append(p)
1683
1684             self.pg1.add_stream(pkts)
1685             self.pg_enable_capture(self.pg_interfaces)
1686             self.pg_start()
1687             self.pg0.get_capture(2)
1688
1689             # ACK packet in -> out
1690             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1691                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1692                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1693                      flags="A"))
1694             self.pg0.add_stream(p)
1695             self.pg_enable_capture(self.pg_interfaces)
1696             self.pg_start()
1697             self.pg1.get_capture(1)
1698
1699             # Check if snat closed the session
1700             dms = self.vapi.snat_det_map_dump()
1701             self.assertEqual(0, dms[0].ses_num)
1702         except:
1703             self.logger.error("TCP session termination failed")
1704             raise
1705
1706     def test_tcp_session_close_detection_out(self):
1707         """ CGNAT TCP session close initiated from outside network """
1708         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1709                                    32,
1710                                    socket.inet_aton(self.snat_addr),
1711                                    32)
1712         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1713         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1714                                                  is_inside=0)
1715
1716         self.initiate_tcp_session(self.pg0, self.pg1)
1717
1718         # close the session from outside
1719         try:
1720             # FIN packet out -> in
1721             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1722                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1723                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1724                      flags="F"))
1725             self.pg1.add_stream(p)
1726             self.pg_enable_capture(self.pg_interfaces)
1727             self.pg_start()
1728             self.pg0.get_capture(1)
1729
1730             pkts = []
1731
1732             # ACK packet in -> out
1733             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1734                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1735                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1736                      flags="A"))
1737             pkts.append(p)
1738
1739             # ACK packet in -> out
1740             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1741                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1742                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1743                      flags="F"))
1744             pkts.append(p)
1745
1746             self.pg0.add_stream(pkts)
1747             self.pg_enable_capture(self.pg_interfaces)
1748             self.pg_start()
1749             self.pg1.get_capture(2)
1750
1751             # ACK packet out -> in
1752             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1753                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1754                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1755                      flags="A"))
1756             self.pg1.add_stream(p)
1757             self.pg_enable_capture(self.pg_interfaces)
1758             self.pg_start()
1759             self.pg0.get_capture(1)
1760
1761             # Check if snat closed the session
1762             dms = self.vapi.snat_det_map_dump()
1763             self.assertEqual(0, dms[0].ses_num)
1764         except:
1765             self.logger.error("TCP session termination failed")
1766             raise
1767
1768     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1769     def test_session_timeout(self):
1770         """ CGNAT session timeouts """
1771         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1772                                    32,
1773                                    socket.inet_aton(self.snat_addr),
1774                                    32)
1775         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1776         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1777                                                  is_inside=0)
1778
1779         self.initiate_tcp_session(self.pg0, self.pg1)
1780         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
1781         pkts = self.create_stream_in(self.pg0, self.pg1)
1782         self.pg0.add_stream(pkts)
1783         self.pg_enable_capture(self.pg_interfaces)
1784         self.pg_start()
1785         capture = self.pg1.get_capture(len(pkts))
1786         sleep(15)
1787
1788         dms = self.vapi.snat_det_map_dump()
1789         self.assertEqual(0, dms[0].ses_num)
1790
1791     def test_session_limit_per_user(self):
1792         """ CGNAT maximum 1000 sessions per user should be created """
1793         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1794                                    32,
1795                                    socket.inet_aton(self.snat_addr),
1796                                    32)
1797         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1798         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1799                                                  is_inside=0)
1800
1801         pkts = []
1802         for port in range(1025, 2025):
1803             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1804                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1805                  UDP(sport=port, dport=port))
1806             pkts.append(p)
1807
1808         self.pg0.add_stream(pkts)
1809         self.pg_enable_capture(self.pg_interfaces)
1810         self.pg_start()
1811         capture = self.pg1.get_capture(len(pkts))
1812
1813         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1814              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1815              UDP(sport=3000, dport=3000))
1816         self.pg0.add_stream(p)
1817         self.pg_enable_capture(self.pg_interfaces)
1818         self.pg_start()
1819         capture = self.pg1.assert_nothing_captured()
1820
1821         dms = self.vapi.snat_det_map_dump()
1822
1823         self.assertEqual(1000, dms[0].ses_num)
1824
1825     def clear_snat(self):
1826         """
1827         Clear SNAT configuration.
1828         """
1829         self.vapi.snat_det_set_timeouts()
1830         deterministic_mappings = self.vapi.snat_det_map_dump()
1831         for dsm in deterministic_mappings:
1832             self.vapi.snat_add_det_map(dsm.in_addr,
1833                                        dsm.in_plen,
1834                                        dsm.out_addr,
1835                                        dsm.out_plen,
1836                                        is_add=0)
1837
1838         interfaces = self.vapi.snat_interface_dump()
1839         for intf in interfaces:
1840             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
1841                                                      intf.is_inside,
1842                                                      is_add=0)
1843
1844     def tearDown(self):
1845         super(TestDeterministicNAT, self).tearDown()
1846         if not self.vpp_dead:
1847             self.logger.info(self.vapi.cli("show snat detail"))
1848             self.clear_snat()
1849
1850 if __name__ == '__main__':
1851     unittest.main(testRunner=VppTestRunner)