3 from framework import VppTestCase
4 from ipaddress import IPv4Address
5 from ipaddress import IPv6Address
6 from scapy.contrib.gtp import *
7 from scapy.all import *
10 class TestSRv6uSIDSRH(VppTestCase):
11 """ SRv6 End.uSID w/ SRH """
15 super(TestSRv6uSIDSRH, cls).setUpClass()
17 cls.create_pg_interfaces(range(2))
18 cls.pg_if_i = cls.pg_interfaces[0]
19 cls.pg_if_o = cls.pg_interfaces[1]
21 cls.pg_if_i.config_ip6()
22 cls.pg_if_o.config_ip6()
24 cls.ip6_nhop = cls.pg_if_o.remote_ip6
26 cls.ip6_dst = "1111:2222:aaaa:bbbb:cccc:dddd:eeee:ffff"
27 cls.ip6_src = "1111:2222::1"
29 for pg_if in cls.pg_interfaces:
31 pg_if.resolve_ndp(timeout=5)
34 super(TestSRv6uSIDSRH, cls).tearDownClass()
37 def create_packets(self, inner):
39 ip6_dst = IPv6Address(str(self.ip6_dst))
41 ip6_src = IPv6Address(str(self.ip6_src))
43 self.logger.info("ip6 dst: {}".format(ip6_dst))
44 self.logger.info("ip6 src: {}".format(ip6_src))
49 IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
50 IPv6ExtHdrSegmentRouting(segleft=1,
55 "1111:2222:aaaa:bbbb::"]) /
57 UDP(sport=1000, dport=23))
58 self.logger.info(pkt.show2(dump=True))
63 def test_srv6_usid_srh(self):
64 """ test_srv6_usid_srh """
65 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
67 self.vapi.cli("set sr encaps source addr A1::1")
69 "sr localsid prefix 1111:2222:aaaa::/48 behavior un 16")
71 "ip route add 1111:2222:bbbb::/48 via {}".format(self.ip6_nhop))
73 self.logger.info(self.vapi.cli("show sr localsids"))
75 self.vapi.cli("clear errors")
77 self.pg0.add_stream(pkts)
78 self.pg_enable_capture(self.pg_interfaces)
81 self.logger.info(self.vapi.cli("show errors"))
82 self.logger.info(self.vapi.cli("show int address"))
84 capture = self.pg1.get_capture(len(pkts))
87 self.logger.info(pkt.show2(dump=True))
89 pkt[IPv6].dst, "1111:2222:bbbb:cccc:dddd:eeee:ffff:0")
92 class TestSRv6uSID(VppTestCase):
93 """ SRv6 End.uSID w/o SRH """
97 super(TestSRv6uSID, cls).setUpClass()
99 cls.create_pg_interfaces(range(2))
100 cls.pg_if_i = cls.pg_interfaces[0]
101 cls.pg_if_o = cls.pg_interfaces[1]
103 cls.pg_if_i.config_ip6()
104 cls.pg_if_o.config_ip6()
106 cls.ip6_nhop = cls.pg_if_o.remote_ip6
108 cls.ip6_dst = "1111:2222:aaaa:bbbb:cccc:dddd:eeee:ffff"
109 cls.ip6_src = "1111:2222::1"
111 for pg_if in cls.pg_interfaces:
113 pg_if.resolve_ndp(timeout=5)
116 super(TestSRv6uSID, cls).tearDownClass()
119 def create_packets(self, inner):
121 ip6_dst = IPv6Address(str(self.ip6_dst))
123 ip6_src = IPv6Address(str(self.ip6_src))
125 self.logger.info("ip6 dst: {}".format(ip6_dst))
126 self.logger.info("ip6 src: {}".format(ip6_src))
131 IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
133 UDP(sport=1000, dport=23))
134 self.logger.info(pkt.show2(dump=True))
139 def test_srv6_usid(self):
140 """ test_srv6_usid """
141 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
143 self.vapi.cli("set sr encaps source addr A1::1")
145 "sr localsid prefix 1111:2222:aaaa::/48 behavior un 16")
147 "ip route add 1111:2222:bbbb::/48 via {}".format(self.ip6_nhop))
149 self.logger.info(self.vapi.cli("show sr localsids"))
151 self.vapi.cli("clear errors")
153 self.pg0.add_stream(pkts)
154 self.pg_enable_capture(self.pg_interfaces)
157 self.logger.info(self.vapi.cli("show errors"))
158 self.logger.info(self.vapi.cli("show int address"))
160 capture = self.pg1.get_capture(len(pkts))
163 self.logger.info(pkt.show2(dump=True))
165 pkt[IPv6].dst, "1111:2222:bbbb:cccc:dddd:eeee:ffff:0")
168 class TestSRv6uSIDFlexSRH(VppTestCase):
169 """ SRv6 End.uSID.Flex w/ SRH """
173 super(TestSRv6uSIDFlexSRH, cls).setUpClass()
175 cls.create_pg_interfaces(range(2))
176 cls.pg_if_i = cls.pg_interfaces[0]
177 cls.pg_if_o = cls.pg_interfaces[1]
179 cls.pg_if_i.config_ip6()
180 cls.pg_if_o.config_ip6()
182 cls.ip6_nhop = cls.pg_if_o.remote_ip6
184 cls.ip6_dst = "1111:2222:aaaa:bbbb:cccc:dddd:eeee:ffff"
185 cls.ip6_src = "1111:2222::1"
187 for pg_if in cls.pg_interfaces:
189 pg_if.resolve_ndp(timeout=5)
192 super(TestSRv6uSIDFlexSRH, cls).tearDownClass()
195 def create_packets(self, inner):
197 ip6_dst = IPv6Address(str(self.ip6_dst))
199 ip6_src = IPv6Address(str(self.ip6_src))
201 self.logger.info("ip6 dst: {}".format(ip6_dst))
202 self.logger.info("ip6 src: {}".format(ip6_src))
207 IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
208 IPv6ExtHdrSegmentRouting(segleft=1,
213 "1111:2222:aaaa:bbbb::"]) /
215 UDP(sport=1000, dport=23))
216 self.logger.info(pkt.show2(dump=True))
221 def test_srv6_usid_flex_srh(self):
222 """ test_srv6_usid_flex_srh """
223 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
225 self.vapi.cli("set sr encaps source addr A1::1")
227 "sr localsid prefix 1111:2222:aaaa::/48 behavior un.flex 16")
229 "ip route add 1111:2222:bbbb::/48 via {}".format(self.ip6_nhop))
231 self.logger.info(self.vapi.cli("show sr localsids"))
233 self.vapi.cli("clear errors")
235 self.pg0.add_stream(pkts)
236 self.pg_enable_capture(self.pg_interfaces)
239 self.logger.info(self.vapi.cli("show errors"))
240 self.logger.info(self.vapi.cli("show int address"))
242 capture = self.pg1.get_capture(len(pkts))
245 self.logger.info(pkt.show2(dump=True))
247 pkt[IPv6].dst, "1111:2222:bbbb:cccc:dddd:eeee:ffff:0")
250 class TestSRv6uSIDFlex(VppTestCase):
251 """ SRv6 End.uSID.Flex w/o SRH """
255 super(TestSRv6uSIDFlex, cls).setUpClass()
257 cls.create_pg_interfaces(range(2))
258 cls.pg_if_i = cls.pg_interfaces[0]
259 cls.pg_if_o = cls.pg_interfaces[1]
261 cls.pg_if_i.config_ip6()
262 cls.pg_if_o.config_ip6()
264 cls.ip6_nhop = cls.pg_if_o.remote_ip6
266 cls.ip6_dst = "1111:2222:aaaa:bbbb:cccc:dddd:eeee:ffff"
267 cls.ip6_src = "1111:2222::1"
269 for pg_if in cls.pg_interfaces:
271 pg_if.resolve_ndp(timeout=5)
274 super(TestSRv6uSIDFlex, cls).tearDownClass()
277 def create_packets(self, inner):
279 ip6_dst = IPv6Address(str(self.ip6_dst))
281 ip6_src = IPv6Address(str(self.ip6_src))
283 self.logger.info("ip6 dst: {}".format(ip6_dst))
284 self.logger.info("ip6 src: {}".format(ip6_src))
289 IPv6(dst=str(ip6_dst), src=str(ip6_src)) /
291 UDP(sport=1000, dport=23))
292 self.logger.info(pkt.show2(dump=True))
297 def test_srv6_usid_flex(self):
298 """ test_srv6_usid_flex """
299 pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
301 self.vapi.cli("set sr encaps source addr A1::1")
303 "sr localsid prefix 1111:2222:aaaa::/48 behavior un.flex 16")
305 "ip route add 1111:2222:bbbb::/48 via {}".format(self.ip6_nhop))
307 self.logger.info(self.vapi.cli("show sr localsids"))
309 self.vapi.cli("clear errors")
311 self.pg0.add_stream(pkts)
312 self.pg_enable_capture(self.pg_interfaces)
315 self.logger.info(self.vapi.cli("show errors"))
316 self.logger.info(self.vapi.cli("show int address"))
318 capture = self.pg1.get_capture(len(pkts))
321 self.logger.info(pkt.show2(dump=True))
323 pkt[IPv6].dst, "1111:2222:bbbb:cccc:dddd:eeee:ffff:0")