+ @staticmethod
+ def proto2layer(proto):
+ if proto == IP_PROTOS.tcp:
+ return TCP
+ elif proto == IP_PROTOS.udp:
+ return UDP
+ elif proto == IP_PROTOS.icmp:
+ return ICMP
+ else:
+ raise Exception("Unsupported protocol")
+
+ def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
+ layer = self.proto2layer(proto)
+
+ if proto == IP_PROTOS.tcp:
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ else:
+ data = "A" * 16 + "B" * 16 + "C" * 3
+ self.port_in = random.randint(1025, 65535)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_start = len(reass)
+
+ # in2out
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.port_in,
+ 20,
+ data,
+ proto)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ if not dont_translate:
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ else:
+ p = self.reass_frags_and_verify(frags,
+ self.pg0.remote_ip4,
+ self.pg1.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ if not dont_translate:
+ self.assertEqual(p[layer].dport, 20)
+ self.assertNotEqual(p[layer].sport, self.port_in)
+ else:
+ self.assertEqual(p[layer].sport, self.port_in)
+ else:
+ if not dont_translate:
+ self.assertNotEqual(p[layer].id, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ if not dont_translate:
+ dst_addr = self.nat_addr
+ else:
+ dst_addr = self.pg0.remote_ip4
+ if proto != IP_PROTOS.icmp:
+ sport = 20
+ dport = p[layer].sport
+ else:
+ sport = p[layer].id
+ dport = 0
+ pkts = self.create_stream_frag(self.pg1,
+ dst_addr,
+ sport,
+ dport,
+ data,
+ proto,
+ echo_reply=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg1.remote_ip4,
+ self.pg0.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].sport, 20)
+ self.assertEqual(p[layer].dport, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_end = len(reass)
+
+ self.assertEqual(reass_n_end - reass_n_start, 2)
+
+ def frag_in_order_in_plus_out(self, proto=IP_PROTOS.tcp):
+ layer = self.proto2layer(proto)
+
+ if proto == IP_PROTOS.tcp:
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ else:
+ data = "A" * 16 + "B" * 16 + "C" * 3
+ self.port_in = random.randint(1025, 65535)
+
+ for i in range(2):
+ reass = self.vapi.nat_reass_dump()
+ reass_n_start = len(reass)
+
+ # out2in
+ pkts = self.create_stream_frag(self.pg0,
+ self.server_out_addr,
+ self.port_in,
+ self.server_out_port,
+ data,
+ proto)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg0.remote_ip4,
+ self.server_in_addr)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].sport, self.port_in)
+ self.assertEqual(p[layer].dport, self.server_in_port)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ # in2out
+ if proto != IP_PROTOS.icmp:
+ pkts = self.create_stream_frag(self.pg1,
+ self.pg0.remote_ip4,
+ self.server_in_port,
+ p[layer].sport,
+ data,
+ proto)
+ else:
+ pkts = self.create_stream_frag(self.pg1,
+ self.pg0.remote_ip4,
+ p[layer].id,
+ 0,
+ data,
+ proto,
+ echo_reply=True)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.server_out_addr,
+ self.pg0.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].sport, self.server_out_port)
+ self.assertEqual(p[layer].dport, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ reass = self.vapi.nat_reass_dump()
+ reass_n_end = len(reass)
+
+ self.assertEqual(reass_n_end - reass_n_start, 2)
+
+ def reass_hairpinning(self, proto=IP_PROTOS.tcp):
+ layer = self.proto2layer(proto)
+
+ if proto == IP_PROTOS.tcp:
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ else:
+ data = "A" * 16 + "B" * 16 + "C" * 3
+
+ # send packet from host to server
+ pkts = self.create_stream_frag(self.pg0,
+ self.nat_addr,
+ self.host_in_port,
+ self.server_out_port,
+ data,
+ proto)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.server.ip4)
+ if proto != IP_PROTOS.icmp:
+ self.assertNotEqual(p[layer].sport, self.host_in_port)
+ self.assertEqual(p[layer].dport, self.server_in_port)
+ else:
+ self.assertNotEqual(p[layer].id, self.host_in_port)
+ self.assertEqual(data, p[Raw].load)
+
+ def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False):
+ layer = self.proto2layer(proto)
+
+ if proto == IP_PROTOS.tcp:
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ else:
+ data = "A" * 16 + "B" * 16 + "C" * 3
+ self.port_in = random.randint(1025, 65535)
+
+ for i in range(2):
+ # in2out
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.port_in,
+ 20,
+ data,
+ proto)
+ pkts.reverse()
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ if not dont_translate:
+ p = self.reass_frags_and_verify(frags,
+ self.nat_addr,
+ self.pg1.remote_ip4)
+ else:
+ p = self.reass_frags_and_verify(frags,
+ self.pg0.remote_ip4,
+ self.pg1.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ if not dont_translate:
+ self.assertEqual(p[layer].dport, 20)
+ self.assertNotEqual(p[layer].sport, self.port_in)
+ else:
+ self.assertEqual(p[layer].sport, self.port_in)
+ else:
+ if not dont_translate:
+ self.assertNotEqual(p[layer].id, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ # out2in
+ if not dont_translate:
+ dst_addr = self.nat_addr
+ else:
+ dst_addr = self.pg0.remote_ip4
+ if proto != IP_PROTOS.icmp:
+ sport = 20
+ dport = p[layer].sport
+ else:
+ sport = p[layer].id
+ dport = 0
+ pkts = self.create_stream_frag(self.pg1,
+ dst_addr,
+ sport,
+ dport,
+ data,
+ proto,
+ echo_reply=True)
+ pkts.reverse()
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg1.remote_ip4,
+ self.pg0.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].sport, 20)
+ self.assertEqual(p[layer].dport, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ def frag_out_of_order_in_plus_out(self, proto=IP_PROTOS.tcp):
+ layer = self.proto2layer(proto)
+
+ if proto == IP_PROTOS.tcp:
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ else:
+ data = "A" * 16 + "B" * 16 + "C" * 3
+ self.port_in = random.randint(1025, 65535)
+
+ for i in range(2):
+ # out2in
+ pkts = self.create_stream_frag(self.pg0,
+ self.server_out_addr,
+ self.port_in,
+ self.server_out_port,
+ data,
+ proto)
+ pkts.reverse()
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.pg0.remote_ip4,
+ self.server_in_addr)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].dport, self.server_in_port)
+ self.assertEqual(p[layer].sport, self.port_in)
+ self.assertEqual(p[layer].dport, self.server_in_port)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+
+ # in2out
+ if proto != IP_PROTOS.icmp:
+ pkts = self.create_stream_frag(self.pg1,
+ self.pg0.remote_ip4,
+ self.server_in_port,
+ p[layer].sport,
+ data,
+ proto)
+ else:
+ pkts = self.create_stream_frag(self.pg1,
+ self.pg0.remote_ip4,
+ p[layer].id,
+ 0,
+ data,
+ proto,
+ echo_reply=True)
+ pkts.reverse()
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg0.get_capture(len(pkts))
+ p = self.reass_frags_and_verify(frags,
+ self.server_out_addr,
+ self.pg0.remote_ip4)
+ if proto != IP_PROTOS.icmp:
+ self.assertEqual(p[layer].sport, self.server_out_port)
+ self.assertEqual(p[layer].dport, self.port_in)
+ else:
+ self.assertEqual(p[layer].id, self.port_in)
+ self.assertEqual(data, p[Raw].load)
+