42b071931784d72378a448af9ab1bde767f59bed
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from util import ppp
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from time import sleep
15
16
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        # No SNAT specific class fixture here; only the parent one is run.
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        # No SNAT specific cleanup here; only the parent one is run.
        super(MethodHolder, self).tearDown()

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Build the inside-network stream: one TCP, one UDP and one ICMP
        echo-request packet addressed to the remote host of out_if

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with the TCP, UDP and ICMP packet (in that order)
        """
        def headers():
            # Fresh Ethernet/IP headers for each generated packet
            return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                    IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

        return [
            headers() / TCP(sport=self.tcp_port_in),
            headers() / UDP(sport=self.udp_port_in),
            headers() / ICMP(id=self.icmp_id_in, type='echo-request'),
        ]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Build the outside-network stream: one TCP, one UDP and one ICMP
        echo-reply packet addressed to dst_ip

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List with the TCP, UDP and ICMP packet (in that order)
        """
        target = self.snat_addr if dst_ip is None else dst_ip

        def headers():
            # Fresh Ethernet/IP headers for each generated packet
            return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                    IP(src=out_if.remote_ip4, dst=target, ttl=ttl))

        return [
            headers() / TCP(dport=self.tcp_port_out),
            headers() / UDP(dport=self.udp_port_out),
            headers() / ICMP(id=self.icmp_id_out, type='echo-reply'),
        ]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Check packets captured on the outside network and store the seen
        TCP/UDP source port and ICMP identifier in tcp_port_out,
        udp_port_out and icmp_id_out

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        expected_src = self.snat_addr if nat_ip is None else nat_ip
        self.assertEqual(packet_num, len(capture))
        # same_port decides whether the outside value has to equal or has to
        # differ from the inside one
        compare = self.assertEqual if same_port else self.assertNotEqual
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, expected_src)
                if pkt.haslayer(TCP):
                    seen = pkt[TCP].sport
                    compare(seen, self.tcp_port_in)
                    self.tcp_port_out = seen
                elif pkt.haslayer(UDP):
                    seen = pkt[UDP].sport
                    compare(seen, self.udp_port_in)
                    self.udp_port_out = seen
                else:
                    # Neither TCP nor UDP: handled as ICMP
                    seen = pkt[ICMP].id
                    compare(seen, self.icmp_id_in)
                    self.icmp_id_out = seen
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", pkt))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Check packets captured on the inside network: destination address
        and destination port (ICMP identifier) must be the inside ones

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
                if pkt.haslayer(TCP):
                    actual, wanted = pkt[TCP].dport, self.tcp_port_in
                elif pkt.haslayer(UDP):
                    actual, wanted = pkt[UDP].dport, self.udp_port_in
                else:
                    # Neither TCP nor UDP: handled as ICMP
                    actual, wanted = pkt[ICMP].id, self.icmp_id_in
                self.assertEqual(actual, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Check captured packets that must have passed without translation:
        addresses and source port (ICMP identifier) are the original ones.
        The number of captured packets is not checked.

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, ingress_if.remote_ip4)
                self.assertEqual(pkt[IP].dst, egress_if.remote_ip4)
                if pkt.haslayer(TCP):
                    actual, wanted = pkt[TCP].sport, self.tcp_port_in
                elif pkt.haslayer(UDP):
                    actual, wanted = pkt[UDP].sport, self.udp_port_in
                else:
                    # Neither TCP nor UDP: handled as ICMP
                    actual, wanted = pkt[ICMP].id, self.icmp_id_in
                self.assertEqual(actual, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Check ICMP error packets captured on the outside network; the
        embedded (quoted) packet must carry the outside port/identifier

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        expected_src = self.snat_addr if src_ip is None else src_ip
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, expected_src)
                self.assertTrue(pkt.haslayer(ICMP))
                outer_icmp = pkt[ICMP]
                self.assertEqual(outer_icmp.type, icmp_type)
                self.assertTrue(outer_icmp.haslayer(IPerror))
                quoted = outer_icmp[IPerror]
                if quoted.haslayer(TCPerror):
                    actual = quoted[TCPerror].dport
                    wanted = self.tcp_port_out
                elif quoted.haslayer(UDPerror):
                    actual = quoted[UDPerror].dport
                    wanted = self.udp_port_out
                else:
                    # Neither TCP nor UDP: handled as quoted ICMP
                    actual = quoted[ICMPerror].id
                    wanted = self.icmp_id_out
                self.assertEqual(actual, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", pkt))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Check ICMP error packets captured on the inside network; the
        embedded (quoted) packet must carry the inside port/identifier

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
                self.assertTrue(pkt.haslayer(ICMP))
                outer_icmp = pkt[ICMP]
                self.assertEqual(outer_icmp.type, icmp_type)
                self.assertTrue(outer_icmp.haslayer(IPerror))
                quoted = outer_icmp[IPerror]
                if quoted.haslayer(TCPerror):
                    actual = quoted[TCPerror].sport
                    wanted = self.tcp_port_in
                elif quoted.haslayer(UDPerror):
                    actual = quoted[UDPerror].sport
                    wanted = self.udp_port_in
                else:
                    # Neither TCP nor UDP: handled as quoted ICMP
                    actual = quoted[ICMPerror].id
                    wanted = self.icmp_id_in
                self.assertEqual(actual, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Check IPFIX NAT44 session events: exactly six records are expected,
        three session create and three session delete events

        :param data: Decoded IPFIX data records
        """
        created = 0
        deleted = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent (4 is counted as create, 5 as delete)
            event = ord(record[230])
            self.assertIn(event, [4, 5])
            if event == 4:
                created += 1
            else:
                deleted += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier selects which inside/outside pair applies
            proto = ord(record[4])
            if proto == IP_PROTOS.icmp:
                port_in, port_out = self.icmp_id_in, self.icmp_id_out
            elif proto == IP_PROTOS.tcp:
                port_in, port_out = self.tcp_port_in, self.tcp_port_out
            elif proto == IP_PROTOS.udp:
                port_in, port_out = self.udp_port_in, self.udp_port_out
            else:
                self.fail("Invalid protocol")
            # sourceTransportPort
            self.assertEqual(struct.pack("!H", port_in), record[7])
            # postNAPTSourceTransportPort
            self.assertEqual(struct.pack("!H", port_out), record[227])
        self.assertEqual(3, created)
        self.assertEqual(3, deleted)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Check the IPFIX NAT addresses event: a single record is expected

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        event_record = data[0]
        # natEvent
        self.assertEqual(ord(event_record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), event_record[283])
298
299
300 class TestSNAT(MethodHolder):
301     """ SNAT Test Cases """
302
303     @classmethod
304     def setUpClass(cls):
305         super(TestSNAT, cls).setUpClass()
306
307         try:
308             cls.tcp_port_in = 6303
309             cls.tcp_port_out = 6303
310             cls.udp_port_in = 6304
311             cls.udp_port_out = 6304
312             cls.icmp_id_in = 6305
313             cls.icmp_id_out = 6305
314             cls.snat_addr = '10.0.0.3'
315
316             cls.create_pg_interfaces(range(8))
317             cls.interfaces = list(cls.pg_interfaces[0:4])
318
319             for i in cls.interfaces:
320                 i.admin_up()
321                 i.config_ip4()
322                 i.resolve_arp()
323
324             cls.pg0.generate_remote_hosts(2)
325             cls.pg0.configure_ipv4_neighbors()
326
327             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
328
329             cls.pg4._local_ip4 = "172.16.255.1"
330             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
331             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
332             cls.pg4.set_table_ip4(10)
333             cls.pg5._local_ip4 = "172.16.255.3"
334             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
335             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
336             cls.pg5.set_table_ip4(10)
337             cls.pg6._local_ip4 = "172.16.255.1"
338             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
339             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
340             cls.pg6.set_table_ip4(20)
341             for i in cls.overlapping_interfaces:
342                 i.config_ip4()
343                 i.admin_up()
344                 i.resolve_arp()
345
346             cls.pg7.admin_up()
347
348         except Exception:
349             super(TestSNAT, cls).tearDownClass()
350             raise
351
352     def clear_snat(self):
353         """
354         Clear SNAT configuration.
355         """
356         if self.pg7.has_ip4_config:
357             self.pg7.unconfig_ip4()
358
359         interfaces = self.vapi.snat_interface_addr_dump()
360         for intf in interfaces:
361             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
362
363         self.vapi.snat_ipfix(enable=0)
364
365         interfaces = self.vapi.snat_interface_dump()
366         for intf in interfaces:
367             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
368                                                      intf.is_inside,
369                                                      is_add=0)
370
371         static_mappings = self.vapi.snat_static_mapping_dump()
372         for sm in static_mappings:
373             self.vapi.snat_add_static_mapping(sm.local_ip_address,
374                                               sm.external_ip_address,
375                                               local_port=sm.local_port,
376                                               external_port=sm.external_port,
377                                               addr_only=sm.addr_only,
378                                               vrf_id=sm.vrf_id,
379                                               protocol=sm.protocol,
380                                               is_add=0)
381
382         adresses = self.vapi.snat_address_dump()
383         for addr in adresses:
384             self.vapi.snat_add_address_range(addr.ip_address,
385                                              addr.ip_address,
386                                              is_add=0)
387
388     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
389                                 local_port=0, external_port=0, vrf_id=0,
390                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
391                                 proto=0):
392         """
393         Add/delete S-NAT static mapping
394
395         :param local_ip: Local IP address
396         :param external_ip: External IP address
397         :param local_port: Local port number (Optional)
398         :param external_port: External port number (Optional)
399         :param vrf_id: VRF ID (Default 0)
400         :param is_add: 1 if add, 0 if delete (Default add)
401         :param external_sw_if_index: External interface instead of IP address
402         :param proto: IP protocol (Mandatory if port specified)
403         """
404         addr_only = 1
405         if local_port and external_port:
406             addr_only = 0
407         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
408         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
409         self.vapi.snat_add_static_mapping(
410             l_ip,
411             e_ip,
412             external_sw_if_index,
413             local_port,
414             external_port,
415             addr_only,
416             vrf_id,
417             proto,
418             is_add)
419
420     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
421         """
422         Add/delete S-NAT address
423
424         :param ip: IP address
425         :param is_add: 1 if add, 0 if delete (Default add)
426         """
427         snat_addr = socket.inet_pton(socket.AF_INET, ip)
428         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
429                                          vrf_id=vrf_id)
430
431     def test_dynamic(self):
432         """ SNAT dynamic translation test """
433
434         self.snat_add_address(self.snat_addr)
435         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
436         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
437                                                  is_inside=0)
438
439         # in2out
440         pkts = self.create_stream_in(self.pg0, self.pg1)
441         self.pg0.add_stream(pkts)
442         self.pg_enable_capture(self.pg_interfaces)
443         self.pg_start()
444         capture = self.pg1.get_capture(len(pkts))
445         self.verify_capture_out(capture)
446
447         # out2in
448         pkts = self.create_stream_out(self.pg1)
449         self.pg1.add_stream(pkts)
450         self.pg_enable_capture(self.pg_interfaces)
451         self.pg_start()
452         capture = self.pg0.get_capture(len(pkts))
453         self.verify_capture_in(capture, self.pg0)
454
455     def test_dynamic_icmp_errors_in2out_ttl_1(self):
456         """ SNAT handling of client packets with TTL=1 """
457
458         self.snat_add_address(self.snat_addr)
459         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
460         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
461                                                  is_inside=0)
462
463         # Client side - generate traffic
464         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
465         self.pg0.add_stream(pkts)
466         self.pg_enable_capture(self.pg_interfaces)
467         self.pg_start()
468
469         # Client side - verify ICMP type 11 packets
470         capture = self.pg0.get_capture(len(pkts))
471         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
472
473     def test_dynamic_icmp_errors_out2in_ttl_1(self):
474         """ SNAT handling of server packets with TTL=1 """
475
476         self.snat_add_address(self.snat_addr)
477         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
478         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
479                                                  is_inside=0)
480
481         # Client side - create sessions
482         pkts = self.create_stream_in(self.pg0, self.pg1)
483         self.pg0.add_stream(pkts)
484         self.pg_enable_capture(self.pg_interfaces)
485         self.pg_start()
486
487         # Server side - generate traffic
488         capture = self.pg1.get_capture(len(pkts))
489         self.verify_capture_out(capture)
490         pkts = self.create_stream_out(self.pg1, ttl=1)
491         self.pg1.add_stream(pkts)
492         self.pg_enable_capture(self.pg_interfaces)
493         self.pg_start()
494
495         # Server side - verify ICMP type 11 packets
496         capture = self.pg1.get_capture(len(pkts))
497         self.verify_capture_out_with_icmp_errors(capture,
498                                                  src_ip=self.pg1.local_ip4)
499
500     def test_dynamic_icmp_errors_in2out_ttl_2(self):
501         """ SNAT handling of error responses to client packets with TTL=2 """
502
503         self.snat_add_address(self.snat_addr)
504         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
505         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
506                                                  is_inside=0)
507
508         # Client side - generate traffic
509         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
510         self.pg0.add_stream(pkts)
511         self.pg_enable_capture(self.pg_interfaces)
512         self.pg_start()
513
514         # Server side - simulate ICMP type 11 response
515         capture = self.pg1.get_capture(len(pkts))
516         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
517                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
518                 ICMP(type=11) / packet[IP] for packet in capture]
519         self.pg1.add_stream(pkts)
520         self.pg_enable_capture(self.pg_interfaces)
521         self.pg_start()
522
523         # Client side - verify ICMP type 11 packets
524         capture = self.pg0.get_capture(len(pkts))
525         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
526
527     def test_dynamic_icmp_errors_out2in_ttl_2(self):
528         """ SNAT handling of error responses to server packets with TTL=2 """
529
530         self.snat_add_address(self.snat_addr)
531         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
532         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
533                                                  is_inside=0)
534
535         # Client side - create sessions
536         pkts = self.create_stream_in(self.pg0, self.pg1)
537         self.pg0.add_stream(pkts)
538         self.pg_enable_capture(self.pg_interfaces)
539         self.pg_start()
540
541         # Server side - generate traffic
542         capture = self.pg1.get_capture(len(pkts))
543         self.verify_capture_out(capture)
544         pkts = self.create_stream_out(self.pg1, ttl=2)
545         self.pg1.add_stream(pkts)
546         self.pg_enable_capture(self.pg_interfaces)
547         self.pg_start()
548
549         # Client side - simulate ICMP type 11 response
550         capture = self.pg0.get_capture(len(pkts))
551         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
552                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
553                 ICMP(type=11) / packet[IP] for packet in capture]
554         self.pg0.add_stream(pkts)
555         self.pg_enable_capture(self.pg_interfaces)
556         self.pg_start()
557
558         # Server side - verify ICMP type 11 packets
559         capture = self.pg1.get_capture(len(pkts))
560         self.verify_capture_out_with_icmp_errors(capture)
561
562     def test_ping_out_interface_from_outside(self):
563         """ Ping SNAT out interface from outside network """
564
565         self.snat_add_address(self.snat_addr)
566         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
567         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
568                                                  is_inside=0)
569
570         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
571              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
572              ICMP(id=self.icmp_id_out, type='echo-request'))
573         pkts = [p]
574         self.pg1.add_stream(pkts)
575         self.pg_enable_capture(self.pg_interfaces)
576         self.pg_start()
577         capture = self.pg1.get_capture(len(pkts))
578         self.assertEqual(1, len(capture))
579         packet = capture[0]
580         try:
581             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
582             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
583             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
584             self.assertEqual(packet[ICMP].type, 0)  # echo reply
585         except:
586             self.logger.error(ppp("Unexpected or invalid packet "
587                                   "(outside network):", packet))
588             raise
589
590     def test_ping_internal_host_from_outside(self):
591         """ Ping internal host from outside network """
592
593         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
594         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
595         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
596                                                  is_inside=0)
597
598         # out2in
599         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
600                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
601                ICMP(id=self.icmp_id_out, type='echo-request'))
602         self.pg1.add_stream(pkt)
603         self.pg_enable_capture(self.pg_interfaces)
604         self.pg_start()
605         capture = self.pg0.get_capture(1)
606         self.verify_capture_in(capture, self.pg0, packet_num=1)
607         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
608
609         # in2out
610         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
611                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
612                ICMP(id=self.icmp_id_in, type='echo-reply'))
613         self.pg0.add_stream(pkt)
614         self.pg_enable_capture(self.pg_interfaces)
615         self.pg_start()
616         capture = self.pg1.get_capture(1)
617         self.verify_capture_out(capture, same_port=True, packet_num=1)
618         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
619
620     def test_static_in(self):
621         """ SNAT 1:1 NAT initialized from inside network """
622
623         nat_ip = "10.0.0.10"
624         self.tcp_port_out = 6303
625         self.udp_port_out = 6304
626         self.icmp_id_out = 6305
627
628         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
629         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
630         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
631                                                  is_inside=0)
632
633         # in2out
634         pkts = self.create_stream_in(self.pg0, self.pg1)
635         self.pg0.add_stream(pkts)
636         self.pg_enable_capture(self.pg_interfaces)
637         self.pg_start()
638         capture = self.pg1.get_capture(len(pkts))
639         self.verify_capture_out(capture, nat_ip, True)
640
641         # out2in
642         pkts = self.create_stream_out(self.pg1, nat_ip)
643         self.pg1.add_stream(pkts)
644         self.pg_enable_capture(self.pg_interfaces)
645         self.pg_start()
646         capture = self.pg0.get_capture(len(pkts))
647         self.verify_capture_in(capture, self.pg0)
648
649     def test_static_out(self):
650         """ SNAT 1:1 NAT initialized from outside network """
651
652         nat_ip = "10.0.0.20"
653         self.tcp_port_out = 6303
654         self.udp_port_out = 6304
655         self.icmp_id_out = 6305
656
657         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
658         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
659         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
660                                                  is_inside=0)
661
662         # out2in
663         pkts = self.create_stream_out(self.pg1, nat_ip)
664         self.pg1.add_stream(pkts)
665         self.pg_enable_capture(self.pg_interfaces)
666         self.pg_start()
667         capture = self.pg0.get_capture(len(pkts))
668         self.verify_capture_in(capture, self.pg0)
669
670         # in2out
671         pkts = self.create_stream_in(self.pg0, self.pg1)
672         self.pg0.add_stream(pkts)
673         self.pg_enable_capture(self.pg_interfaces)
674         self.pg_start()
675         capture = self.pg1.get_capture(len(pkts))
676         self.verify_capture_out(capture, nat_ip, True)
677
678     def test_static_with_port_in(self):
679         """ SNAT 1:1 NAT with port initialized from inside network """
680
681         self.tcp_port_out = 3606
682         self.udp_port_out = 3607
683         self.icmp_id_out = 3608
684
685         self.snat_add_address(self.snat_addr)
686         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
687                                      self.tcp_port_in, self.tcp_port_out,
688                                      proto=IP_PROTOS.tcp)
689         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
690                                      self.udp_port_in, self.udp_port_out,
691                                      proto=IP_PROTOS.udp)
692         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
693                                      self.icmp_id_in, self.icmp_id_out,
694                                      proto=IP_PROTOS.icmp)
695         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
696         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
697                                                  is_inside=0)
698
699         # in2out
700         pkts = self.create_stream_in(self.pg0, self.pg1)
701         self.pg0.add_stream(pkts)
702         self.pg_enable_capture(self.pg_interfaces)
703         self.pg_start()
704         capture = self.pg1.get_capture(len(pkts))
705         self.verify_capture_out(capture)
706
707         # out2in
708         pkts = self.create_stream_out(self.pg1)
709         self.pg1.add_stream(pkts)
710         self.pg_enable_capture(self.pg_interfaces)
711         self.pg_start()
712         capture = self.pg0.get_capture(len(pkts))
713         self.verify_capture_in(capture, self.pg0)
714
715     def test_static_with_port_out(self):
716         """ SNAT 1:1 NAT with port initialized from outside network """
717
718         self.tcp_port_out = 30606
719         self.udp_port_out = 30607
720         self.icmp_id_out = 30608
721
722         self.snat_add_address(self.snat_addr)
723         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
724                                      self.tcp_port_in, self.tcp_port_out,
725                                      proto=IP_PROTOS.tcp)
726         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
727                                      self.udp_port_in, self.udp_port_out,
728                                      proto=IP_PROTOS.udp)
729         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
730                                      self.icmp_id_in, self.icmp_id_out,
731                                      proto=IP_PROTOS.icmp)
732         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
733         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
734                                                  is_inside=0)
735
736         # out2in
737         pkts = self.create_stream_out(self.pg1)
738         self.pg1.add_stream(pkts)
739         self.pg_enable_capture(self.pg_interfaces)
740         self.pg_start()
741         capture = self.pg0.get_capture(len(pkts))
742         self.verify_capture_in(capture, self.pg0)
743
744         # in2out
745         pkts = self.create_stream_in(self.pg0, self.pg1)
746         self.pg0.add_stream(pkts)
747         self.pg_enable_capture(self.pg_interfaces)
748         self.pg_start()
749         capture = self.pg1.get_capture(len(pkts))
750         self.verify_capture_out(capture)
751
752     def test_static_vrf_aware(self):
753         """ SNAT 1:1 NAT VRF awareness """
754
755         nat_ip1 = "10.0.0.30"
756         nat_ip2 = "10.0.0.40"
757         self.tcp_port_out = 6303
758         self.udp_port_out = 6304
759         self.icmp_id_out = 6305
760
761         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
762                                      vrf_id=10)
763         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
764                                      vrf_id=10)
765         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
766                                                  is_inside=0)
767         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
768         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
769
770         # inside interface VRF match SNAT static mapping VRF
771         pkts = self.create_stream_in(self.pg4, self.pg3)
772         self.pg4.add_stream(pkts)
773         self.pg_enable_capture(self.pg_interfaces)
774         self.pg_start()
775         capture = self.pg3.get_capture(len(pkts))
776         self.verify_capture_out(capture, nat_ip1, True)
777
778         # inside interface VRF don't match SNAT static mapping VRF (packets
779         # are dropped)
780         pkts = self.create_stream_in(self.pg0, self.pg3)
781         self.pg0.add_stream(pkts)
782         self.pg_enable_capture(self.pg_interfaces)
783         self.pg_start()
784         self.pg3.assert_nothing_captured()
785
786     def test_multiple_inside_interfaces(self):
787         """ SNAT multiple inside interfaces (non-overlapping address space) """
788
789         self.snat_add_address(self.snat_addr)
790         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
791         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
792         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
793                                                  is_inside=0)
794
795         # between two S-NAT inside interfaces (no translation)
796         pkts = self.create_stream_in(self.pg0, self.pg1)
797         self.pg0.add_stream(pkts)
798         self.pg_enable_capture(self.pg_interfaces)
799         self.pg_start()
800         capture = self.pg1.get_capture(len(pkts))
801         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
802
803         # from S-NAT inside to interface without S-NAT feature (no translation)
804         pkts = self.create_stream_in(self.pg0, self.pg2)
805         self.pg0.add_stream(pkts)
806         self.pg_enable_capture(self.pg_interfaces)
807         self.pg_start()
808         capture = self.pg2.get_capture(len(pkts))
809         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
810
811         # in2out 1st interface
812         pkts = self.create_stream_in(self.pg0, self.pg3)
813         self.pg0.add_stream(pkts)
814         self.pg_enable_capture(self.pg_interfaces)
815         self.pg_start()
816         capture = self.pg3.get_capture(len(pkts))
817         self.verify_capture_out(capture)
818
819         # out2in 1st interface
820         pkts = self.create_stream_out(self.pg3)
821         self.pg3.add_stream(pkts)
822         self.pg_enable_capture(self.pg_interfaces)
823         self.pg_start()
824         capture = self.pg0.get_capture(len(pkts))
825         self.verify_capture_in(capture, self.pg0)
826
827         # in2out 2nd interface
828         pkts = self.create_stream_in(self.pg1, self.pg3)
829         self.pg1.add_stream(pkts)
830         self.pg_enable_capture(self.pg_interfaces)
831         self.pg_start()
832         capture = self.pg3.get_capture(len(pkts))
833         self.verify_capture_out(capture)
834
835         # out2in 2nd interface
836         pkts = self.create_stream_out(self.pg3)
837         self.pg3.add_stream(pkts)
838         self.pg_enable_capture(self.pg_interfaces)
839         self.pg_start()
840         capture = self.pg1.get_capture(len(pkts))
841         self.verify_capture_in(capture, self.pg1)
842
843     def test_inside_overlapping_interfaces(self):
844         """ SNAT multiple inside interfaces with overlapping address space """
845
846         static_nat_ip = "10.0.0.10"
847         self.snat_add_address(self.snat_addr)
848         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
849                                                  is_inside=0)
850         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
851         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
852         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
853         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
854                                      vrf_id=20)
855
856         # between S-NAT inside interfaces with same VRF (no translation)
857         pkts = self.create_stream_in(self.pg4, self.pg5)
858         self.pg4.add_stream(pkts)
859         self.pg_enable_capture(self.pg_interfaces)
860         self.pg_start()
861         capture = self.pg5.get_capture(len(pkts))
862         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
863
864         # between S-NAT inside interfaces with different VRF (hairpinning)
865         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
866              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
867              TCP(sport=1234, dport=5678))
868         self.pg4.add_stream(p)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         capture = self.pg6.get_capture(1)
872         p = capture[0]
873         try:
874             ip = p[IP]
875             tcp = p[TCP]
876             self.assertEqual(ip.src, self.snat_addr)
877             self.assertEqual(ip.dst, self.pg6.remote_ip4)
878             self.assertNotEqual(tcp.sport, 1234)
879             self.assertEqual(tcp.dport, 5678)
880         except:
881             self.logger.error(ppp("Unexpected or invalid packet:", p))
882             raise
883
884         # in2out 1st interface
885         pkts = self.create_stream_in(self.pg4, self.pg3)
886         self.pg4.add_stream(pkts)
887         self.pg_enable_capture(self.pg_interfaces)
888         self.pg_start()
889         capture = self.pg3.get_capture(len(pkts))
890         self.verify_capture_out(capture)
891
892         # out2in 1st interface
893         pkts = self.create_stream_out(self.pg3)
894         self.pg3.add_stream(pkts)
895         self.pg_enable_capture(self.pg_interfaces)
896         self.pg_start()
897         capture = self.pg4.get_capture(len(pkts))
898         self.verify_capture_in(capture, self.pg4)
899
900         # in2out 2nd interface
901         pkts = self.create_stream_in(self.pg5, self.pg3)
902         self.pg5.add_stream(pkts)
903         self.pg_enable_capture(self.pg_interfaces)
904         self.pg_start()
905         capture = self.pg3.get_capture(len(pkts))
906         self.verify_capture_out(capture)
907
908         # out2in 2nd interface
909         pkts = self.create_stream_out(self.pg3)
910         self.pg3.add_stream(pkts)
911         self.pg_enable_capture(self.pg_interfaces)
912         self.pg_start()
913         capture = self.pg5.get_capture(len(pkts))
914         self.verify_capture_in(capture, self.pg5)
915
916         # pg5 session dump
917         addresses = self.vapi.snat_address_dump()
918         self.assertEqual(len(addresses), 1)
919         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
920         self.assertEqual(len(sessions), 3)
921         for session in sessions:
922             self.assertFalse(session.is_static)
923             self.assertEqual(session.inside_ip_address[0:4],
924                              self.pg5.remote_ip4n)
925             self.assertEqual(session.outside_ip_address,
926                              addresses[0].ip_address)
927         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
928         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
929         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
930         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
931         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
932         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
933         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
934         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
935         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
936
937         # in2out 3rd interface
938         pkts = self.create_stream_in(self.pg6, self.pg3)
939         self.pg6.add_stream(pkts)
940         self.pg_enable_capture(self.pg_interfaces)
941         self.pg_start()
942         capture = self.pg3.get_capture(len(pkts))
943         self.verify_capture_out(capture, static_nat_ip, True)
944
945         # out2in 3rd interface
946         pkts = self.create_stream_out(self.pg3, static_nat_ip)
947         self.pg3.add_stream(pkts)
948         self.pg_enable_capture(self.pg_interfaces)
949         self.pg_start()
950         capture = self.pg6.get_capture(len(pkts))
951         self.verify_capture_in(capture, self.pg6)
952
953         # general user and session dump verifications
954         users = self.vapi.snat_user_dump()
955         self.assertTrue(len(users) >= 3)
956         addresses = self.vapi.snat_address_dump()
957         self.assertEqual(len(addresses), 1)
958         for user in users:
959             sessions = self.vapi.snat_user_session_dump(user.ip_address,
960                                                         user.vrf_id)
961             for session in sessions:
962                 self.assertEqual(user.ip_address, session.inside_ip_address)
963                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
964                 self.assertTrue(session.protocol in
965                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
966                                  IP_PROTOS.icmp])
967
968         # pg4 session dump
969         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
970         self.assertTrue(len(sessions) >= 4)
971         for session in sessions:
972             self.assertFalse(session.is_static)
973             self.assertEqual(session.inside_ip_address[0:4],
974                              self.pg4.remote_ip4n)
975             self.assertEqual(session.outside_ip_address,
976                              addresses[0].ip_address)
977
978         # pg6 session dump
979         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
980         self.assertTrue(len(sessions) >= 3)
981         for session in sessions:
982             self.assertTrue(session.is_static)
983             self.assertEqual(session.inside_ip_address[0:4],
984                              self.pg6.remote_ip4n)
985             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
986                              map(int, static_nat_ip.split('.')))
987             self.assertTrue(session.inside_port in
988                             [self.tcp_port_in, self.udp_port_in,
989                              self.icmp_id_in])
990
991     def test_hairpinning(self):
992         """ SNAT hairpinning """
993
994         host = self.pg0.remote_hosts[0]
995         server = self.pg0.remote_hosts[1]
996         host_in_port = 1234
997         host_out_port = 0
998         server_in_port = 5678
999         server_out_port = 8765
1000
1001         self.snat_add_address(self.snat_addr)
1002         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1003         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1004                                                  is_inside=0)
1005         # add static mapping for server
1006         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1007                                      server_in_port, server_out_port,
1008                                      proto=IP_PROTOS.tcp)
1009
1010         # send packet from host to server
1011         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1012              IP(src=host.ip4, dst=self.snat_addr) /
1013              TCP(sport=host_in_port, dport=server_out_port))
1014         self.pg0.add_stream(p)
1015         self.pg_enable_capture(self.pg_interfaces)
1016         self.pg_start()
1017         capture = self.pg0.get_capture(1)
1018         p = capture[0]
1019         try:
1020             ip = p[IP]
1021             tcp = p[TCP]
1022             self.assertEqual(ip.src, self.snat_addr)
1023             self.assertEqual(ip.dst, server.ip4)
1024             self.assertNotEqual(tcp.sport, host_in_port)
1025             self.assertEqual(tcp.dport, server_in_port)
1026             host_out_port = tcp.sport
1027         except:
1028             self.logger.error(ppp("Unexpected or invalid packet:", p))
1029             raise
1030
1031         # send reply from server to host
1032         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1033              IP(src=server.ip4, dst=self.snat_addr) /
1034              TCP(sport=server_in_port, dport=host_out_port))
1035         self.pg0.add_stream(p)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         capture = self.pg0.get_capture(1)
1039         p = capture[0]
1040         try:
1041             ip = p[IP]
1042             tcp = p[TCP]
1043             self.assertEqual(ip.src, self.snat_addr)
1044             self.assertEqual(ip.dst, host.ip4)
1045             self.assertEqual(tcp.sport, server_out_port)
1046             self.assertEqual(tcp.dport, host_in_port)
1047         except:
1048             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1049             raise
1050
1051     def test_max_translations_per_user(self):
1052         """ MAX translations per user - recycle the least recently used """
1053
1054         self.snat_add_address(self.snat_addr)
1055         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1056         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1057                                                  is_inside=0)
1058
1059         # get maximum number of translations per user
1060         snat_config = self.vapi.snat_show_config()
1061
1062         # send more than maximum number of translations per user packets
1063         pkts_num = snat_config.max_translations_per_user + 5
1064         pkts = []
1065         for port in range(0, pkts_num):
1066             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1067                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1068                  TCP(sport=1025 + port))
1069             pkts.append(p)
1070         self.pg0.add_stream(pkts)
1071         self.pg_enable_capture(self.pg_interfaces)
1072         self.pg_start()
1073
1074         # verify number of translated packet
1075         self.pg1.get_capture(pkts_num)
1076
1077     def test_interface_addr(self):
1078         """ Acquire SNAT addresses from interface """
1079         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1080
1081         # no address in NAT pool
1082         adresses = self.vapi.snat_address_dump()
1083         self.assertEqual(0, len(adresses))
1084
1085         # configure interface address and check NAT address pool
1086         self.pg7.config_ip4()
1087         adresses = self.vapi.snat_address_dump()
1088         self.assertEqual(1, len(adresses))
1089         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1090
1091         # remove interface address and check NAT address pool
1092         self.pg7.unconfig_ip4()
1093         adresses = self.vapi.snat_address_dump()
1094         self.assertEqual(0, len(adresses))
1095
1096     def test_interface_addr_static_mapping(self):
1097         """ Static mapping with addresses from interface """
1098         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1099         self.snat_add_static_mapping('1.2.3.4',
1100                                      external_sw_if_index=self.pg7.sw_if_index)
1101
1102         # static mappings with external interface
1103         static_mappings = self.vapi.snat_static_mapping_dump()
1104         self.assertEqual(1, len(static_mappings))
1105         self.assertEqual(self.pg7.sw_if_index,
1106                          static_mappings[0].external_sw_if_index)
1107
1108         # configure interface address and check static mappings
1109         self.pg7.config_ip4()
1110         static_mappings = self.vapi.snat_static_mapping_dump()
1111         self.assertEqual(1, len(static_mappings))
1112         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1113                          self.pg7.local_ip4n)
1114         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1115
1116         # remove interface address and check static mappings
1117         self.pg7.unconfig_ip4()
1118         static_mappings = self.vapi.snat_static_mapping_dump()
1119         self.assertEqual(0, len(static_mappings))
1120
1121     def test_ipfix_nat44_sess(self):
1122         """ S-NAT IPFIX logging NAT44 session created/delted """
1123         self.snat_add_address(self.snat_addr)
1124         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1125         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1126                                                  is_inside=0)
1127         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1128                                      src_address=self.pg3.local_ip4n,
1129                                      path_mtu=512,
1130                                      template_interval=10)
1131         self.vapi.snat_ipfix()
1132
1133         pkts = self.create_stream_in(self.pg0, self.pg1)
1134         self.pg0.add_stream(pkts)
1135         self.pg_enable_capture(self.pg_interfaces)
1136         self.pg_start()
1137         capture = self.pg1.get_capture(len(pkts))
1138         self.verify_capture_out(capture)
1139         self.snat_add_address(self.snat_addr, is_add=0)
1140         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1141         capture = self.pg3.get_capture(3)
1142         ipfix = IPFIXDecoder()
1143         # first load template
1144         for p in capture:
1145             self.assertTrue(p.haslayer(IPFIX))
1146             if p.haslayer(Template):
1147                 ipfix.add_template(p.getlayer(Template))
1148         # verify events in data set
1149         for p in capture:
1150             if p.haslayer(Data):
1151                 data = ipfix.decode_data_set(p.getlayer(Set))
1152                 self.verify_ipfix_nat44_ses(data)
1153
1154     def test_ipfix_addr_exhausted(self):
1155         """ S-NAT IPFIX logging NAT addresses exhausted """
1156         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1157         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1158                                                  is_inside=0)
1159         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1160                                      src_address=self.pg3.local_ip4n,
1161                                      path_mtu=512,
1162                                      template_interval=10)
1163         self.vapi.snat_ipfix()
1164
1165         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1166              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1167              TCP(sport=3025))
1168         self.pg0.add_stream(p)
1169         self.pg_enable_capture(self.pg_interfaces)
1170         self.pg_start()
1171         capture = self.pg1.get_capture(0)
1172         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1173         capture = self.pg3.get_capture(3)
1174         ipfix = IPFIXDecoder()
1175         # first load template
1176         for p in capture:
1177             self.assertTrue(p.haslayer(IPFIX))
1178             if p.haslayer(Template):
1179                 ipfix.add_template(p.getlayer(Template))
1180         # verify events in data set
1181         for p in capture:
1182             if p.haslayer(Data):
1183                 data = ipfix.decode_data_set(p.getlayer(Set))
1184                 self.verify_ipfix_addr_exhausted(data)
1185
1186     def test_pool_addr_fib(self):
1187         """ S-NAT add pool addresses to FIB """
1188         static_addr = '10.0.0.10'
1189         self.snat_add_address(self.snat_addr)
1190         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1191         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1192                                                  is_inside=0)
1193         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1194
1195         # SNAT address
1196         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1197              ARP(op=ARP.who_has, pdst=self.snat_addr,
1198                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1199         self.pg1.add_stream(p)
1200         self.pg_enable_capture(self.pg_interfaces)
1201         self.pg_start()
1202         capture = self.pg1.get_capture(1)
1203         self.assertTrue(capture[0].haslayer(ARP))
1204         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1205
1206         # 1:1 NAT address
1207         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1208              ARP(op=ARP.who_has, pdst=static_addr,
1209                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1210         self.pg1.add_stream(p)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213         capture = self.pg1.get_capture(1)
1214         self.assertTrue(capture[0].haslayer(ARP))
1215         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1216
1217         # send ARP to non-SNAT interface
1218         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1219              ARP(op=ARP.who_has, pdst=self.snat_addr,
1220                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1221         self.pg2.add_stream(p)
1222         self.pg_enable_capture(self.pg_interfaces)
1223         self.pg_start()
1224         capture = self.pg1.get_capture(0)
1225
1226         # remove addresses and verify
1227         self.snat_add_address(self.snat_addr, is_add=0)
1228         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1229                                      is_add=0)
1230
1231         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1232              ARP(op=ARP.who_has, pdst=self.snat_addr,
1233                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1234         self.pg1.add_stream(p)
1235         self.pg_enable_capture(self.pg_interfaces)
1236         self.pg_start()
1237         capture = self.pg1.get_capture(0)
1238
1239         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1240              ARP(op=ARP.who_has, pdst=static_addr,
1241                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1242         self.pg1.add_stream(p)
1243         self.pg_enable_capture(self.pg_interfaces)
1244         self.pg_start()
1245         capture = self.pg1.get_capture(0)
1246
1247     def test_vrf_mode(self):
1248         """ S-NAT tenant VRF aware address pool mode """
1249
1250         vrf_id1 = 1
1251         vrf_id2 = 2
1252         nat_ip1 = "10.0.0.10"
1253         nat_ip2 = "10.0.0.11"
1254
1255         self.pg0.unconfig_ip4()
1256         self.pg1.unconfig_ip4()
1257         self.pg0.set_table_ip4(vrf_id1)
1258         self.pg1.set_table_ip4(vrf_id2)
1259         self.pg0.config_ip4()
1260         self.pg1.config_ip4()
1261
1262         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1263         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1264         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1265         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1266         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1267                                                  is_inside=0)
1268
1269         # first VRF
1270         pkts = self.create_stream_in(self.pg0, self.pg2)
1271         self.pg0.add_stream(pkts)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg2.get_capture(len(pkts))
1275         self.verify_capture_out(capture, nat_ip1)
1276
1277         # second VRF
1278         pkts = self.create_stream_in(self.pg1, self.pg2)
1279         self.pg1.add_stream(pkts)
1280         self.pg_enable_capture(self.pg_interfaces)
1281         self.pg_start()
1282         capture = self.pg2.get_capture(len(pkts))
1283         self.verify_capture_out(capture, nat_ip2)
1284
1285     def test_vrf_feature_independent(self):
1286         """ S-NAT tenant VRF independent address pool mode """
1287
1288         nat_ip1 = "10.0.0.10"
1289         nat_ip2 = "10.0.0.11"
1290
1291         self.snat_add_address(nat_ip1)
1292         self.snat_add_address(nat_ip2)
1293         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1294         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1295         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1296                                                  is_inside=0)
1297
1298         # first VRF
1299         pkts = self.create_stream_in(self.pg0, self.pg2)
1300         self.pg0.add_stream(pkts)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg2.get_capture(len(pkts))
1304         self.verify_capture_out(capture, nat_ip1)
1305
1306         # second VRF
1307         pkts = self.create_stream_in(self.pg1, self.pg2)
1308         self.pg1.add_stream(pkts)
1309         self.pg_enable_capture(self.pg_interfaces)
1310         self.pg_start()
1311         capture = self.pg2.get_capture(len(pkts))
1312         self.verify_capture_out(capture, nat_ip1)
1313
1314     def tearDown(self):
1315         super(TestSNAT, self).tearDown()
1316         if not self.vpp_dead:
1317             self.logger.info(self.vapi.cli("show snat verbose"))
1318             self.clear_snat()
1319
1320
1321 class TestDeterministicNAT(MethodHolder):
1322     """ Deterministic NAT Test Cases """
1323
1324     @classmethod
1325     def setUpConstants(cls):
1326         super(TestDeterministicNAT, cls).setUpConstants()
1327         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1328
1329     @classmethod
1330     def setUpClass(cls):
1331         super(TestDeterministicNAT, cls).setUpClass()
1332
1333         try:
1334             cls.tcp_port_in = 6303
1335             cls.tcp_port_out = 6303
1336             cls.udp_port_in = 6304
1337             cls.udp_port_out = 6304
1338             cls.icmp_id_in = 6305
1339             cls.snat_addr = '10.0.0.3'
1340
1341             cls.create_pg_interfaces(range(2))
1342             cls.interfaces = list(cls.pg_interfaces)
1343
1344             for i in cls.interfaces:
1345                 i.admin_up()
1346                 i.config_ip4()
1347                 i.resolve_arp()
1348
1349             cls.pg0.generate_remote_hosts(2)
1350             cls.pg0.configure_ipv4_neighbors()
1351
1352         except Exception:
1353             super(TestDeterministicNAT, cls).tearDownClass()
1354             raise
1355
1356     def create_stream_in(self, in_if, out_if, ttl=64):
1357         """
1358         Create packet stream for inside network
1359
1360         :param in_if: Inside interface
1361         :param out_if: Outside interface
1362         :param ttl: TTL of generated packets
1363         """
1364         pkts = []
1365         # TCP
1366         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1367              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1368              TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
1369         pkts.append(p)
1370
1371         # UDP
1372         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1373              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1374              UDP(sport=self.udp_port_in, dport=self.udp_port_out))
1375         pkts.append(p)
1376
1377         # ICMP
1378         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1379              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1380              ICMP(id=self.icmp_id_in, type='echo-request'))
1381         pkts.append(p)
1382
1383         return pkts
1384
1385     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1386         """
1387         Create packet stream for outside network
1388
1389         :param out_if: Outside interface
1390         :param dst_ip: Destination IP address (Default use global SNAT address)
1391         :param ttl: TTL of generated packets
1392         """
1393         if dst_ip is None:
1394             dst_ip = self.snat_addr
1395         pkts = []
1396         # TCP
1397         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1398              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1399              TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
1400         pkts.append(p)
1401
1402         # UDP
1403         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1404              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1405              UDP(dport=self.udp_external_port, sport=self.udp_port_out))
1406         pkts.append(p)
1407
1408         # ICMP
1409         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1410              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1411              ICMP(id=self.icmp_external_id, type='echo-reply'))
1412         pkts.append(p)
1413
1414         return pkts
1415
1416     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1417         """
1418         Verify captured packets on outside network
1419
1420         :param capture: Captured packets
1421         :param nat_ip: Translated IP address (Default use global SNAT address)
1422         :param same_port: Sorce port number is not translated (Default False)
1423         :param packet_num: Expected number of packets (Default 3)
1424         """
1425         if nat_ip is None:
1426             nat_ip = self.snat_addr
1427         self.assertEqual(packet_num, len(capture))
1428         for packet in capture:
1429             try:
1430                 self.assertEqual(packet[IP].src, nat_ip)
1431                 if packet.haslayer(TCP):
1432                     self.tcp_external_port = packet[TCP].sport
1433                 elif packet.haslayer(UDP):
1434                     self.udp_external_port = packet[UDP].sport
1435                 else:
1436                     self.icmp_external_id = packet[ICMP].id
1437             except:
1438                 self.logger.error(ppp("Unexpected or invalid packet "
1439                                       "(outside network):", packet))
1440                 raise
1441
1442     def initiate_tcp_session(self, in_if, out_if):
1443         """
1444         Initiates TCP session
1445
1446         :param in_if: Inside interface
1447         :param out_if: Outside interface
1448         """
1449         try:
1450             # SYN packet in->out
1451             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1452                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1453                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1454                      flags="S"))
1455             in_if.add_stream(p)
1456             self.pg_enable_capture(self.pg_interfaces)
1457             self.pg_start()
1458             capture = out_if.get_capture(1)
1459             p = capture[0]
1460             self.tcp_external_port = p[TCP].sport
1461
1462             # SYN + ACK packet out->in
1463             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1464                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1465                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1466                      flags="SA"))
1467             out_if.add_stream(p)
1468             self.pg_enable_capture(self.pg_interfaces)
1469             self.pg_start()
1470             in_if.get_capture(1)
1471
1472             # ACK packet in->out
1473             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1474                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1475                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1476                      flags="A"))
1477             in_if.add_stream(p)
1478             self.pg_enable_capture(self.pg_interfaces)
1479             self.pg_start()
1480             out_if.get_capture(1)
1481
1482         except:
1483             self.logger.error("TCP 3 way handshake failed")
1484             raise
1485
1486     def test_deterministic_mode(self):
1487         """ S-NAT run deterministic mode """
1488         in_addr = '172.16.255.0'
1489         out_addr = '172.17.255.50'
1490         in_addr_t = '172.16.255.20'
1491         in_addr_n = socket.inet_aton(in_addr)
1492         out_addr_n = socket.inet_aton(out_addr)
1493         in_addr_t_n = socket.inet_aton(in_addr_t)
1494         in_plen = 24
1495         out_plen = 32
1496
1497         snat_config = self.vapi.snat_show_config()
1498         self.assertEqual(1, snat_config.deterministic)
1499
1500         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1501
1502         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1503         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1504         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1505         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1506
1507         deterministic_mappings = self.vapi.snat_det_map_dump()
1508         self.assertEqual(len(deterministic_mappings), 1)
1509         dsm = deterministic_mappings[0]
1510         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1511         self.assertEqual(in_plen, dsm.in_plen)
1512         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1513         self.assertEqual(out_plen, dsm.out_plen)
1514
1515         self.clear_snat()
1516         deterministic_mappings = self.vapi.snat_det_map_dump()
1517         self.assertEqual(len(deterministic_mappings), 0)
1518
1519     def test_set_timeouts(self):
1520         """ Set deterministic NAT timeouts """
1521         timeouts_before = self.vapi.snat_det_get_timeouts()
1522
1523         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1524                                         timeouts_before.tcp_established + 10,
1525                                         timeouts_before.tcp_transitory + 10,
1526                                         timeouts_before.icmp + 10)
1527
1528         timeouts_after = self.vapi.snat_det_get_timeouts()
1529
1530         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1531         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1532         self.assertNotEqual(timeouts_before.tcp_established,
1533                             timeouts_after.tcp_established)
1534         self.assertNotEqual(timeouts_before.tcp_transitory,
1535                             timeouts_after.tcp_transitory)
1536
1537     def test_det_in(self):
1538         """ CGNAT translation test (TCP, UDP, ICMP) """
1539
1540         nat_ip = "10.0.0.10"
1541         self.tcp_port_out = 6303
1542         self.udp_port_out = 6304
1543         self.icmp_id_in = 6305
1544         self.tcp_external_port = 7303
1545         self.udp_external_port = 7304
1546         self.icmp_id_out = 7305
1547
1548         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1549                                    32,
1550                                    socket.inet_aton(nat_ip),
1551                                    32)
1552         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1553         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1554                                                  is_inside=0)
1555
1556         # in2out
1557         pkts = self.create_stream_in(self.pg0, self.pg1)
1558         self.pg0.add_stream(pkts)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg1.get_capture(len(pkts))
1562         self.verify_capture_out(capture, nat_ip)
1563
1564         # out2in
1565         pkts = self.create_stream_out(self.pg1, nat_ip)
1566         self.pg1.add_stream(pkts)
1567         self.pg_enable_capture(self.pg_interfaces)
1568         self.pg_start()
1569         capture = self.pg0.get_capture(len(pkts))
1570         self.verify_capture_in(capture, self.pg0)
1571
1572     def test_multiple_users(self):
1573         """ CGNAT multiple users """
1574
1575         nat_ip = "10.0.0.10"
1576         port_in = 80
1577         port_out = 6303
1578
1579         host0 = self.pg0.remote_hosts[0]
1580         host1 = self.pg0.remote_hosts[1]
1581
1582         self.vapi.snat_add_det_map(host0.ip4n,
1583                                    24,
1584                                    socket.inet_aton(nat_ip),
1585                                    32)
1586         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1587         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1588                                                  is_inside=0)
1589
1590         # host0 to out
1591         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1592              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1593              TCP(sport=port_in, dport=port_out))
1594         self.pg0.add_stream(p)
1595         self.pg_enable_capture(self.pg_interfaces)
1596         self.pg_start()
1597         capture = self.pg1.get_capture(1)
1598         p = capture[0]
1599         try:
1600             ip = p[IP]
1601             tcp = p[TCP]
1602             self.assertEqual(ip.src, nat_ip)
1603             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1604             self.assertEqual(tcp.dport, port_out)
1605             external_port0 = tcp.sport
1606         except:
1607             self.logger.error(ppp("Unexpected or invalid packet:", p))
1608             raise
1609
1610         # host1 to out
1611         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1612              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1613              TCP(sport=port_in, dport=port_out))
1614         self.pg0.add_stream(p)
1615         self.pg_enable_capture(self.pg_interfaces)
1616         self.pg_start()
1617         capture = self.pg1.get_capture(1)
1618         p = capture[0]
1619         try:
1620             ip = p[IP]
1621             tcp = p[TCP]
1622             self.assertEqual(ip.src, nat_ip)
1623             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1624             self.assertEqual(tcp.dport, port_out)
1625             external_port1 = tcp.sport
1626         except:
1627             self.logger.error(ppp("Unexpected or invalid packet:", p))
1628             raise
1629
1630         dms = self.vapi.snat_det_map_dump()
1631         self.assertEqual(1, len(dms))
1632         self.assertEqual(2, dms[0].ses_num)
1633
1634         # out to host0
1635         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1636              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1637              TCP(sport=port_out, dport=external_port0))
1638         self.pg1.add_stream(p)
1639         self.pg_enable_capture(self.pg_interfaces)
1640         self.pg_start()
1641         capture = self.pg0.get_capture(1)
1642         p = capture[0]
1643         try:
1644             ip = p[IP]
1645             tcp = p[TCP]
1646             self.assertEqual(ip.src, self.pg1.remote_ip4)
1647             self.assertEqual(ip.dst, host0.ip4)
1648             self.assertEqual(tcp.dport, port_in)
1649             self.assertEqual(tcp.sport, port_out)
1650         except:
1651             self.logger.error(ppp("Unexpected or invalid packet:", p))
1652             raise
1653
1654         # out to host1
1655         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1656              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1657              TCP(sport=port_out, dport=external_port1))
1658         self.pg1.add_stream(p)
1659         self.pg_enable_capture(self.pg_interfaces)
1660         self.pg_start()
1661         capture = self.pg0.get_capture(1)
1662         p = capture[0]
1663         try:
1664             ip = p[IP]
1665             tcp = p[TCP]
1666             self.assertEqual(ip.src, self.pg1.remote_ip4)
1667             self.assertEqual(ip.dst, host1.ip4)
1668             self.assertEqual(tcp.dport, port_in)
1669             self.assertEqual(tcp.sport, port_out)
1670         except:
1671             self.logger.error(ppp("Unexpected or invalid packet", p))
1672             raise
1673
1674     def test_tcp_session_close_detection_in(self):
1675         """ CGNAT TCP session close initiated from inside network """
1676         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1677                                    32,
1678                                    socket.inet_aton(self.snat_addr),
1679                                    32)
1680         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1681         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1682                                                  is_inside=0)
1683
1684         self.initiate_tcp_session(self.pg0, self.pg1)
1685
1686         # close the session from inside
1687         try:
1688             # FIN packet in -> out
1689             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1690                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1691                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1692                      flags="F"))
1693             self.pg0.add_stream(p)
1694             self.pg_enable_capture(self.pg_interfaces)
1695             self.pg_start()
1696             self.pg1.get_capture(1)
1697
1698             pkts = []
1699
1700             # ACK packet out -> in
1701             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1702                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1703                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1704                      flags="A"))
1705             pkts.append(p)
1706
1707             # FIN packet out -> in
1708             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1709                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1710                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1711                      flags="F"))
1712             pkts.append(p)
1713
1714             self.pg1.add_stream(pkts)
1715             self.pg_enable_capture(self.pg_interfaces)
1716             self.pg_start()
1717             self.pg0.get_capture(2)
1718
1719             # ACK packet in -> out
1720             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1721                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1722                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1723                      flags="A"))
1724             self.pg0.add_stream(p)
1725             self.pg_enable_capture(self.pg_interfaces)
1726             self.pg_start()
1727             self.pg1.get_capture(1)
1728
1729             # Check if snat closed the session
1730             dms = self.vapi.snat_det_map_dump()
1731             self.assertEqual(0, dms[0].ses_num)
1732         except:
1733             self.logger.error("TCP session termination failed")
1734             raise
1735
1736     def test_tcp_session_close_detection_out(self):
1737         """ CGNAT TCP session close initiated from outside network """
1738         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1739                                    32,
1740                                    socket.inet_aton(self.snat_addr),
1741                                    32)
1742         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1743         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1744                                                  is_inside=0)
1745
1746         self.initiate_tcp_session(self.pg0, self.pg1)
1747
1748         # close the session from outside
1749         try:
1750             # FIN packet out -> in
1751             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1752                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1753                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1754                      flags="F"))
1755             self.pg1.add_stream(p)
1756             self.pg_enable_capture(self.pg_interfaces)
1757             self.pg_start()
1758             self.pg0.get_capture(1)
1759
1760             pkts = []
1761
1762             # ACK packet in -> out
1763             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1764                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1765                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1766                      flags="A"))
1767             pkts.append(p)
1768
1769             # ACK packet in -> out
1770             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1771                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1772                  TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
1773                      flags="F"))
1774             pkts.append(p)
1775
1776             self.pg0.add_stream(pkts)
1777             self.pg_enable_capture(self.pg_interfaces)
1778             self.pg_start()
1779             self.pg1.get_capture(2)
1780
1781             # ACK packet out -> in
1782             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1783                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
1784                  TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
1785                      flags="A"))
1786             self.pg1.add_stream(p)
1787             self.pg_enable_capture(self.pg_interfaces)
1788             self.pg_start()
1789             self.pg0.get_capture(1)
1790
1791             # Check if snat closed the session
1792             dms = self.vapi.snat_det_map_dump()
1793             self.assertEqual(0, dms[0].ses_num)
1794         except:
1795             self.logger.error("TCP session termination failed")
1796             raise
1797
1798     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1799     def test_session_timeout(self):
1800         """ CGNAT session timeouts """
1801         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1802                                    32,
1803                                    socket.inet_aton(self.snat_addr),
1804                                    32)
1805         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1806         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1807                                                  is_inside=0)
1808
1809         self.initiate_tcp_session(self.pg0, self.pg1)
1810         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
1811         pkts = self.create_stream_in(self.pg0, self.pg1)
1812         self.pg0.add_stream(pkts)
1813         self.pg_enable_capture(self.pg_interfaces)
1814         self.pg_start()
1815         capture = self.pg1.get_capture(len(pkts))
1816         sleep(15)
1817
1818         dms = self.vapi.snat_det_map_dump()
1819         self.assertEqual(0, dms[0].ses_num)
1820
1821     def test_session_limit_per_user(self):
1822         """ CGNAT maximum 1000 sessions per user should be created """
1823         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1824                                    32,
1825                                    socket.inet_aton(self.snat_addr),
1826                                    32)
1827         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1828         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1829                                                  is_inside=0)
1830
1831         pkts = []
1832         for port in range(1025, 2025):
1833             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1834                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1835                  UDP(sport=port, dport=port))
1836             pkts.append(p)
1837
1838         self.pg0.add_stream(pkts)
1839         self.pg_enable_capture(self.pg_interfaces)
1840         self.pg_start()
1841         capture = self.pg1.get_capture(len(pkts))
1842
1843         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1844              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1845              UDP(sport=3000, dport=3000))
1846         self.pg0.add_stream(p)
1847         self.pg_enable_capture(self.pg_interfaces)
1848         self.pg_start()
1849         capture = self.pg1.assert_nothing_captured()
1850
1851         dms = self.vapi.snat_det_map_dump()
1852
1853         self.assertEqual(1000, dms[0].ses_num)
1854
1855     def clear_snat(self):
1856         """
1857         Clear SNAT configuration.
1858         """
1859         self.vapi.snat_det_set_timeouts()
1860         deterministic_mappings = self.vapi.snat_det_map_dump()
1861         for dsm in deterministic_mappings:
1862             self.vapi.snat_add_det_map(dsm.in_addr,
1863                                        dsm.in_plen,
1864                                        dsm.out_addr,
1865                                        dsm.out_plen,
1866                                        is_add=0)
1867
1868         interfaces = self.vapi.snat_interface_dump()
1869         for intf in interfaces:
1870             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
1871                                                      intf.is_inside,
1872                                                      is_add=0)
1873
1874     def tearDown(self):
1875         super(TestDeterministicNAT, self).tearDown()
1876         if not self.vpp_dead:
1877             self.logger.info(self.vapi.cli("show snat detail"))
1878             self.clear_snat()
1879
if __name__ == '__main__':
    # Executed as a script: run this module's tests through VppTestRunner.
    unittest.main(testRunner=VppTestRunner)