6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
15 class TestMAP(VppTestCase):
# NOTE(review): every line of this extract carries its original line number
# as a prefix and some lines are absent; the enclosing `def` is not visible.
# Fixture setup: the base-class setUp runs before anything else here.
19 super(TestMAP, self).setUp()
21 # create pg interfaces for indices 0..3 (the call passes range(4), not 2)
22 self.create_pg_interfaces(range(4))
24 # pg0 is 'inside' IPv4
27 self.pg0.resolve_arp()
29 # pg1 is 'outside' IPv6
# Four remote hosts are generated on pg1; code further down in this file
# uses remote_hosts[2] and remote_hosts[3] as alternative next-hops.
32 self.pg1.generate_remote_hosts(4)
33 self.pg1.configure_ipv6_neighbors()
# Fixture teardown: the base-class tearDown is invoked first, then every
# pg interface is visited.
# NOTE(review): the enclosing `def` and the body of the loop are not visible
# in this extract, so what is done per interface cannot be stated here.
36 super(TestMAP, self).tearDown()
37 for i in self.pg_interfaces:
# Helper: queue `tx` on pg0, capture on pg1 and check the encapsulated result.
#
# Parameters:
#   tx      - packet to inject on pg0; its IP source is compared with the
#             IP source found in the received packet.
#   ip6_src - expected IPv6 source address of the received packet.
#   ip6_dst - expected IPv6 destination address of the received packet.
#   dmac    - expected Ethernet destination of the received packet.
#
# NOTE(review): lines are missing from this extract (original numbering jumps
# 42 -> 44, 48 -> 51, 51 -> 54). The assignment below is presumably guarded
# by a check that no `dmac` was supplied - confirm against the full file.
42 def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
44 dmac = self.pg1.remote_mac
46 self.pg0.add_stream(tx)
48 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is requested from the pg1 capture.
51 rx = self.pg1.get_capture(1)
# Outer Ethernet destination, inner IPv4 source and outer IPv6 addresses
# are each checked separately.
54 self.assertEqual(rx[Ether].dst, dmac)
55 self.assertEqual(rx[IP].src, tx[IP].src)
56 self.assertEqual(rx[IPv6].src, ip6_src)
57 self.assertEqual(rx[IPv6].dst, ip6_dst)
# NOTE(review): the `def` line of this test is not visible in this extract and
# several calls below are missing argument lines (see the jumps in the
# original line numbers). `map_br_pfx` and `map_src` are used but not assigned
# in the visible lines - presumably set on omitted lines; confirm.
63 # Add a route to the MAP-BR
67 map_route = VppIpRoute(self,
70 [VppRoutePath(self.pg1.remote_ip6,
72 proto=DpoProto.DPO_PROTO_IP6)],
74 map_route.add_vpp_config()
77 # Add a domain that maps from pg0 to pg1
# The textual addresses are converted to packed binary form before being
# handed to the API call.
79 map_dst = socket.inet_pton(socket.AF_INET6, map_br_pfx)
81 map_src_n = socket.inet_pton(socket.AF_INET6, map_src)
82 client_pfx = socket.inet_pton(socket.AF_INET, "192.168.0.0")
84 self.vapi.map_add_domain(map_dst,
92 # Fire in a v4 packet that will be encapped to the BR
94 v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
95 IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
96 UDP(sport=20000, dport=10000) /
# Expected outer header: IPv6 source `map_src`, destination 2001::c0a8:0:0,
# sent to the default destination MAC (no `dmac` given).
99 self.send_and_assert_encapped(v4, map_src, "2001::c0a8:0:0")
102 # Fire in a V6 encapped packet.
103 # expect a decapped packet on the inside ip4 link
105 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
106 IPv6(dst=map_src, src="2001::1") /
107 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
108 UDP(sport=20000, dport=10000) /
111 self.pg1.add_stream(p)
113 self.pg_enable_capture(self.pg_interfaces)
116 rx = self.pg0.get_capture(1)
# The packet seen on pg0 must carry no IPv6 layer and must keep the inner
# IPv4 source and destination of the packet that was sent.
119 self.assertFalse(rx.haslayer(IPv6))
120 self.assertEqual(rx[IP].src, p[IP].src)
121 self.assertEqual(rx[IP].dst, p[IP].dst)
124 # Pre-resolve. No API for this!!
126 self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
# With the pre-resolve next-hop set and no route to 4001::1 added yet, the
# v4 packet is expected to produce no replies.
# NOTE(review): the message string below is spelled "resovled"; it is runtime
# text and is therefore left untouched here.
128 self.send_and_assert_no_replies(self.pg0, v4,
129 "resovled via default route")
132 # Add a route to 4001::1. Expect the encapped traffic to be
133 # sent via that route's next-hop
135 pre_res_route = VppIpRoute(
136 self, "4001::1", 128,
137 [VppRoutePath(self.pg1.remote_hosts[2].ip6,
138 self.pg1.sw_if_index,
139 proto=DpoProto.DPO_PROTO_IP6)],
141 pre_res_route.add_vpp_config()
# The encapsulated packet must now be addressed to remote host 2's MAC.
143 self.send_and_assert_encapped(v4, map_src,
145 dmac=self.pg1.remote_hosts[2].mac)
148 # change the route to the pre-resolved next-hop
150 pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
151 self.pg1.sw_if_index,
152 proto=DpoProto.DPO_PROTO_IP6)])
153 pre_res_route.add_vpp_config()
# After the path change the expected destination MAC is remote host 3's.
155 self.send_and_assert_encapped(v4, map_src,
157 dmac=self.pg1.remote_hosts[3].mac)
160 # cleanup. The test infra's object registry will ensure
161 # the route is really gone and thus that the unresolve worked.
163 pre_res_route.remove_vpp_config()
164 self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
    """Assert that a received packet equals the expected one.

    ``expected`` is serialised and re-parsed with its own class before the
    comparison, so ``rx`` is compared against the packet as rebuilt from
    its byte form rather than against the object as it was constructed.

    :param rx: packet under test.
    :param expected: packet describing the wanted contents.
    :raises AssertionError: via ``assertEqual`` when the two differ.
    """
    # bytes() instead of str(): on Python 2 both are the same type, so the
    # behaviour is unchanged there; on Python 3 str() yields text rather
    # than the serialised byte form the class constructor needs.
    self.assertEqual(rx, expected.__class__(bytes(expected)))
