6 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9 from ipaddress import IPv6Network, IPv4Network
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
12 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
15 class TestMAP(VppTestCase):
19 super(TestMAP, self).setUp()
21 # create 4 pg interfaces
22 self.create_pg_interfaces(range(4))
24 # pg0 is 'inside' IPv4
27 self.pg0.resolve_arp()
29 # pg1 is 'outside' IPv6
32 self.pg1.generate_remote_hosts(4)
33 self.pg1.configure_ipv6_neighbors()
36 super(TestMAP, self).tearDown()
37 for i in self.pg_interfaces:
42 def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
44 dmac = self.pg1.remote_mac
46 self.pg0.add_stream(tx)
48 self.pg_enable_capture(self.pg_interfaces)
51 rx = self.pg1.get_capture(1)
54 self.assertEqual(rx[Ether].dst, dmac)
55 self.assertEqual(rx[IP].src, tx[IP].src)
56 self.assertEqual(rx[IPv6].src, ip6_src)
57 self.assertEqual(rx[IPv6].dst, ip6_dst)
63 # Add a route to the MAP-BR
67 map_route = VppIpRoute(self,
70 [VppRoutePath(self.pg1.remote_ip6,
72 proto=DpoProto.DPO_PROTO_IP6)],
74 map_route.add_vpp_config()
77 # Add a domain that maps from pg0 to pg1
79 map_dst = '{}/{}'.format(map_br_pfx, map_br_pfx_len)
80 map_src = '3000::1/128'
81 client_pfx = '192.168.0.0/16'
82 self.vapi.map_add_domain(map_dst, map_src, client_pfx)
85 # Fire in a v4 packet that will be encapped to the BR
87 v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
88 IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
89 UDP(sport=20000, dport=10000) /
92 self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")
95 # Fire in a V6 encapped packet.
96 # expect a decapped packet on the inside ip4 link
98 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
99 IPv6(dst='3000::1', src="2001::1") /
100 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
101 UDP(sport=20000, dport=10000) /
104 self.pg1.add_stream(p)
106 self.pg_enable_capture(self.pg_interfaces)
109 rx = self.pg0.get_capture(1)
112 self.assertFalse(rx.haslayer(IPv6))
113 self.assertEqual(rx[IP].src, p[IP].src)
114 self.assertEqual(rx[IP].dst, p[IP].dst)
117 # Pre-resolve. No API for this!!
119 self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
121 self.send_and_assert_no_replies(self.pg0, v4,
122 "resovled via default route")
125 # Add a route to 4001::1. Expect the encapped traffic to be
126 # sent via that route's next-hop
128 pre_res_route = VppIpRoute(
129 self, "4001::1", 128,
130 [VppRoutePath(self.pg1.remote_hosts[2].ip6,
131 self.pg1.sw_if_index,
132 proto=DpoProto.DPO_PROTO_IP6)],
134 pre_res_route.add_vpp_config()
136 self.send_and_assert_encapped(v4, "3000::1",
138 dmac=self.pg1.remote_hosts[2].mac)
141 # change the route to the pre-resolved next-hop
143 pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
144 self.pg1.sw_if_index,
145 proto=DpoProto.DPO_PROTO_IP6)])
146 pre_res_route.add_vpp_config()
148 self.send_and_assert_encapped(v4, "3000::1",
150 dmac=self.pg1.remote_hosts[3].mac)
153 # cleanup. The test infra's object registry will ensure
154 # the route is really gone and thus that the unresolve worked.
156 pre_res_route.remove_vpp_config()
157 self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
    """Assert that the received packet ``rx`` equals ``expected``.

    ``expected`` is serialised to its byte form and a new object of the
    same class is built from those bytes; ``rx`` is compared against that
    rebuilt object rather than against ``expected`` itself.
    NOTE(review): presumably the round trip is there so that fields the
    packet library computes at build time are filled in before the
    comparison -- confirm against the packet class in use.

    :param rx: the packet that was captured.
    :param expected: the packet the test expects; its class must accept
        the serialised bytes as its constructor argument.
    :raises AssertionError: (via ``assertEqual``) when the two differ.
    """
    # bytes() instead of str(): on Python 2 the two are the same type, so
    # nothing changes there, but on Python 3 str() yields a text
    # representation rather than the wire encoding, and rebuilding the
    # packet from that text would compare against garbage.
    wire = bytes(expected)
    self.assertEqual(rx, expected.__class__(wire))
