Packets received on VLAN-0 map to the main interface
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.l2 import Ether, ARP
11 from scapy.data import IP_PROTOS
12 from scapy.packet import bind_layers
13 from util import ppp
14 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
15 from time import sleep
16
17
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    # The helpers below read attributes such as tcp_port_in, udp_port_out,
    # icmp_id_in, snat_addr and pg0 that are not assigned in this class; the
    # concrete test class is expected to provide them.

    @classmethod
    def setUpClass(cls):
        # No additional class-level setup; defer to the parent class.
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        # No additional per-test cleanup; defer to the parent class.
        super(MethodHolder, self).tearDown()

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Build TCP, UDP and ICMP echo-request packets sent from the inside

        :param in_if: Inside interface (source is its remote host)
        :param out_if: Outside interface (destination is its remote host)
        :param ttl: TTL written into every IP header (Default 64)
        :returns: List with one TCP, one UDP and one ICMP packet
        """
        def l2_l3():
            # Ethernet + IP headers common to all three packets
            return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                    IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl))

        return [
            l2_l3() / TCP(sport=self.tcp_port_in),
            l2_l3() / UDP(sport=self.udp_port_in),
            l2_l3() / ICMP(id=self.icmp_id_in, type='echo-request'),
        ]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Build TCP, UDP and ICMP echo-reply packets sent from the outside

        :param out_if: Outside interface (source is its remote host)
        :param dst_ip: Destination IP address (Default use global SNAT
                       address)
        :param ttl: TTL written into every IP header (Default 64)
        :returns: List with one TCP, one UDP and one ICMP packet
        """
        target_ip = self.snat_addr if dst_ip is None else dst_ip

        def l2_l3():
            # Ethernet + IP headers common to all three packets
            return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                    IP(src=out_if.remote_ip4, dst=target_ip, ttl=ttl))

        return [
            l2_l3() / TCP(dport=self.tcp_port_out),
            l2_l3() / UDP(dport=self.udp_port_out),
            l2_l3() / ICMP(id=self.icmp_id_out, type='echo-reply'),
        ]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Check packets captured on the outside network

        The source port (or ICMP id) seen in each packet is stored in
        tcp_port_out / udp_port_out / icmp_id_out on this instance.

        :param capture: Captured packets
        :param nat_ip: Expected source IP address (Default use global SNAT
                       address)
        :param same_port: If True the source port/id must equal the inside
                          value, otherwise it must differ (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        translated_ip = self.snat_addr if nat_ip is None else nat_ip
        self.assertEqual(packet_num, len(capture))
        # same_port decides once which comparison applies to every packet
        compare = self.assertEqual if same_port else self.assertNotEqual
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, translated_ip)
                if pkt.haslayer(TCP):
                    compare(pkt[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = pkt[TCP].sport
                elif pkt.haslayer(UDP):
                    compare(pkt[UDP].sport, self.udp_port_in)
                    self.udp_port_out = pkt[UDP].sport
                else:
                    # Anything that is neither TCP nor UDP is read as ICMP
                    compare(pkt[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = pkt[ICMP].id
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", pkt))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Check packets captured on the inside network

        :param capture: Captured packets
        :param in_if: Inside interface whose remote host is the expected
                      destination
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
                if pkt.haslayer(TCP):
                    found, wanted = pkt[TCP].dport, self.tcp_port_in
                elif pkt.haslayer(UDP):
                    found, wanted = pkt[UDP].dport, self.udp_port_in
                else:
                    found, wanted = pkt[ICMP].id, self.icmp_id_in
                self.assertEqual(found, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Check captured packets that must pass through untranslated

        :param capture: Captured packets
        :param ingress_if: Ingress interface (expected source is its remote
                           host)
        :param egress_if: Egress interface (expected destination is its
                          remote host)
        """
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, ingress_if.remote_ip4)
                self.assertEqual(pkt[IP].dst, egress_if.remote_ip4)
                if pkt.haslayer(TCP):
                    found, wanted = pkt[TCP].sport, self.tcp_port_in
                elif pkt.haslayer(UDP):
                    found, wanted = pkt[UDP].sport, self.udp_port_in
                else:
                    found, wanted = pkt[ICMP].id, self.icmp_id_in
                self.assertEqual(found, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Check ICMP error packets captured on the outside network

        :param capture: Captured packets
        :param src_ip: Expected source IP address
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Expected ICMP type of the error packets
                          (Default 11)
        """
        expected_src = self.snat_addr if src_ip is None else src_ip
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].src, expected_src)
                self.assertTrue(pkt.haslayer(ICMP))
                icmp_hdr = pkt[ICMP]
                self.assertEqual(icmp_hdr.type, icmp_type)
                self.assertTrue(icmp_hdr.haslayer(IPerror))
                # The packet quoted inside the error must carry the
                # outside port/id values
                quoted = icmp_hdr[IPerror]
                if quoted.haslayer(TCPerror):
                    found, wanted = quoted[TCPerror].dport, self.tcp_port_out
                elif quoted.haslayer(UDPerror):
                    found, wanted = quoted[UDPerror].dport, self.udp_port_out
                else:
                    found, wanted = quoted[ICMPerror].id, self.icmp_id_out
                self.assertEqual(found, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", pkt))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Check ICMP error packets captured on the inside network

        :param capture: Captured packets
        :param in_if: Inside interface whose remote host is the expected
                      destination
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Expected ICMP type of the error packets
                          (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for pkt in capture:
            try:
                self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
                self.assertTrue(pkt.haslayer(ICMP))
                icmp_hdr = pkt[ICMP]
                self.assertEqual(icmp_hdr.type, icmp_type)
                self.assertTrue(icmp_hdr.haslayer(IPerror))
                # The packet quoted inside the error must carry the
                # inside port/id values
                quoted = icmp_hdr[IPerror]
                if quoted.haslayer(TCPerror):
                    found, wanted = quoted[TCPerror].sport, self.tcp_port_in
                elif quoted.haslayer(UDPerror):
                    found, wanted = quoted[UDPerror].sport, self.udp_port_in
                else:
                    found, wanted = quoted[ICMPerror].id, self.icmp_id_in
                self.assertEqual(found, wanted)
            except:
                # Log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", pkt))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Check IPFIX NAT44 session create/delete events

        Expects six records: three with natEvent 4 and three with
        natEvent 5, one per protocol (ICMP, TCP, UDP) in each group.

        :param data: Decoded IPFIX data records
        """
        created = 0
        deleted = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            nat_event = ord(record[230])
            self.assertIn(nat_event, [4, 5])
            if nat_event == 4:
                created += 1
            else:
                deleted += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier selects which in/out pair must appear in
            # sourceTransportPort / postNAPTSourceTransportPort
            proto = ord(record[4])
            if proto == IP_PROTOS.icmp:
                port_in, port_out = self.icmp_id_in, self.icmp_id_out
            elif proto == IP_PROTOS.tcp:
                port_in, port_out = self.tcp_port_in, self.tcp_port_out
            elif proto == IP_PROTOS.udp:
                port_in, port_out = self.udp_port_in, self.udp_port_out
            else:
                self.fail("Invalid protocol")
            self.assertEqual(struct.pack("!H", port_in), record[7])
            self.assertEqual(struct.pack("!H", port_out), record[227])
        self.assertEqual(3, created)
        self.assertEqual(3, deleted)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Check the IPFIX NAT addresses exhausted event

        Expects exactly one record with natEvent 3 and natPoolID 0.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        event = data[0]
        # natEvent
        self.assertEqual(ord(event[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), event[283])
299
300
301 class TestSNAT(MethodHolder):
302     """ SNAT Test Cases """
303
304     @classmethod
305     def setUpClass(cls):
306         super(TestSNAT, cls).setUpClass()
307
308         try:
309             cls.tcp_port_in = 6303
310             cls.tcp_port_out = 6303
311             cls.udp_port_in = 6304
312             cls.udp_port_out = 6304
313             cls.icmp_id_in = 6305
314             cls.icmp_id_out = 6305
315             cls.snat_addr = '10.0.0.3'
316             cls.ipfix_src_port = 4739
317             cls.ipfix_domain_id = 1
318
319             cls.create_pg_interfaces(range(9))
320             cls.interfaces = list(cls.pg_interfaces[0:4])
321
322             for i in cls.interfaces:
323                 i.admin_up()
324                 i.config_ip4()
325                 i.resolve_arp()
326
327             cls.pg0.generate_remote_hosts(3)
328             cls.pg0.configure_ipv4_neighbors()
329
330             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
331
332             cls.pg4._local_ip4 = "172.16.255.1"
333             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
334             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
335             cls.pg4.set_table_ip4(10)
336             cls.pg5._local_ip4 = "172.16.255.3"
337             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
338             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
339             cls.pg5.set_table_ip4(10)
340             cls.pg6._local_ip4 = "172.16.255.1"
341             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
342             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
343             cls.pg6.set_table_ip4(20)
344             for i in cls.overlapping_interfaces:
345                 i.config_ip4()
346                 i.admin_up()
347                 i.resolve_arp()
348
349             cls.pg7.admin_up()
350             cls.pg8.admin_up()
351
352         except Exception:
353             super(TestSNAT, cls).tearDownClass()
354             raise
355
356     def clear_snat(self):
357         """
358         Clear SNAT configuration.
359         """
360         # I found no elegant way to do this
361         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
362                                    dst_address_length=32,
363                                    next_hop_address=self.pg7.remote_ip4n,
364                                    next_hop_sw_if_index=self.pg7.sw_if_index,
365                                    is_add=0)
366         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
367                                    dst_address_length=32,
368                                    next_hop_address=self.pg8.remote_ip4n,
369                                    next_hop_sw_if_index=self.pg8.sw_if_index,
370                                    is_add=0)
371
372         for intf in [self.pg7, self.pg8]:
373             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
374             for n in neighbors:
375                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
376                                               n.mac_address,
377                                               n.ip_address,
378                                               is_add=0)
379
380         if self.pg7.has_ip4_config:
381             self.pg7.unconfig_ip4()
382
383         interfaces = self.vapi.snat_interface_addr_dump()
384         for intf in interfaces:
385             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
386
387         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
388                              domain_id=self.ipfix_domain_id)
389         self.ipfix_src_port = 4739
390         self.ipfix_domain_id = 1
391
392         interfaces = self.vapi.snat_interface_dump()
393         for intf in interfaces:
394             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
395                                                      intf.is_inside,
396                                                      is_add=0)
397
398         static_mappings = self.vapi.snat_static_mapping_dump()
399         for sm in static_mappings:
400             self.vapi.snat_add_static_mapping(sm.local_ip_address,
401                                               sm.external_ip_address,
402                                               local_port=sm.local_port,
403                                               external_port=sm.external_port,
404                                               addr_only=sm.addr_only,
405                                               vrf_id=sm.vrf_id,
406                                               protocol=sm.protocol,
407                                               is_add=0)
408
409         adresses = self.vapi.snat_address_dump()
410         for addr in adresses:
411             self.vapi.snat_add_address_range(addr.ip_address,
412                                              addr.ip_address,
413                                              is_add=0)
414
415     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
416                                 local_port=0, external_port=0, vrf_id=0,
417                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
418                                 proto=0):
419         """
420         Add/delete S-NAT static mapping
421
422         :param local_ip: Local IP address
423         :param external_ip: External IP address
424         :param local_port: Local port number (Optional)
425         :param external_port: External port number (Optional)
426         :param vrf_id: VRF ID (Default 0)
427         :param is_add: 1 if add, 0 if delete (Default add)
428         :param external_sw_if_index: External interface instead of IP address
429         :param proto: IP protocol (Mandatory if port specified)
430         """
431         addr_only = 1
432         if local_port and external_port:
433             addr_only = 0
434         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
435         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
436         self.vapi.snat_add_static_mapping(
437             l_ip,
438             e_ip,
439             external_sw_if_index,
440             local_port,
441             external_port,
442             addr_only,
443             vrf_id,
444             proto,
445             is_add)
446
447     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
448         """
449         Add/delete S-NAT address
450
451         :param ip: IP address
452         :param is_add: 1 if add, 0 if delete (Default add)
453         """
454         snat_addr = socket.inet_pton(socket.AF_INET, ip)
455         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
456                                          vrf_id=vrf_id)
457
458     def test_dynamic(self):
459         """ SNAT dynamic translation test """
460
461         self.snat_add_address(self.snat_addr)
462         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
463         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
464                                                  is_inside=0)
465
466         # in2out
467         pkts = self.create_stream_in(self.pg0, self.pg1)
468         self.pg0.add_stream(pkts)
469         self.pg_enable_capture(self.pg_interfaces)
470         self.pg_start()
471         capture = self.pg1.get_capture(len(pkts))
472         self.verify_capture_out(capture)
473
474         # out2in
475         pkts = self.create_stream_out(self.pg1)
476         self.pg1.add_stream(pkts)
477         self.pg_enable_capture(self.pg_interfaces)
478         self.pg_start()
479         capture = self.pg0.get_capture(len(pkts))
480         self.verify_capture_in(capture, self.pg0)
481
482     def test_dynamic_icmp_errors_in2out_ttl_1(self):
483         """ SNAT handling of client packets with TTL=1 """
484
485         self.snat_add_address(self.snat_addr)
486         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
487         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
488                                                  is_inside=0)
489
490         # Client side - generate traffic
491         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
492         self.pg0.add_stream(pkts)
493         self.pg_enable_capture(self.pg_interfaces)
494         self.pg_start()
495
496         # Client side - verify ICMP type 11 packets
497         capture = self.pg0.get_capture(len(pkts))
498         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
499
500     def test_dynamic_icmp_errors_out2in_ttl_1(self):
501         """ SNAT handling of server packets with TTL=1 """
502
503         self.snat_add_address(self.snat_addr)
504         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
505         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
506                                                  is_inside=0)
507
508         # Client side - create sessions
509         pkts = self.create_stream_in(self.pg0, self.pg1)
510         self.pg0.add_stream(pkts)
511         self.pg_enable_capture(self.pg_interfaces)
512         self.pg_start()
513
514         # Server side - generate traffic
515         capture = self.pg1.get_capture(len(pkts))
516         self.verify_capture_out(capture)
517         pkts = self.create_stream_out(self.pg1, ttl=1)
518         self.pg1.add_stream(pkts)
519         self.pg_enable_capture(self.pg_interfaces)
520         self.pg_start()
521
522         # Server side - verify ICMP type 11 packets
523         capture = self.pg1.get_capture(len(pkts))
524         self.verify_capture_out_with_icmp_errors(capture,
525                                                  src_ip=self.pg1.local_ip4)
526
527     def test_dynamic_icmp_errors_in2out_ttl_2(self):
528         """ SNAT handling of error responses to client packets with TTL=2 """
529
530         self.snat_add_address(self.snat_addr)
531         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
532         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
533                                                  is_inside=0)
534
535         # Client side - generate traffic
536         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
537         self.pg0.add_stream(pkts)
538         self.pg_enable_capture(self.pg_interfaces)
539         self.pg_start()
540
541         # Server side - simulate ICMP type 11 response
542         capture = self.pg1.get_capture(len(pkts))
543         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
544                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
545                 ICMP(type=11) / packet[IP] for packet in capture]
546         self.pg1.add_stream(pkts)
547         self.pg_enable_capture(self.pg_interfaces)
548         self.pg_start()
549
550         # Client side - verify ICMP type 11 packets
551         capture = self.pg0.get_capture(len(pkts))
552         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
553
554     def test_dynamic_icmp_errors_out2in_ttl_2(self):
555         """ SNAT handling of error responses to server packets with TTL=2 """
556
557         self.snat_add_address(self.snat_addr)
558         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
559         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
560                                                  is_inside=0)
561
562         # Client side - create sessions
563         pkts = self.create_stream_in(self.pg0, self.pg1)
564         self.pg0.add_stream(pkts)
565         self.pg_enable_capture(self.pg_interfaces)
566         self.pg_start()
567
568         # Server side - generate traffic
569         capture = self.pg1.get_capture(len(pkts))
570         self.verify_capture_out(capture)
571         pkts = self.create_stream_out(self.pg1, ttl=2)
572         self.pg1.add_stream(pkts)
573         self.pg_enable_capture(self.pg_interfaces)
574         self.pg_start()
575
576         # Client side - simulate ICMP type 11 response
577         capture = self.pg0.get_capture(len(pkts))
578         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
579                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
580                 ICMP(type=11) / packet[IP] for packet in capture]
581         self.pg0.add_stream(pkts)
582         self.pg_enable_capture(self.pg_interfaces)
583         self.pg_start()
584
585         # Server side - verify ICMP type 11 packets
586         capture = self.pg1.get_capture(len(pkts))
587         self.verify_capture_out_with_icmp_errors(capture)
588
589     def test_ping_out_interface_from_outside(self):
590         """ Ping SNAT out interface from outside network """
591
592         self.snat_add_address(self.snat_addr)
593         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
594         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
595                                                  is_inside=0)
596
597         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
598              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
599              ICMP(id=self.icmp_id_out, type='echo-request'))
600         pkts = [p]
601         self.pg1.add_stream(pkts)
602         self.pg_enable_capture(self.pg_interfaces)
603         self.pg_start()
604         capture = self.pg1.get_capture(len(pkts))
605         self.assertEqual(1, len(capture))
606         packet = capture[0]
607         try:
608             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
609             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
610             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
611             self.assertEqual(packet[ICMP].type, 0)  # echo reply
612         except:
613             self.logger.error(ppp("Unexpected or invalid packet "
614                                   "(outside network):", packet))
615             raise
616
617     def test_ping_internal_host_from_outside(self):
618         """ Ping internal host from outside network """
619
620         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
621         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
622         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
623                                                  is_inside=0)
624
625         # out2in
626         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
627                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
628                ICMP(id=self.icmp_id_out, type='echo-request'))
629         self.pg1.add_stream(pkt)
630         self.pg_enable_capture(self.pg_interfaces)
631         self.pg_start()
632         capture = self.pg0.get_capture(1)
633         self.verify_capture_in(capture, self.pg0, packet_num=1)
634         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
635
636         # in2out
637         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
638                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
639                ICMP(id=self.icmp_id_in, type='echo-reply'))
640         self.pg0.add_stream(pkt)
641         self.pg_enable_capture(self.pg_interfaces)
642         self.pg_start()
643         capture = self.pg1.get_capture(1)
644         self.verify_capture_out(capture, same_port=True, packet_num=1)
645         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
646
647     def test_static_in(self):
648         """ SNAT 1:1 NAT initialized from inside network """
649
650         nat_ip = "10.0.0.10"
651         self.tcp_port_out = 6303
652         self.udp_port_out = 6304
653         self.icmp_id_out = 6305
654
655         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
656         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
657         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
658                                                  is_inside=0)
659
660         # in2out
661         pkts = self.create_stream_in(self.pg0, self.pg1)
662         self.pg0.add_stream(pkts)
663         self.pg_enable_capture(self.pg_interfaces)
664         self.pg_start()
665         capture = self.pg1.get_capture(len(pkts))
666         self.verify_capture_out(capture, nat_ip, True)
667
668         # out2in
669         pkts = self.create_stream_out(self.pg1, nat_ip)
670         self.pg1.add_stream(pkts)
671         self.pg_enable_capture(self.pg_interfaces)
672         self.pg_start()
673         capture = self.pg0.get_capture(len(pkts))
674         self.verify_capture_in(capture, self.pg0)
675
676     def test_static_out(self):
677         """ SNAT 1:1 NAT initialized from outside network """
678
679         nat_ip = "10.0.0.20"
680         self.tcp_port_out = 6303
681         self.udp_port_out = 6304
682         self.icmp_id_out = 6305
683
684         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
685         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
686         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
687                                                  is_inside=0)
688
689         # out2in
690         pkts = self.create_stream_out(self.pg1, nat_ip)
691         self.pg1.add_stream(pkts)
692         self.pg_enable_capture(self.pg_interfaces)
693         self.pg_start()
694         capture = self.pg0.get_capture(len(pkts))
695         self.verify_capture_in(capture, self.pg0)
696
697         # in2out
698         pkts = self.create_stream_in(self.pg0, self.pg1)
699         self.pg0.add_stream(pkts)
700         self.pg_enable_capture(self.pg_interfaces)
701         self.pg_start()
702         capture = self.pg1.get_capture(len(pkts))
703         self.verify_capture_out(capture, nat_ip, True)
704
705     def test_static_with_port_in(self):
706         """ SNAT 1:1 NAT with port initialized from inside network """
707
708         self.tcp_port_out = 3606
709         self.udp_port_out = 3607
710         self.icmp_id_out = 3608
711
712         self.snat_add_address(self.snat_addr)
713         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
714                                      self.tcp_port_in, self.tcp_port_out,
715                                      proto=IP_PROTOS.tcp)
716         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
717                                      self.udp_port_in, self.udp_port_out,
718                                      proto=IP_PROTOS.udp)
719         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
720                                      self.icmp_id_in, self.icmp_id_out,
721                                      proto=IP_PROTOS.icmp)
722         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
723         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
724                                                  is_inside=0)
725
726         # in2out
727         pkts = self.create_stream_in(self.pg0, self.pg1)
728         self.pg0.add_stream(pkts)
729         self.pg_enable_capture(self.pg_interfaces)
730         self.pg_start()
731         capture = self.pg1.get_capture(len(pkts))
732         self.verify_capture_out(capture)
733
734         # out2in
735         pkts = self.create_stream_out(self.pg1)
736         self.pg1.add_stream(pkts)
737         self.pg_enable_capture(self.pg_interfaces)
738         self.pg_start()
739         capture = self.pg0.get_capture(len(pkts))
740         self.verify_capture_in(capture, self.pg0)
741
742     def test_static_with_port_out(self):
743         """ SNAT 1:1 NAT with port initialized from outside network """
744
745         self.tcp_port_out = 30606
746         self.udp_port_out = 30607
747         self.icmp_id_out = 30608
748
749         self.snat_add_address(self.snat_addr)
750         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
751                                      self.tcp_port_in, self.tcp_port_out,
752                                      proto=IP_PROTOS.tcp)
753         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
754                                      self.udp_port_in, self.udp_port_out,
755                                      proto=IP_PROTOS.udp)
756         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
757                                      self.icmp_id_in, self.icmp_id_out,
758                                      proto=IP_PROTOS.icmp)
759         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
760         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
761                                                  is_inside=0)
762
763         # out2in
764         pkts = self.create_stream_out(self.pg1)
765         self.pg1.add_stream(pkts)
766         self.pg_enable_capture(self.pg_interfaces)
767         self.pg_start()
768         capture = self.pg0.get_capture(len(pkts))
769         self.verify_capture_in(capture, self.pg0)
770
771         # in2out
772         pkts = self.create_stream_in(self.pg0, self.pg1)
773         self.pg0.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776         capture = self.pg1.get_capture(len(pkts))
777         self.verify_capture_out(capture)
778
779     def test_static_vrf_aware(self):
780         """ SNAT 1:1 NAT VRF awareness """
781
782         nat_ip1 = "10.0.0.30"
783         nat_ip2 = "10.0.0.40"
784         self.tcp_port_out = 6303
785         self.udp_port_out = 6304
786         self.icmp_id_out = 6305
787
788         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
789                                      vrf_id=10)
790         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
791                                      vrf_id=10)
792         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
793                                                  is_inside=0)
794         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
795         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
796
797         # inside interface VRF match SNAT static mapping VRF
798         pkts = self.create_stream_in(self.pg4, self.pg3)
799         self.pg4.add_stream(pkts)
800         self.pg_enable_capture(self.pg_interfaces)
801         self.pg_start()
802         capture = self.pg3.get_capture(len(pkts))
803         self.verify_capture_out(capture, nat_ip1, True)
804
805         # inside interface VRF don't match SNAT static mapping VRF (packets
806         # are dropped)
807         pkts = self.create_stream_in(self.pg0, self.pg3)
808         self.pg0.add_stream(pkts)
809         self.pg_enable_capture(self.pg_interfaces)
810         self.pg_start()
811         self.pg3.assert_nothing_captured()
812
813     def test_multiple_inside_interfaces(self):
814         """ SNAT multiple inside interfaces (non-overlapping address space) """
815
816         self.snat_add_address(self.snat_addr)
817         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
818         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
819         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
820                                                  is_inside=0)
821
822         # between two S-NAT inside interfaces (no translation)
823         pkts = self.create_stream_in(self.pg0, self.pg1)
824         self.pg0.add_stream(pkts)
825         self.pg_enable_capture(self.pg_interfaces)
826         self.pg_start()
827         capture = self.pg1.get_capture(len(pkts))
828         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
829
830         # from S-NAT inside to interface without S-NAT feature (no translation)
831         pkts = self.create_stream_in(self.pg0, self.pg2)
832         self.pg0.add_stream(pkts)
833         self.pg_enable_capture(self.pg_interfaces)
834         self.pg_start()
835         capture = self.pg2.get_capture(len(pkts))
836         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
837
838         # in2out 1st interface
839         pkts = self.create_stream_in(self.pg0, self.pg3)
840         self.pg0.add_stream(pkts)
841         self.pg_enable_capture(self.pg_interfaces)
842         self.pg_start()
843         capture = self.pg3.get_capture(len(pkts))
844         self.verify_capture_out(capture)
845
846         # out2in 1st interface
847         pkts = self.create_stream_out(self.pg3)
848         self.pg3.add_stream(pkts)
849         self.pg_enable_capture(self.pg_interfaces)
850         self.pg_start()
851         capture = self.pg0.get_capture(len(pkts))
852         self.verify_capture_in(capture, self.pg0)
853
854         # in2out 2nd interface
855         pkts = self.create_stream_in(self.pg1, self.pg3)
856         self.pg1.add_stream(pkts)
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pg_start()
859         capture = self.pg3.get_capture(len(pkts))
860         self.verify_capture_out(capture)
861
862         # out2in 2nd interface
863         pkts = self.create_stream_out(self.pg3)
864         self.pg3.add_stream(pkts)
865         self.pg_enable_capture(self.pg_interfaces)
866         self.pg_start()
867         capture = self.pg1.get_capture(len(pkts))
868         self.verify_capture_in(capture, self.pg1)
869
870     def test_inside_overlapping_interfaces(self):
871         """ SNAT multiple inside interfaces with overlapping address space """
872
873         static_nat_ip = "10.0.0.10"
874         self.snat_add_address(self.snat_addr)
875         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
876                                                  is_inside=0)
877         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
878         self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
879         self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
880         self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
881                                      vrf_id=20)
882
883         # between S-NAT inside interfaces with same VRF (no translation)
884         pkts = self.create_stream_in(self.pg4, self.pg5)
885         self.pg4.add_stream(pkts)
886         self.pg_enable_capture(self.pg_interfaces)
887         self.pg_start()
888         capture = self.pg5.get_capture(len(pkts))
889         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
890
891         # between S-NAT inside interfaces with different VRF (hairpinning)
892         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
893              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
894              TCP(sport=1234, dport=5678))
895         self.pg4.add_stream(p)
896         self.pg_enable_capture(self.pg_interfaces)
897         self.pg_start()
898         capture = self.pg6.get_capture(1)
899         p = capture[0]
900         try:
901             ip = p[IP]
902             tcp = p[TCP]
903             self.assertEqual(ip.src, self.snat_addr)
904             self.assertEqual(ip.dst, self.pg6.remote_ip4)
905             self.assertNotEqual(tcp.sport, 1234)
906             self.assertEqual(tcp.dport, 5678)
907         except:
908             self.logger.error(ppp("Unexpected or invalid packet:", p))
909             raise
910
911         # in2out 1st interface
912         pkts = self.create_stream_in(self.pg4, self.pg3)
913         self.pg4.add_stream(pkts)
914         self.pg_enable_capture(self.pg_interfaces)
915         self.pg_start()
916         capture = self.pg3.get_capture(len(pkts))
917         self.verify_capture_out(capture)
918
919         # out2in 1st interface
920         pkts = self.create_stream_out(self.pg3)
921         self.pg3.add_stream(pkts)
922         self.pg_enable_capture(self.pg_interfaces)
923         self.pg_start()
924         capture = self.pg4.get_capture(len(pkts))
925         self.verify_capture_in(capture, self.pg4)
926
927         # in2out 2nd interface
928         pkts = self.create_stream_in(self.pg5, self.pg3)
929         self.pg5.add_stream(pkts)
930         self.pg_enable_capture(self.pg_interfaces)
931         self.pg_start()
932         capture = self.pg3.get_capture(len(pkts))
933         self.verify_capture_out(capture)
934
935         # out2in 2nd interface
936         pkts = self.create_stream_out(self.pg3)
937         self.pg3.add_stream(pkts)
938         self.pg_enable_capture(self.pg_interfaces)
939         self.pg_start()
940         capture = self.pg5.get_capture(len(pkts))
941         self.verify_capture_in(capture, self.pg5)
942
943         # pg5 session dump
944         addresses = self.vapi.snat_address_dump()
945         self.assertEqual(len(addresses), 1)
946         sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
947         self.assertEqual(len(sessions), 3)
948         for session in sessions:
949             self.assertFalse(session.is_static)
950             self.assertEqual(session.inside_ip_address[0:4],
951                              self.pg5.remote_ip4n)
952             self.assertEqual(session.outside_ip_address,
953                              addresses[0].ip_address)
954         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
955         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
956         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
957         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
958         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
959         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
960         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
961         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
962         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
963
964         # in2out 3rd interface
965         pkts = self.create_stream_in(self.pg6, self.pg3)
966         self.pg6.add_stream(pkts)
967         self.pg_enable_capture(self.pg_interfaces)
968         self.pg_start()
969         capture = self.pg3.get_capture(len(pkts))
970         self.verify_capture_out(capture, static_nat_ip, True)
971
972         # out2in 3rd interface
973         pkts = self.create_stream_out(self.pg3, static_nat_ip)
974         self.pg3.add_stream(pkts)
975         self.pg_enable_capture(self.pg_interfaces)
976         self.pg_start()
977         capture = self.pg6.get_capture(len(pkts))
978         self.verify_capture_in(capture, self.pg6)
979
980         # general user and session dump verifications
981         users = self.vapi.snat_user_dump()
982         self.assertTrue(len(users) >= 3)
983         addresses = self.vapi.snat_address_dump()
984         self.assertEqual(len(addresses), 1)
985         for user in users:
986             sessions = self.vapi.snat_user_session_dump(user.ip_address,
987                                                         user.vrf_id)
988             for session in sessions:
989                 self.assertEqual(user.ip_address, session.inside_ip_address)
990                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
991                 self.assertTrue(session.protocol in
992                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
993                                  IP_PROTOS.icmp])
994
995         # pg4 session dump
996         sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
997         self.assertTrue(len(sessions) >= 4)
998         for session in sessions:
999             self.assertFalse(session.is_static)
1000             self.assertEqual(session.inside_ip_address[0:4],
1001                              self.pg4.remote_ip4n)
1002             self.assertEqual(session.outside_ip_address,
1003                              addresses[0].ip_address)
1004
1005         # pg6 session dump
1006         sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
1007         self.assertTrue(len(sessions) >= 3)
1008         for session in sessions:
1009             self.assertTrue(session.is_static)
1010             self.assertEqual(session.inside_ip_address[0:4],
1011                              self.pg6.remote_ip4n)
1012             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1013                              map(int, static_nat_ip.split('.')))
1014             self.assertTrue(session.inside_port in
1015                             [self.tcp_port_in, self.udp_port_in,
1016                              self.icmp_id_in])
1017
1018     def test_hairpinning(self):
1019         """ SNAT hairpinning - 1:1 NAT with port"""
1020
1021         host = self.pg0.remote_hosts[0]
1022         server = self.pg0.remote_hosts[1]
1023         host_in_port = 1234
1024         host_out_port = 0
1025         server_in_port = 5678
1026         server_out_port = 8765
1027
1028         self.snat_add_address(self.snat_addr)
1029         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1030         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1031                                                  is_inside=0)
1032         # add static mapping for server
1033         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1034                                      server_in_port, server_out_port,
1035                                      proto=IP_PROTOS.tcp)
1036
1037         # send packet from host to server
1038         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1039              IP(src=host.ip4, dst=self.snat_addr) /
1040              TCP(sport=host_in_port, dport=server_out_port))
1041         self.pg0.add_stream(p)
1042         self.pg_enable_capture(self.pg_interfaces)
1043         self.pg_start()
1044         capture = self.pg0.get_capture(1)
1045         p = capture[0]
1046         try:
1047             ip = p[IP]
1048             tcp = p[TCP]
1049             self.assertEqual(ip.src, self.snat_addr)
1050             self.assertEqual(ip.dst, server.ip4)
1051             self.assertNotEqual(tcp.sport, host_in_port)
1052             self.assertEqual(tcp.dport, server_in_port)
1053             host_out_port = tcp.sport
1054         except:
1055             self.logger.error(ppp("Unexpected or invalid packet:", p))
1056             raise
1057
1058         # send reply from server to host
1059         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1060              IP(src=server.ip4, dst=self.snat_addr) /
1061              TCP(sport=server_in_port, dport=host_out_port))
1062         self.pg0.add_stream(p)
1063         self.pg_enable_capture(self.pg_interfaces)
1064         self.pg_start()
1065         capture = self.pg0.get_capture(1)
1066         p = capture[0]
1067         try:
1068             ip = p[IP]
1069             tcp = p[TCP]
1070             self.assertEqual(ip.src, self.snat_addr)
1071             self.assertEqual(ip.dst, host.ip4)
1072             self.assertEqual(tcp.sport, server_out_port)
1073             self.assertEqual(tcp.dport, host_in_port)
1074         except:
1075             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1076             raise
1077
1078     def test_hairpinning2(self):
1079         """ SNAT hairpinning - 1:1 NAT"""
1080
1081         server1_nat_ip = "10.0.0.10"
1082         server2_nat_ip = "10.0.0.11"
1083         host = self.pg0.remote_hosts[0]
1084         server1 = self.pg0.remote_hosts[1]
1085         server2 = self.pg0.remote_hosts[2]
1086         server_tcp_port = 22
1087         server_udp_port = 20
1088
1089         self.snat_add_address(self.snat_addr)
1090         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1091         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1092                                                  is_inside=0)
1093
1094         # add static mapping for servers
1095         self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
1096         self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
1097
1098         # host to server1
1099         pkts = []
1100         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1101              IP(src=host.ip4, dst=server1_nat_ip) /
1102              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1103         pkts.append(p)
1104         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1105              IP(src=host.ip4, dst=server1_nat_ip) /
1106              UDP(sport=self.udp_port_in, dport=server_udp_port))
1107         pkts.append(p)
1108         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1109              IP(src=host.ip4, dst=server1_nat_ip) /
1110              ICMP(id=self.icmp_id_in, type='echo-request'))
1111         pkts.append(p)
1112         self.pg0.add_stream(pkts)
1113         self.pg_enable_capture(self.pg_interfaces)
1114         self.pg_start()
1115         capture = self.pg0.get_capture(len(pkts))
1116         for packet in capture:
1117             try:
1118                 self.assertEqual(packet[IP].src, self.snat_addr)
1119                 self.assertEqual(packet[IP].dst, server1.ip4)
1120                 if packet.haslayer(TCP):
1121                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1122                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1123                     self.tcp_port_out = packet[TCP].sport
1124                 elif packet.haslayer(UDP):
1125                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1126                     self.assertEqual(packet[UDP].dport, server_udp_port)
1127                     self.udp_port_out = packet[UDP].sport
1128                 else:
1129                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1130                     self.icmp_id_out = packet[ICMP].id
1131             except:
1132                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1133                 raise
1134
1135         # server1 to host
1136         pkts = []
1137         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1138              IP(src=server1.ip4, dst=self.snat_addr) /
1139              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1140         pkts.append(p)
1141         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1142              IP(src=server1.ip4, dst=self.snat_addr) /
1143              UDP(sport=server_udp_port, dport=self.udp_port_out))
1144         pkts.append(p)
1145         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1146              IP(src=server1.ip4, dst=self.snat_addr) /
1147              ICMP(id=self.icmp_id_out, type='echo-reply'))
1148         pkts.append(p)
1149         self.pg0.add_stream(pkts)
1150         self.pg_enable_capture(self.pg_interfaces)
1151         self.pg_start()
1152         capture = self.pg0.get_capture(len(pkts))
1153         for packet in capture:
1154             try:
1155                 self.assertEqual(packet[IP].src, server1_nat_ip)
1156                 self.assertEqual(packet[IP].dst, host.ip4)
1157                 if packet.haslayer(TCP):
1158                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1159                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1160                 elif packet.haslayer(UDP):
1161                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1162                     self.assertEqual(packet[UDP].sport, server_udp_port)
1163                 else:
1164                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1165             except:
1166                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1167                 raise
1168
1169         # server2 to server1
1170         pkts = []
1171         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1172              IP(src=server2.ip4, dst=server1_nat_ip) /
1173              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1174         pkts.append(p)
1175         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1176              IP(src=server2.ip4, dst=server1_nat_ip) /
1177              UDP(sport=self.udp_port_in, dport=server_udp_port))
1178         pkts.append(p)
1179         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1180              IP(src=server2.ip4, dst=server1_nat_ip) /
1181              ICMP(id=self.icmp_id_in, type='echo-request'))
1182         pkts.append(p)
1183         self.pg0.add_stream(pkts)
1184         self.pg_enable_capture(self.pg_interfaces)
1185         self.pg_start()
1186         capture = self.pg0.get_capture(len(pkts))
1187         for packet in capture:
1188             try:
1189                 self.assertEqual(packet[IP].src, server2_nat_ip)
1190                 self.assertEqual(packet[IP].dst, server1.ip4)
1191                 if packet.haslayer(TCP):
1192                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1193                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1194                     self.tcp_port_out = packet[TCP].sport
1195                 elif packet.haslayer(UDP):
1196                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1197                     self.assertEqual(packet[UDP].dport, server_udp_port)
1198                     self.udp_port_out = packet[UDP].sport
1199                 else:
1200                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1201                     self.icmp_id_out = packet[ICMP].id
1202             except:
1203                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1204                 raise
1205
1206         # server1 to server2
1207         pkts = []
1208         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1209              IP(src=server1.ip4, dst=server2_nat_ip) /
1210              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1211         pkts.append(p)
1212         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1213              IP(src=server1.ip4, dst=server2_nat_ip) /
1214              UDP(sport=server_udp_port, dport=self.udp_port_out))
1215         pkts.append(p)
1216         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1217              IP(src=server1.ip4, dst=server2_nat_ip) /
1218              ICMP(id=self.icmp_id_out, type='echo-reply'))
1219         pkts.append(p)
1220         self.pg0.add_stream(pkts)
1221         self.pg_enable_capture(self.pg_interfaces)
1222         self.pg_start()
1223         capture = self.pg0.get_capture(len(pkts))
1224         for packet in capture:
1225             try:
1226                 self.assertEqual(packet[IP].src, server1_nat_ip)
1227                 self.assertEqual(packet[IP].dst, server2.ip4)
1228                 if packet.haslayer(TCP):
1229                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1230                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1231                 elif packet.haslayer(UDP):
1232                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1233                     self.assertEqual(packet[UDP].sport, server_udp_port)
1234                 else:
1235                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1236             except:
1237                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1238                 raise
1239
1240     def test_max_translations_per_user(self):
1241         """ MAX translations per user - recycle the least recently used """
1242
1243         self.snat_add_address(self.snat_addr)
1244         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1245         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1246                                                  is_inside=0)
1247
1248         # get maximum number of translations per user
1249         snat_config = self.vapi.snat_show_config()
1250
1251         # send more than maximum number of translations per user packets
1252         pkts_num = snat_config.max_translations_per_user + 5
1253         pkts = []
1254         for port in range(0, pkts_num):
1255             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1256                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1257                  TCP(sport=1025 + port))
1258             pkts.append(p)
1259         self.pg0.add_stream(pkts)
1260         self.pg_enable_capture(self.pg_interfaces)
1261         self.pg_start()
1262
1263         # verify number of translated packet
1264         self.pg1.get_capture(pkts_num)
1265
1266     def test_interface_addr(self):
1267         """ Acquire SNAT addresses from interface """
1268         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1269
1270         # no address in NAT pool
1271         adresses = self.vapi.snat_address_dump()
1272         self.assertEqual(0, len(adresses))
1273
1274         # configure interface address and check NAT address pool
1275         self.pg7.config_ip4()
1276         adresses = self.vapi.snat_address_dump()
1277         self.assertEqual(1, len(adresses))
1278         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1279
1280         # remove interface address and check NAT address pool
1281         self.pg7.unconfig_ip4()
1282         adresses = self.vapi.snat_address_dump()
1283         self.assertEqual(0, len(adresses))
1284
1285     def test_interface_addr_static_mapping(self):
1286         """ Static mapping with addresses from interface """
1287         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1288         self.snat_add_static_mapping('1.2.3.4',
1289                                      external_sw_if_index=self.pg7.sw_if_index)
1290
1291         # static mappings with external interface
1292         static_mappings = self.vapi.snat_static_mapping_dump()
1293         self.assertEqual(1, len(static_mappings))
1294         self.assertEqual(self.pg7.sw_if_index,
1295                          static_mappings[0].external_sw_if_index)
1296
1297         # configure interface address and check static mappings
1298         self.pg7.config_ip4()
1299         static_mappings = self.vapi.snat_static_mapping_dump()
1300         self.assertEqual(1, len(static_mappings))
1301         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1302                          self.pg7.local_ip4n)
1303         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1304
1305         # remove interface address and check static mappings
1306         self.pg7.unconfig_ip4()
1307         static_mappings = self.vapi.snat_static_mapping_dump()
1308         self.assertEqual(0, len(static_mappings))
1309
1310     def test_ipfix_nat44_sess(self):
1311         """ S-NAT IPFIX logging NAT44 session created/delted """
1312         self.ipfix_domain_id = 10
1313         self.ipfix_src_port = 20202
1314         colector_port = 30303
1315         bind_layers(UDP, IPFIX, dport=30303)
1316         self.snat_add_address(self.snat_addr)
1317         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1318         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1319                                                  is_inside=0)
1320         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1321                                      src_address=self.pg3.local_ip4n,
1322                                      path_mtu=512,
1323                                      template_interval=10,
1324                                      collector_port=colector_port)
1325         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1326                              src_port=self.ipfix_src_port)
1327
1328         pkts = self.create_stream_in(self.pg0, self.pg1)
1329         self.pg0.add_stream(pkts)
1330         self.pg_enable_capture(self.pg_interfaces)
1331         self.pg_start()
1332         capture = self.pg1.get_capture(len(pkts))
1333         self.verify_capture_out(capture)
1334         self.snat_add_address(self.snat_addr, is_add=0)
1335         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1336         capture = self.pg3.get_capture(3)
1337         ipfix = IPFIXDecoder()
1338         # first load template
1339         for p in capture:
1340             self.assertTrue(p.haslayer(IPFIX))
1341             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1342             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1343             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1344             self.assertEqual(p[UDP].dport, colector_port)
1345             self.assertEqual(p[IPFIX].observationDomainID,
1346                              self.ipfix_domain_id)
1347             if p.haslayer(Template):
1348                 ipfix.add_template(p.getlayer(Template))
1349         # verify events in data set
1350         for p in capture:
1351             if p.haslayer(Data):
1352                 data = ipfix.decode_data_set(p.getlayer(Set))
1353                 self.verify_ipfix_nat44_ses(data)
1354
1355     def test_ipfix_addr_exhausted(self):
1356         """ S-NAT IPFIX logging NAT addresses exhausted """
1357         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1358         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1359                                                  is_inside=0)
1360         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1361                                      src_address=self.pg3.local_ip4n,
1362                                      path_mtu=512,
1363                                      template_interval=10)
1364         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1365                              src_port=self.ipfix_src_port)
1366
1367         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1368              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1369              TCP(sport=3025))
1370         self.pg0.add_stream(p)
1371         self.pg_enable_capture(self.pg_interfaces)
1372         self.pg_start()
1373         capture = self.pg1.get_capture(0)
1374         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1375         capture = self.pg3.get_capture(3)
1376         ipfix = IPFIXDecoder()
1377         # first load template
1378         for p in capture:
1379             self.assertTrue(p.haslayer(IPFIX))
1380             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1381             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1382             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1383             self.assertEqual(p[UDP].dport, 4739)
1384             self.assertEqual(p[IPFIX].observationDomainID,
1385                              self.ipfix_domain_id)
1386             if p.haslayer(Template):
1387                 ipfix.add_template(p.getlayer(Template))
1388         # verify events in data set
1389         for p in capture:
1390             if p.haslayer(Data):
1391                 data = ipfix.decode_data_set(p.getlayer(Set))
1392                 self.verify_ipfix_addr_exhausted(data)
1393
1394     def test_pool_addr_fib(self):
1395         """ S-NAT add pool addresses to FIB """
1396         static_addr = '10.0.0.10'
1397         self.snat_add_address(self.snat_addr)
1398         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1399         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1400                                                  is_inside=0)
1401         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1402
1403         # SNAT address
1404         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1405              ARP(op=ARP.who_has, pdst=self.snat_addr,
1406                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1407         self.pg1.add_stream(p)
1408         self.pg_enable_capture(self.pg_interfaces)
1409         self.pg_start()
1410         capture = self.pg1.get_capture(1)
1411         self.assertTrue(capture[0].haslayer(ARP))
1412         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1413
1414         # 1:1 NAT address
1415         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1416              ARP(op=ARP.who_has, pdst=static_addr,
1417                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1418         self.pg1.add_stream(p)
1419         self.pg_enable_capture(self.pg_interfaces)
1420         self.pg_start()
1421         capture = self.pg1.get_capture(1)
1422         self.assertTrue(capture[0].haslayer(ARP))
1423         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1424
1425         # send ARP to non-SNAT interface
1426         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1427              ARP(op=ARP.who_has, pdst=self.snat_addr,
1428                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1429         self.pg2.add_stream(p)
1430         self.pg_enable_capture(self.pg_interfaces)
1431         self.pg_start()
1432         capture = self.pg1.get_capture(0)
1433
1434         # remove addresses and verify
1435         self.snat_add_address(self.snat_addr, is_add=0)
1436         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1437                                      is_add=0)
1438
1439         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1440              ARP(op=ARP.who_has, pdst=self.snat_addr,
1441                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1442         self.pg1.add_stream(p)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445         capture = self.pg1.get_capture(0)
1446
1447         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1448              ARP(op=ARP.who_has, pdst=static_addr,
1449                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1450         self.pg1.add_stream(p)
1451         self.pg_enable_capture(self.pg_interfaces)
1452         self.pg_start()
1453         capture = self.pg1.get_capture(0)
1454
1455     def test_vrf_mode(self):
1456         """ S-NAT tenant VRF aware address pool mode """
1457
1458         vrf_id1 = 1
1459         vrf_id2 = 2
1460         nat_ip1 = "10.0.0.10"
1461         nat_ip2 = "10.0.0.11"
1462
1463         self.pg0.unconfig_ip4()
1464         self.pg1.unconfig_ip4()
1465         self.pg0.set_table_ip4(vrf_id1)
1466         self.pg1.set_table_ip4(vrf_id2)
1467         self.pg0.config_ip4()
1468         self.pg1.config_ip4()
1469
1470         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1471         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1472         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1473         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1474         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1475                                                  is_inside=0)
1476
1477         # first VRF
1478         pkts = self.create_stream_in(self.pg0, self.pg2)
1479         self.pg0.add_stream(pkts)
1480         self.pg_enable_capture(self.pg_interfaces)
1481         self.pg_start()
1482         capture = self.pg2.get_capture(len(pkts))
1483         self.verify_capture_out(capture, nat_ip1)
1484
1485         # second VRF
1486         pkts = self.create_stream_in(self.pg1, self.pg2)
1487         self.pg1.add_stream(pkts)
1488         self.pg_enable_capture(self.pg_interfaces)
1489         self.pg_start()
1490         capture = self.pg2.get_capture(len(pkts))
1491         self.verify_capture_out(capture, nat_ip2)
1492
1493     def test_vrf_feature_independent(self):
1494         """ S-NAT tenant VRF independent address pool mode """
1495
1496         nat_ip1 = "10.0.0.10"
1497         nat_ip2 = "10.0.0.11"
1498
1499         self.snat_add_address(nat_ip1)
1500         self.snat_add_address(nat_ip2)
1501         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1502         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1503         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1504                                                  is_inside=0)
1505
1506         # first VRF
1507         pkts = self.create_stream_in(self.pg0, self.pg2)
1508         self.pg0.add_stream(pkts)
1509         self.pg_enable_capture(self.pg_interfaces)
1510         self.pg_start()
1511         capture = self.pg2.get_capture(len(pkts))
1512         self.verify_capture_out(capture, nat_ip1)
1513
1514         # second VRF
1515         pkts = self.create_stream_in(self.pg1, self.pg2)
1516         self.pg1.add_stream(pkts)
1517         self.pg_enable_capture(self.pg_interfaces)
1518         self.pg_start()
1519         capture = self.pg2.get_capture(len(pkts))
1520         self.verify_capture_out(capture, nat_ip1)
1521
1522     def test_dynamic_ipless_interfaces(self):
1523         """ SNAT interfaces without configured ip dynamic map """
1524
1525         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1526                                       self.pg7.remote_mac,
1527                                       self.pg7.remote_ip4n,
1528                                       is_static=1)
1529         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1530                                       self.pg8.remote_mac,
1531                                       self.pg8.remote_ip4n,
1532                                       is_static=1)
1533
1534         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1535                                    dst_address_length=32,
1536                                    next_hop_address=self.pg7.remote_ip4n,
1537                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1538         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1539                                    dst_address_length=32,
1540                                    next_hop_address=self.pg8.remote_ip4n,
1541                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1542
1543         self.snat_add_address(self.snat_addr)
1544         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1545         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1546                                                  is_inside=0)
1547
1548         # in2out
1549         pkts = self.create_stream_in(self.pg7, self.pg8)
1550         self.pg7.add_stream(pkts)
1551         self.pg_enable_capture(self.pg_interfaces)
1552         self.pg_start()
1553         capture = self.pg8.get_capture(len(pkts))
1554         self.verify_capture_out(capture)
1555
1556         # out2in
1557         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1558         self.pg8.add_stream(pkts)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg7.get_capture(len(pkts))
1562         self.verify_capture_in(capture, self.pg7)
1563
1564     def test_static_ipless_interfaces(self):
1565         """ SNAT 1:1 NAT interfaces without configured ip """
1566
1567         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1568                                       self.pg7.remote_mac,
1569                                       self.pg7.remote_ip4n,
1570                                       is_static=1)
1571         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1572                                       self.pg8.remote_mac,
1573                                       self.pg8.remote_ip4n,
1574                                       is_static=1)
1575
1576         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1577                                    dst_address_length=32,
1578                                    next_hop_address=self.pg7.remote_ip4n,
1579                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1580         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1581                                    dst_address_length=32,
1582                                    next_hop_address=self.pg8.remote_ip4n,
1583                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1584
1585         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1586         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1587         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1588                                                  is_inside=0)
1589
1590         # out2in
1591         pkts = self.create_stream_out(self.pg8)
1592         self.pg8.add_stream(pkts)
1593         self.pg_enable_capture(self.pg_interfaces)
1594         self.pg_start()
1595         capture = self.pg7.get_capture(len(pkts))
1596         self.verify_capture_in(capture, self.pg7)
1597
1598         # in2out
1599         pkts = self.create_stream_in(self.pg7, self.pg8)
1600         self.pg7.add_stream(pkts)
1601         self.pg_enable_capture(self.pg_interfaces)
1602         self.pg_start()
1603         capture = self.pg8.get_capture(len(pkts))
1604         self.verify_capture_out(capture, self.snat_addr, True)
1605
1606     def test_static_with_port_ipless_interfaces(self):
1607         """ SNAT 1:1 NAT with port interfaces without configured ip """
1608
1609         self.tcp_port_out = 30606
1610         self.udp_port_out = 30607
1611         self.icmp_id_out = 30608
1612
1613         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1614                                       self.pg7.remote_mac,
1615                                       self.pg7.remote_ip4n,
1616                                       is_static=1)
1617         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1618                                       self.pg8.remote_mac,
1619                                       self.pg8.remote_ip4n,
1620                                       is_static=1)
1621
1622         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1623                                    dst_address_length=32,
1624                                    next_hop_address=self.pg7.remote_ip4n,
1625                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1626         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1627                                    dst_address_length=32,
1628                                    next_hop_address=self.pg8.remote_ip4n,
1629                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1630
1631         self.snat_add_address(self.snat_addr)
1632         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1633                                      self.tcp_port_in, self.tcp_port_out,
1634                                      proto=IP_PROTOS.tcp)
1635         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1636                                      self.udp_port_in, self.udp_port_out,
1637                                      proto=IP_PROTOS.udp)
1638         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1639                                      self.icmp_id_in, self.icmp_id_out,
1640                                      proto=IP_PROTOS.icmp)
1641         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1642         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1643                                                  is_inside=0)
1644
1645         # out2in
1646         pkts = self.create_stream_out(self.pg8)
1647         self.pg8.add_stream(pkts)
1648         self.pg_enable_capture(self.pg_interfaces)
1649         self.pg_start()
1650         capture = self.pg7.get_capture(len(pkts))
1651         self.verify_capture_in(capture, self.pg7)
1652
1653         # in2out
1654         pkts = self.create_stream_in(self.pg7, self.pg8)
1655         self.pg7.add_stream(pkts)
1656         self.pg_enable_capture(self.pg_interfaces)
1657         self.pg_start()
1658         capture = self.pg8.get_capture(len(pkts))
1659         self.verify_capture_out(capture)
1660
1661     def tearDown(self):
1662         super(TestSNAT, self).tearDown()
1663         if not self.vpp_dead:
1664             self.logger.info(self.vapi.cli("show snat verbose"))
1665             self.clear_snat()
1666
1667
1668 class TestDeterministicNAT(MethodHolder):
1669     """ Deterministic NAT Test Cases """
1670
1671     @classmethod
1672     def setUpConstants(cls):
1673         super(TestDeterministicNAT, cls).setUpConstants()
1674         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1675
1676     @classmethod
1677     def setUpClass(cls):
1678         super(TestDeterministicNAT, cls).setUpClass()
1679
1680         try:
1681             cls.tcp_port_in = 6303
1682             cls.tcp_external_port = 6303
1683             cls.udp_port_in = 6304
1684             cls.udp_external_port = 6304
1685             cls.icmp_id_in = 6305
1686             cls.snat_addr = '10.0.0.3'
1687
1688             cls.create_pg_interfaces(range(3))
1689             cls.interfaces = list(cls.pg_interfaces)
1690
1691             for i in cls.interfaces:
1692                 i.admin_up()
1693                 i.config_ip4()
1694                 i.resolve_arp()
1695
1696             cls.pg0.generate_remote_hosts(2)
1697             cls.pg0.configure_ipv4_neighbors()
1698
1699         except Exception:
1700             super(TestDeterministicNAT, cls).tearDownClass()
1701             raise
1702
1703     def create_stream_in(self, in_if, out_if, ttl=64):
1704         """
1705         Create packet stream for inside network
1706
1707         :param in_if: Inside interface
1708         :param out_if: Outside interface
1709         :param ttl: TTL of generated packets
1710         """
1711         pkts = []
1712         # TCP
1713         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1714              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1715              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1716         pkts.append(p)
1717
1718         # UDP
1719         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1720              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1721              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1722         pkts.append(p)
1723
1724         # ICMP
1725         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1726              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1727              ICMP(id=self.icmp_id_in, type='echo-request'))
1728         pkts.append(p)
1729
1730         return pkts
1731
1732     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1733         """
1734         Create packet stream for outside network
1735
1736         :param out_if: Outside interface
1737         :param dst_ip: Destination IP address (Default use global SNAT address)
1738         :param ttl: TTL of generated packets
1739         """
1740         if dst_ip is None:
1741             dst_ip = self.snat_addr
1742         pkts = []
1743         # TCP
1744         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1745              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1746              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1747         pkts.append(p)
1748
1749         # UDP
1750         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1751              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1752              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1753         pkts.append(p)
1754
1755         # ICMP
1756         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1757              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1758              ICMP(id=self.icmp_external_id, type='echo-reply'))
1759         pkts.append(p)
1760
1761         return pkts
1762
1763     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1764         """
1765         Verify captured packets on outside network
1766
1767         :param capture: Captured packets
1768         :param nat_ip: Translated IP address (Default use global SNAT address)
1769         :param same_port: Sorce port number is not translated (Default False)
1770         :param packet_num: Expected number of packets (Default 3)
1771         """
1772         if nat_ip is None:
1773             nat_ip = self.snat_addr
1774         self.assertEqual(packet_num, len(capture))
1775         for packet in capture:
1776             try:
1777                 self.assertEqual(packet[IP].src, nat_ip)
1778                 if packet.haslayer(TCP):
1779                     self.tcp_port_out = packet[TCP].sport
1780                 elif packet.haslayer(UDP):
1781                     self.udp_port_out = packet[UDP].sport
1782                 else:
1783                     self.icmp_external_id = packet[ICMP].id
1784             except:
1785                 self.logger.error(ppp("Unexpected or invalid packet "
1786                                       "(outside network):", packet))
1787                 raise
1788
1789     def initiate_tcp_session(self, in_if, out_if):
1790         """
1791         Initiates TCP session
1792
1793         :param in_if: Inside interface
1794         :param out_if: Outside interface
1795         """
1796         try:
1797             # SYN packet in->out
1798             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1799                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1800                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1801                      flags="S"))
1802             in_if.add_stream(p)
1803             self.pg_enable_capture(self.pg_interfaces)
1804             self.pg_start()
1805             capture = out_if.get_capture(1)
1806             p = capture[0]
1807             self.tcp_port_out = p[TCP].sport
1808
1809             # SYN + ACK packet out->in
1810             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1811                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1812                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1813                      flags="SA"))
1814             out_if.add_stream(p)
1815             self.pg_enable_capture(self.pg_interfaces)
1816             self.pg_start()
1817             in_if.get_capture(1)
1818
1819             # ACK packet in->out
1820             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1821                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1822                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1823                      flags="A"))
1824             in_if.add_stream(p)
1825             self.pg_enable_capture(self.pg_interfaces)
1826             self.pg_start()
1827             out_if.get_capture(1)
1828
1829         except:
1830             self.logger.error("TCP 3 way handshake failed")
1831             raise
1832
1833     def verify_ipfix_max_entries_per_user(self, data):
1834         """
1835         Verify IPFIX maximum entries per user exceeded event
1836
1837         :param data: Decoded IPFIX data records
1838         """
1839         self.assertEqual(1, len(data))
1840         record = data[0]
1841         # natEvent
1842         self.assertEqual(ord(record[230]), 13)
1843         # natQuotaExceededEvent
1844         self.assertEqual('\x03\x00\x00\x00', record[466])
1845         # sourceIPv4Address
1846         self.assertEqual(self.pg0.remote_ip4n, record[8])
1847
1848     def test_deterministic_mode(self):
1849         """ S-NAT run deterministic mode """
1850         in_addr = '172.16.255.0'
1851         out_addr = '172.17.255.50'
1852         in_addr_t = '172.16.255.20'
1853         in_addr_n = socket.inet_aton(in_addr)
1854         out_addr_n = socket.inet_aton(out_addr)
1855         in_addr_t_n = socket.inet_aton(in_addr_t)
1856         in_plen = 24
1857         out_plen = 32
1858
1859         snat_config = self.vapi.snat_show_config()
1860         self.assertEqual(1, snat_config.deterministic)
1861
1862         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1863
1864         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1865         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1866         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1867         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1868
1869         deterministic_mappings = self.vapi.snat_det_map_dump()
1870         self.assertEqual(len(deterministic_mappings), 1)
1871         dsm = deterministic_mappings[0]
1872         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1873         self.assertEqual(in_plen, dsm.in_plen)
1874         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1875         self.assertEqual(out_plen, dsm.out_plen)
1876
1877         self.clear_snat()
1878         deterministic_mappings = self.vapi.snat_det_map_dump()
1879         self.assertEqual(len(deterministic_mappings), 0)
1880
1881     def test_set_timeouts(self):
1882         """ Set deterministic NAT timeouts """
1883         timeouts_before = self.vapi.snat_det_get_timeouts()
1884
1885         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1886                                         timeouts_before.tcp_established + 10,
1887                                         timeouts_before.tcp_transitory + 10,
1888                                         timeouts_before.icmp + 10)
1889
1890         timeouts_after = self.vapi.snat_det_get_timeouts()
1891
1892         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1893         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1894         self.assertNotEqual(timeouts_before.tcp_established,
1895                             timeouts_after.tcp_established)
1896         self.assertNotEqual(timeouts_before.tcp_transitory,
1897                             timeouts_after.tcp_transitory)
1898
1899     def test_det_in(self):
1900         """ CGNAT translation test (TCP, UDP, ICMP) """
1901
1902         nat_ip = "10.0.0.10"
1903
1904         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1905                                    32,
1906                                    socket.inet_aton(nat_ip),
1907                                    32)
1908         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1909         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1910                                                  is_inside=0)
1911
1912         # in2out
1913         pkts = self.create_stream_in(self.pg0, self.pg1)
1914         self.pg0.add_stream(pkts)
1915         self.pg_enable_capture(self.pg_interfaces)
1916         self.pg_start()
1917         capture = self.pg1.get_capture(len(pkts))
1918         self.verify_capture_out(capture, nat_ip)
1919
1920         # out2in
1921         pkts = self.create_stream_out(self.pg1, nat_ip)
1922         self.pg1.add_stream(pkts)
1923         self.pg_enable_capture(self.pg_interfaces)
1924         self.pg_start()
1925         capture = self.pg0.get_capture(len(pkts))
1926         self.verify_capture_in(capture, self.pg0)
1927
1928         # session dump test
1929         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
1930         self.assertEqual(len(sessions), 3)
1931
1932         # TCP session
1933         s = sessions[0]
1934         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1935         self.assertEqual(s.in_port, self.tcp_port_in)
1936         self.assertEqual(s.out_port, self.tcp_port_out)
1937         self.assertEqual(s.ext_port, self.tcp_external_port)
1938
1939         # UDP session
1940         s = sessions[1]
1941         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1942         self.assertEqual(s.in_port, self.udp_port_in)
1943         self.assertEqual(s.out_port, self.udp_port_out)
1944         self.assertEqual(s.ext_port, self.udp_external_port)
1945
1946         # ICMP session
1947         s = sessions[2]
1948         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
1949         self.assertEqual(s.in_port, self.icmp_id_in)
1950         self.assertEqual(s.out_port, self.icmp_external_id)
1951
1952     def test_multiple_users(self):
1953         """ CGNAT multiple users """
1954
1955         nat_ip = "10.0.0.10"
1956         port_in = 80
1957         external_port = 6303
1958
1959         host0 = self.pg0.remote_hosts[0]
1960         host1 = self.pg0.remote_hosts[1]
1961
1962         self.vapi.snat_add_det_map(host0.ip4n,
1963                                    24,
1964                                    socket.inet_aton(nat_ip),
1965                                    32)
1966         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1967         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1968                                                  is_inside=0)
1969
1970         # host0 to out
1971         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
1972              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
1973              TCP(sport=port_in, dport=external_port))
1974         self.pg0.add_stream(p)
1975         self.pg_enable_capture(self.pg_interfaces)
1976         self.pg_start()
1977         capture = self.pg1.get_capture(1)
1978         p = capture[0]
1979         try:
1980             ip = p[IP]
1981             tcp = p[TCP]
1982             self.assertEqual(ip.src, nat_ip)
1983             self.assertEqual(ip.dst, self.pg1.remote_ip4)
1984             self.assertEqual(tcp.dport, external_port)
1985             port_out0 = tcp.sport
1986         except:
1987             self.logger.error(ppp("Unexpected or invalid packet:", p))
1988             raise
1989
1990         # host1 to out
1991         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
1992              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
1993              TCP(sport=port_in, dport=external_port))
1994         self.pg0.add_stream(p)
1995         self.pg_enable_capture(self.pg_interfaces)
1996         self.pg_start()
1997         capture = self.pg1.get_capture(1)
1998         p = capture[0]
1999         try:
2000             ip = p[IP]
2001             tcp = p[TCP]
2002             self.assertEqual(ip.src, nat_ip)
2003             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2004             self.assertEqual(tcp.dport, external_port)
2005             port_out1 = tcp.sport
2006         except:
2007             self.logger.error(ppp("Unexpected or invalid packet:", p))
2008             raise
2009
2010         dms = self.vapi.snat_det_map_dump()
2011         self.assertEqual(1, len(dms))
2012         self.assertEqual(2, dms[0].ses_num)
2013
2014         # out to host0
2015         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2016              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2017              TCP(sport=external_port, dport=port_out0))
2018         self.pg1.add_stream(p)
2019         self.pg_enable_capture(self.pg_interfaces)
2020         self.pg_start()
2021         capture = self.pg0.get_capture(1)
2022         p = capture[0]
2023         try:
2024             ip = p[IP]
2025             tcp = p[TCP]
2026             self.assertEqual(ip.src, self.pg1.remote_ip4)
2027             self.assertEqual(ip.dst, host0.ip4)
2028             self.assertEqual(tcp.dport, port_in)
2029             self.assertEqual(tcp.sport, external_port)
2030         except:
2031             self.logger.error(ppp("Unexpected or invalid packet:", p))
2032             raise
2033
2034         # out to host1
2035         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2036              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2037              TCP(sport=external_port, dport=port_out1))
2038         self.pg1.add_stream(p)
2039         self.pg_enable_capture(self.pg_interfaces)
2040         self.pg_start()
2041         capture = self.pg0.get_capture(1)
2042         p = capture[0]
2043         try:
2044             ip = p[IP]
2045             tcp = p[TCP]
2046             self.assertEqual(ip.src, self.pg1.remote_ip4)
2047             self.assertEqual(ip.dst, host1.ip4)
2048             self.assertEqual(tcp.dport, port_in)
2049             self.assertEqual(tcp.sport, external_port)
2050         except:
2051             self.logger.error(ppp("Unexpected or invalid packet", p))
2052             raise
2053
2054         # session close api test
2055         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2056                                              port_out1,
2057                                              self.pg1.remote_ip4n,
2058                                              external_port)
2059         dms = self.vapi.snat_det_map_dump()
2060         self.assertEqual(dms[0].ses_num, 1)
2061
2062         self.vapi.snat_det_close_session_in(host0.ip4n,
2063                                             port_in,
2064                                             self.pg1.remote_ip4n,
2065                                             external_port)
2066         dms = self.vapi.snat_det_map_dump()
2067         self.assertEqual(dms[0].ses_num, 0)
2068
2069     def test_tcp_session_close_detection_in(self):
2070         """ CGNAT TCP session close initiated from inside network """
2071         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2072                                    32,
2073                                    socket.inet_aton(self.snat_addr),
2074                                    32)
2075         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2076         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2077                                                  is_inside=0)
2078
2079         self.initiate_tcp_session(self.pg0, self.pg1)
2080
2081         # close the session from inside
2082         try:
2083             # FIN packet in -> out
2084             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2085                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2086                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2087                      flags="F"))
2088             self.pg0.add_stream(p)
2089             self.pg_enable_capture(self.pg_interfaces)
2090             self.pg_start()
2091             self.pg1.get_capture(1)
2092
2093             pkts = []
2094
2095             # ACK packet out -> in
2096             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2097                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2098                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2099                      flags="A"))
2100             pkts.append(p)
2101
2102             # FIN packet out -> in
2103             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2104                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2105                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2106                      flags="F"))
2107             pkts.append(p)
2108
2109             self.pg1.add_stream(pkts)
2110             self.pg_enable_capture(self.pg_interfaces)
2111             self.pg_start()
2112             self.pg0.get_capture(2)
2113
2114             # ACK packet in -> out
2115             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2116                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2117                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2118                      flags="A"))
2119             self.pg0.add_stream(p)
2120             self.pg_enable_capture(self.pg_interfaces)
2121             self.pg_start()
2122             self.pg1.get_capture(1)
2123
2124             # Check if snat closed the session
2125             dms = self.vapi.snat_det_map_dump()
2126             self.assertEqual(0, dms[0].ses_num)
2127         except:
2128             self.logger.error("TCP session termination failed")
2129             raise
2130
2131     def test_tcp_session_close_detection_out(self):
2132         """ CGNAT TCP session close initiated from outside network """
2133         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2134                                    32,
2135                                    socket.inet_aton(self.snat_addr),
2136                                    32)
2137         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2138         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2139                                                  is_inside=0)
2140
2141         self.initiate_tcp_session(self.pg0, self.pg1)
2142
2143         # close the session from outside
2144         try:
2145             # FIN packet out -> in
2146             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2147                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2148                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2149                      flags="F"))
2150             self.pg1.add_stream(p)
2151             self.pg_enable_capture(self.pg_interfaces)
2152             self.pg_start()
2153             self.pg0.get_capture(1)
2154
2155             pkts = []
2156
2157             # ACK packet in -> out
2158             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2159                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2160                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2161                      flags="A"))
2162             pkts.append(p)
2163
2164             # ACK packet in -> out
2165             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2166                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2167                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2168                      flags="F"))
2169             pkts.append(p)
2170
2171             self.pg0.add_stream(pkts)
2172             self.pg_enable_capture(self.pg_interfaces)
2173             self.pg_start()
2174             self.pg1.get_capture(2)
2175
2176             # ACK packet out -> in
2177             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2178                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2179                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2180                      flags="A"))
2181             self.pg1.add_stream(p)
2182             self.pg_enable_capture(self.pg_interfaces)
2183             self.pg_start()
2184             self.pg0.get_capture(1)
2185
2186             # Check if snat closed the session
2187             dms = self.vapi.snat_det_map_dump()
2188             self.assertEqual(0, dms[0].ses_num)
2189         except:
2190             self.logger.error("TCP session termination failed")
2191             raise
2192
2193     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2194     def test_session_timeout(self):
2195         """ CGNAT session timeouts """
2196         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2197                                    32,
2198                                    socket.inet_aton(self.snat_addr),
2199                                    32)
2200         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2201         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2202                                                  is_inside=0)
2203
2204         self.initiate_tcp_session(self.pg0, self.pg1)
2205         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2206         pkts = self.create_stream_in(self.pg0, self.pg1)
2207         self.pg0.add_stream(pkts)
2208         self.pg_enable_capture(self.pg_interfaces)
2209         self.pg_start()
2210         capture = self.pg1.get_capture(len(pkts))
2211         sleep(15)
2212
2213         dms = self.vapi.snat_det_map_dump()
2214         self.assertEqual(0, dms[0].ses_num)
2215
2216     def test_session_limit_per_user(self):
2217         """ CGNAT maximum 1000 sessions per user should be created """
2218         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2219                                    32,
2220                                    socket.inet_aton(self.snat_addr),
2221                                    32)
2222         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2223         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2224                                                  is_inside=0)
2225         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2226                                      src_address=self.pg2.local_ip4n,
2227                                      path_mtu=512,
2228                                      template_interval=10)
2229         self.vapi.snat_ipfix()
2230
2231         pkts = []
2232         for port in range(1025, 2025):
2233             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2234                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2235                  UDP(sport=port, dport=port))
2236             pkts.append(p)
2237
2238         self.pg0.add_stream(pkts)
2239         self.pg_enable_capture(self.pg_interfaces)
2240         self.pg_start()
2241         capture = self.pg1.get_capture(len(pkts))
2242
2243         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2244              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2245              UDP(sport=3001, dport=3002))
2246         self.pg0.add_stream(p)
2247         self.pg_enable_capture(self.pg_interfaces)
2248         self.pg_start()
2249         capture = self.pg1.assert_nothing_captured()
2250
2251         # verify ICMP error packet
2252         capture = self.pg0.get_capture(1)
2253         p = capture[0]
2254         self.assertTrue(p.haslayer(ICMP))
2255         icmp = p[ICMP]
2256         self.assertEqual(icmp.type, 3)
2257         self.assertEqual(icmp.code, 1)
2258         self.assertTrue(icmp.haslayer(IPerror))
2259         inner_ip = icmp[IPerror]
2260         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2261         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2262
2263         dms = self.vapi.snat_det_map_dump()
2264
2265         self.assertEqual(1000, dms[0].ses_num)
2266
2267         # verify IPFIX logging
2268         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2269         capture = self.pg2.get_capture(2)
2270         ipfix = IPFIXDecoder()
2271         # first load template
2272         for p in capture:
2273             self.assertTrue(p.haslayer(IPFIX))
2274             if p.haslayer(Template):
2275                 ipfix.add_template(p.getlayer(Template))
2276         # verify events in data set
2277         for p in capture:
2278             if p.haslayer(Data):
2279                 data = ipfix.decode_data_set(p.getlayer(Set))
2280                 self.verify_ipfix_max_entries_per_user(data)
2281
2282     def clear_snat(self):
2283         """
2284         Clear SNAT configuration.
2285         """
2286         self.vapi.snat_ipfix(enable=0)
2287         self.vapi.snat_det_set_timeouts()
2288         deterministic_mappings = self.vapi.snat_det_map_dump()
2289         for dsm in deterministic_mappings:
2290             self.vapi.snat_add_det_map(dsm.in_addr,
2291                                        dsm.in_plen,
2292                                        dsm.out_addr,
2293                                        dsm.out_plen,
2294                                        is_add=0)
2295
2296         interfaces = self.vapi.snat_interface_dump()
2297         for intf in interfaces:
2298             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2299                                                      intf.is_inside,
2300                                                      is_add=0)
2301
2302     def tearDown(self):
2303         super(TestDeterministicNAT, self).tearDown()
2304         if not self.vpp_dead:
2305             self.logger.info(self.vapi.cli("show snat detail"))
2306             self.clear_snat()
2307
# When executed as a script, run this module's tests with the VPP runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)