3 from framework import VppTestCase, VppTestRunner
4 from vpp_udp_encap import *
5 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether, ARP
9 from scapy.layers.inet import IP, UDP
10 from scapy.layers.inet6 import IPv6
11 from scapy.contrib.mpls import MPLS
14 class TestUdpEncap(VppTestCase):
15 """ UDP Encap Test Case """
18 super(TestUdpEncap, self).setUp()
20 # create 2 pg interfaces
21 self.create_pg_interfaces(range(4))
24 # assign them different tables.
28 for i in self.pg_interfaces:
32 tbl = VppIpTable(self, table_id)
34 self.tables.append(tbl)
35 tbl = VppIpTable(self, table_id, is_ip6=1)
37 self.tables.append(tbl)
39 i.set_table_ip4(table_id)
40 i.set_table_ip6(table_id)
48 for i in self.pg_interfaces:
55 super(TestUdpEncap, self).tearDown()
57 def validate_outer4(self, rx, encap_obj):
58 self.assertEqual(rx[IP].src, encap_obj.src_ip_s)
59 self.assertEqual(rx[IP].dst, encap_obj.dst_ip_s)
60 self.assertEqual(rx[UDP].sport, encap_obj.src_port)
61 self.assertEqual(rx[UDP].dport, encap_obj.dst_port)
63 def validate_outer6(self, rx, encap_obj):
64 self.assertEqual(rx[IPv6].src, encap_obj.src_ip_s)
65 self.assertEqual(rx[IPv6].dst, encap_obj.dst_ip_s)
66 self.assertEqual(rx[UDP].sport, encap_obj.src_port)
67 self.assertEqual(rx[UDP].dport, encap_obj.dst_port)
69 def validate_inner4(self, rx, tx, ttl=None):
70 self.assertEqual(rx.src, tx[IP].src)
71 self.assertEqual(rx.dst, tx[IP].dst)
73 self.assertEqual(rx.ttl, ttl)
75 self.assertEqual(rx.ttl, tx[IP].ttl)
77 def validate_inner6(self, rx, tx):
78 self.assertEqual(rx.src, tx[IPv6].src)
79 self.assertEqual(rx.dst, tx[IPv6].dst)
80 self.assertEqual(rx.hlim, tx[IPv6].hlim)
82 def send_and_expect(self, input, output, pkts):
83 self.vapi.cli("clear trace")
84 input.add_stream(pkts)
85 self.pg_enable_capture(self.pg_interfaces)
87 rx = output.get_capture(len(pkts))
90 def test_udp_encap(self):
95 # construct a UDP encap object through each of the peers
96 # v4 through the first two peears, v6 through the second.
98 udp_encap_0 = VppUdpEncap(self, 0,
102 udp_encap_1 = VppUdpEncap(self, 1,
107 udp_encap_2 = VppUdpEncap(self, 2,
113 udp_encap_3 = VppUdpEncap(self, 3,
119 udp_encap_0.add_vpp_config()
120 udp_encap_1.add_vpp_config()
121 udp_encap_2.add_vpp_config()
122 udp_encap_3.add_vpp_config()
125 # Routes via each UDP encap object - all combinations of v4 and v6.
127 route_4o4 = VppIpRoute(self, "1.1.0.1", 32,
128 [VppRoutePath("0.0.0.0",
132 route_4o6 = VppIpRoute(self, "1.1.2.1", 32,
133 [VppRoutePath("0.0.0.0",
137 route_6o4 = VppIpRoute(self, "2001::1", 128,
138 [VppRoutePath("0.0.0.0",
143 route_6o6 = VppIpRoute(self, "2001::3", 128,
144 [VppRoutePath("0.0.0.0",
149 route_4o4.add_vpp_config()
150 route_4o6.add_vpp_config()
151 route_6o6.add_vpp_config()
152 route_6o4.add_vpp_config()
157 p_4o4 = (Ether(src=self.pg0.remote_mac,
158 dst=self.pg0.local_mac) /
159 IP(src="2.2.2.2", dst="1.1.0.1") /
160 UDP(sport=1234, dport=1234) /
162 rx = self.send_and_expect(self.pg0, self.pg0, p_4o4*65)
164 self.validate_outer4(p, udp_encap_0)
165 p = IP(p["UDP"].payload.load)
166 self.validate_inner4(p, p_4o4)
171 p_4o6 = (Ether(src=self.pg0.remote_mac,
172 dst=self.pg0.local_mac) /
173 IP(src="2.2.2.2", dst="1.1.2.1") /
174 UDP(sport=1234, dport=1234) /
176 rx = self.send_and_expect(self.pg0, self.pg2, p_4o6*65)
178 self.validate_outer6(p, udp_encap_2)
179 p = IP(p["UDP"].payload.load)
180 self.validate_inner4(p, p_4o6)
185 p_6o4 = (Ether(src=self.pg0.remote_mac,
186 dst=self.pg0.local_mac) /
187 IPv6(src="2001::100", dst="2001::1") /
188 UDP(sport=1234, dport=1234) /
190 rx = self.send_and_expect(self.pg0, self.pg1, p_6o4*65)
192 self.validate_outer4(p, udp_encap_1)
193 p = IPv6(p["UDP"].payload.load)
194 self.validate_inner6(p, p_6o4)
199 p_6o6 = (Ether(src=self.pg0.remote_mac,
200 dst=self.pg0.local_mac) /
201 IPv6(src="2001::100", dst="2001::3") /
202 UDP(sport=1234, dport=1234) /
204 rx = self.send_and_expect(self.pg0, self.pg3, p_6o6*65)
206 self.validate_outer6(p, udp_encap_3)
207 p = IPv6(p["UDP"].payload.load)
208 self.validate_inner6(p, p_6o6)
211 # A route with an output label
212 # the TTL of the inner packet is decremented on LSP ingress
214 route_4oMPLSo4 = VppIpRoute(self, "1.1.2.22", 32,
215 [VppRoutePath("0.0.0.0",
220 route_4oMPLSo4.add_vpp_config()
222 p_4omo4 = (Ether(src=self.pg0.remote_mac,
223 dst=self.pg0.local_mac) /
224 IP(src="2.2.2.2", dst="1.1.2.22") /
225 UDP(sport=1234, dport=1234) /
227 rx = self.send_and_expect(self.pg0, self.pg1, p_4omo4*65)
229 self.validate_outer4(p, udp_encap_1)
230 p = MPLS(p["UDP"].payload.load)
231 self.validate_inner4(p, p_4omo4, ttl=63)
234 if __name__ == '__main__':
235 unittest.main(testRunner=VppTestRunner)