# Helper taking a single length argument; `test_map_t` below passes 1453 and
# appends the result to a UDP header.
# NOTE(review): the body is not visible in this extract. The parameter name
# `len` shadows the builtin of the same name inside the method.
169 def payload(self, len):
# MAP-T test: adds a MAP domain and an IPv6 route, then sends packets in both
# directions and checks the results, followed by expired TTL / hop-limit,
# well-known-port and fragmentation cases.
# NOTE(review): this extract omits lines (see jumps in the original line
# numbers). In particular `p`, used in every `self.validate(p[1], ...)` call,
# is never bound in the visible lines - presumably it is the loop variable of
# an omitted `for p in rx:` line; confirm against the full file.
172 def test_map_t(self):
176 # Add a domain that maps from pg0 to pg1
178 map_dst = socket.inet_pton(socket.AF_INET6, "2001:db8::")
179 map_src = socket.inet_pton(socket.AF_INET6, "1234:5678:90ab:cdef::")
180 ip4_pfx = socket.inet_pton(socket.AF_INET, "192.168.0.0")
182 self.vapi.map_add_domain(map_dst, 32, map_src, 64, ip4_pfx,
185 # Enable MAP-T on interfaces.
187 # self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1)
188 # self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1)
190 map_route = VppIpRoute(self,
193 [VppRoutePath(self.pg1.remote_ip6,
194 self.pg1.sw_if_index,
195 proto=DpoProto.DPO_PROTO_IP6)],
197 map_route.add_vpp_config()
200 # Send a v4 packet that will be translated
202 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
203 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
204 payload = TCP(sport=0xabcd, dport=0xabcd)
206 p4 = (p_ether / p_ip4 / payload)
207 p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
208 dst="2001:db8:1f0::c0a8:1:f") / payload)
# The expected packet carries a hop limit one below the constructed value
# (presumably the forwarding decrement - confirm).
209 p6_translated.hlim -= 1
210 rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
212 self.validate(p[1], p6_translated)
214 # Send back an IPv6 packet that will be "untranslated"
215 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
216 p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
217 dst='1234:5678:90ab:cdef:ac:1001:200:0')
218 p6 = (p_ether6 / p_ip6 / payload)
219 p4_translated = (IP(src='192.168.0.1',
220 dst=self.pg0.remote_ip4) / payload)
# Same adjustment for the IPv4 direction: expected TTL is one lower.
222 p4_translated.ttl -= 1
223 rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
225 self.validate(p[1], p4_translated)
# IPv4 packet sent with ttl=0: an ICMP time-exceeded reply quoting the
# original header is expected back on pg0.
228 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
229 p4 = (p_ether / ip4_ttl_expired / payload)
231 icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
232 dst=self.pg0.remote_ip4) /
233 ICMP(type='time-exceeded',
234 code='ttl-zero-during-transit') /
235 IP(src=self.pg0.remote_ip4,
236 dst='192.168.0.1', ttl=0) / payload)
237 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
239 self.validate(p[1], icmp4_reply)
# NOTE(review): the next line is prose, not code, so the ttl=1 case that
# follows (through the validate call before the hop-limit case) presumably
# sits inside a quoted-out section whose delimiters are not visible in this
# extract - confirm against the full file.
242 This one is broken, cause it would require hairpinning...
244 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
245 p4 = (p_ether / ip4_ttl_expired / payload)
247 icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
248 dst=self.pg0.remote_ip4) / \
249 ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
250 IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
251 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
253 self.validate(p[1], icmp4_reply)
# IPv6 packet sent with hlim=0: an ICMPv6 time-exceeded reply is expected
# back on pg1.
257 ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
258 dst='1234:5678:90ab:cdef:ac:1001:200:0')
259 p6 = (p_ether6 / ip6_hlim_expired / payload)
261 icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
262 dst="2001:db8:1ab::c0a8:1:ab") /
263 ICMPv6TimeExceeded(code=0) /
264 IPv6(src="2001:db8:1ab::c0a8:1:ab",
265 dst='1234:5678:90ab:cdef:ac:1001:200:0',
267 rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
269 self.validate(p[1], icmp6_reply)
271 # IPv4 Well-known port
# UDP ports 200/200 are used here; no reply is expected in either direction.
272 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
273 payload = UDP(sport=200, dport=200)
274 p4 = (p_ether / p_ip4 / payload)
275 self.send_and_assert_no_replies(self.pg0, p4*1)
277 # IPv6 Well-known port
278 payload = UDP(sport=200, dport=200)
279 p6 = (p_ether6 / p_ip6 / payload)
280 self.send_and_assert_no_replies(self.pg1, p6*1)
282 # Packet fragmentation
# A single packet with a 1453-unit payload is sent on pg0 and two packets
# are requested from the pg1 capture.
283 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
284 p4 = (p_ether / p_ip4 / payload)
285 self.pg_enable_capture()
286 self.pg0.add_stream(p4)
288 rx = self.pg1.get_capture(2)
291 # TODO: Manual validation
292 # self.validate(p[1], icmp4_reply)
294 # Packet fragmentation send fragments
# Same packet, but pre-fragmented with fragsize=1000 before being queued;
# again two packets are requested from the pg1 capture.
295 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
296 p4 = (p_ether / p_ip4 / payload)
297 frags = fragment(p4, fragsize=1000)
298 self.pg_enable_capture()
299 self.pg0.add_stream(frags)
301 rx = self.pg1.get_capture(2)
305 # reass_pkt = reassemble(rx)
308 # self.validate(reass_pkt, p4_reply)
# Direct invocation of this module: hand control to unittest, with
# VppTestRunner supplied as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)