162 def payload(self, len):
165 def test_map_t(self):
169 # Add a domain that maps from pg0 to pg1
171 self.vapi.map_add_domain('2001:db8::/32',
172 '1234:5678:90ab:cdef::/64',
176 # Enable MAP-T on interfaces.
178 # self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1)
179 # self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1)
181 map_route = VppIpRoute(self,
184 [VppRoutePath(self.pg1.remote_ip6,
185 self.pg1.sw_if_index,
186 proto=DpoProto.DPO_PROTO_IP6)],
188 map_route.add_vpp_config()
191 # Send a v4 packet that will be translated
193 p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
194 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
195 payload = TCP(sport=0xabcd, dport=0xabcd)
197 p4 = (p_ether / p_ip4 / payload)
198 p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
199 dst="2001:db8:1f0::c0a8:1:f") / payload)
200 p6_translated.hlim -= 1
201 rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
203 self.validate(p[1], p6_translated)
205 # Send back an IPv6 packet that will be "untranslated"
206 p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
207 p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
208 dst='1234:5678:90ab:cdef:ac:1001:200:0')
209 p6 = (p_ether6 / p_ip6 / payload)
210 p4_translated = (IP(src='192.168.0.1',
211 dst=self.pg0.remote_ip4) / payload)
213 p4_translated.ttl -= 1
214 rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
216 self.validate(p[1], p4_translated)
219 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
220 p4 = (p_ether / ip4_ttl_expired / payload)
222 icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
223 dst=self.pg0.remote_ip4) /
224 ICMP(type='time-exceeded',
225 code='ttl-zero-during-transit') /
226 IP(src=self.pg0.remote_ip4,
227 dst='192.168.0.1', ttl=0) / payload)
228 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
230 self.validate(p[1], icmp4_reply)
233 This one is broken, cause it would require hairpinning...
235 ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
236 p4 = (p_ether / ip4_ttl_expired / payload)
238 icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
239 dst=self.pg0.remote_ip4) / \
240 ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
241 IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
242 rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
244 self.validate(p[1], icmp4_reply)
248 ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
249 dst='1234:5678:90ab:cdef:ac:1001:200:0')
250 p6 = (p_ether6 / ip6_hlim_expired / payload)
252 icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
253 dst="2001:db8:1ab::c0a8:1:ab") /
254 ICMPv6TimeExceeded(code=0) /
255 IPv6(src="2001:db8:1ab::c0a8:1:ab",
256 dst='1234:5678:90ab:cdef:ac:1001:200:0',
258 rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
260 self.validate(p[1], icmp6_reply)
262 # IPv4 Well-known port
263 p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
264 payload = UDP(sport=200, dport=200)
265 p4 = (p_ether / p_ip4 / payload)
266 self.send_and_assert_no_replies(self.pg0, p4*1)
268 # IPv6 Well-known port
269 payload = UDP(sport=200, dport=200)
270 p6 = (p_ether6 / p_ip6 / payload)
271 self.send_and_assert_no_replies(self.pg1, p6*1)
273 # Packet fragmentation
274 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
275 p4 = (p_ether / p_ip4 / payload)
276 self.pg_enable_capture()
277 self.pg0.add_stream(p4)
279 rx = self.pg1.get_capture(2)
282 # TODO: Manual validation
283 # self.validate(p[1], icmp4_reply)
285 # Packet fragmentation send fragments
286 payload = UDP(sport=40000, dport=4000) / self.payload(1453)
287 p4 = (p_ether / p_ip4 / payload)
288 frags = fragment(p4, fragsize=1000)
289 self.pg_enable_capture()
290 self.pg0.add_stream(frags)
292 rx = self.pg1.get_capture(2)
296 # reass_pkt = reassemble(rx)
299 # self.validate(reass_pkt, p4_reply)
# Script entry point: when this file is executed directly, run its tests
# through unittest with VppTestRunner (imported from framework) as the runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)