i.set_table_ip4(0)
i.set_table_ip6(0)
- @unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps(self):
"""Test SRv6 Transit.Encaps behavior for IPv6."""
# send traffic to one destination interface
)
route.add_vpp_config()
- # configure encaps IPv6 source address
- # needs to be done before SR Policy config
- # TODO: API?
- self.vapi.cli("set sr encaps source addr a3::")
-
bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
- sr_policy = VppSRv6Policy(
+ sr_policy = VppSRv6PolicyV2(
self,
bsid=bsid,
is_encap=1,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
weight=1,
fib_table=0,
- segments=["a4::", "a5::", "a6::c7"],
source="a3::",
+ encap_src="a3::",
)
- sr_policy.add_vpp_config()
+ sr_policy.add_vpp_config(segments=["a4::", "a5::", "a6::c7"])
self.sr_policy = sr_policy
# log the sr policies
pol_steering.add_vpp_config()
# log the sr steering policies
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# create packets
count = len(self.pg_packet_sizes)
# remove SR steering
pol_steering.remove_vpp_config()
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# remove SR Policies
self.sr_policy.remove_vpp_config()
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
weight=1,
fib_table=0,
- segments=["a4::", "a5::", "a6::c7"],
encap_src=other_src_ip,
source=other_src_ip,
)
- sr_policy.add_vpp_config()
+ sr_policy.add_vpp_config(segments=["a4::", "a5::", "a6::c7"])
self.sr_policy = sr_policy
# log the sr policies
# cleanup interfaces
self.teardown_interfaces()
- @unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert(self):
"""Test SRv6 Transit.Insert behavior (IPv6 only)."""
# send traffic to one destination interface
)
route.add_vpp_config()
- # configure encaps IPv6 source address
- # needs to be done before SR Policy config
- # TODO: API?
- self.vapi.cli("set sr encaps source addr a3::")
-
bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
- sr_policy = VppSRv6Policy(
+ sr_policy = VppSRv6PolicyV2(
self,
bsid=bsid,
is_encap=0,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
weight=1,
fib_table=0,
- segments=["a4::", "a5::", "a6::c7"],
source="a3::",
+ encap_src="a3::",
)
- sr_policy.add_vpp_config()
+ sr_policy.add_vpp_config(segments=["a4::", "a5::", "a6::c7"])
self.sr_policy = sr_policy
# log the sr policies
pol_steering.add_vpp_config()
# log the sr steering policies
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# create packets
count = len(self.pg_packet_sizes)
# remove SR steering
pol_steering.remove_vpp_config()
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# remove SR Policies
self.sr_policy.remove_vpp_config()
# cleanup interfaces
self.teardown_interfaces()
- @unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps_IPv4(self):
"""Test SRv6 Transit.Encaps behavior for IPv4."""
# send traffic to one destination interface
)
route.add_vpp_config()
- # configure encaps IPv6 source address
- # needs to be done before SR Policy config
- # TODO: API?
- self.vapi.cli("set sr encaps source addr a3::")
-
bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
- sr_policy = VppSRv6Policy(
+ sr_policy = VppSRv6PolicyV2(
self,
bsid=bsid,
is_encap=1,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
weight=1,
fib_table=0,
- segments=["a4::", "a5::", "a6::c7"],
source="a3::",
+ encap_src="a3::",
)
- sr_policy.add_vpp_config()
+ sr_policy.add_vpp_config(segments=["a4::", "a5::", "a6::c7"])
+ self.sr_policy = sr_policy
+
+ # log the sr policies
+ self.logger.info(self.vapi.cli("show sr policies"))
+
+ # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
+ # use the bsid of the above self.sr_policy
+ pol_steering = VppSRv6Steering(
+ self,
+ bsid=self.sr_policy.bsid,
+ prefix="7.1.1.0",
+ mask_width=24,
+ traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
+ sr_policy_index=0,
+ table_id=0,
+ sw_if_index=0,
+ )
+ pol_steering.add_vpp_config()
+
+ # log the sr steering policies
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
+
+ # create packets
+ count = len(self.pg_packet_sizes)
+ dst_inner = "7.1.1.123"
+ pkts = []
+
+ # create IPv4 packets
+ packet_header = self.create_packet_header_IPv4(dst_inner)
+ # create traffic stream pg0->pg1
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
+ )
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # remove SR steering
+ pol_steering.remove_vpp_config()
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
+
+ # remove SR Policies
+ self.sr_policy.remove_vpp_config()
+ self.logger.info(self.vapi.cli("show sr policies"))
+
+ # remove FIB entries
+ # done by tearDown
+
+ # cleanup interfaces
+ self.teardown_interfaces()
+
+ def test_SRv6_T_Encaps_IPv4_ECMP(self):
+ """Test SRv6 Transit.Encaps behavior for IPv4 ECMP"""
+ # send traffic to one destination interface using multipath
+ # source interface is IPv4 only
+ # destination interface is IPv6 only
+ self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
+
+ # configure FIB entries
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
+ route.add_vpp_config()
+
+ route = VppIpRoute(
+ self, "b4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
+ route.add_vpp_config()
+
+ # configure SRv6 Policy
+ sr_policy = VppSRv6PolicyV2(
+ self,
+ bsid="a3::9999:1",
+ is_encap=1,
+ sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
+ weight=1,
+ fib_table=0,
+ source="a3::",
+ encap_src="a3::",
+ )
+ # add multipath
+ # Note: segment list order: first -> last
+ sr_policy.add_vpp_config(segments=["a4::", "a5::", "a6::c7"])
+ sr_policy.mod_vpp_config(segments=["b4::", "b5::", "b6::d7"])
+ self.assertTrue(sr_policy.query_vpp_config())
self.sr_policy = sr_policy
# log the sr policies
sw_if_index=0,
)
pol_steering.add_vpp_config()
+ pol_steering.query_vpp_config()
# log the sr steering policies
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# create packets
count = len(self.pg_packet_sizes)
# remove SR steering
pol_steering.remove_vpp_config()
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# remove SR Policies
self.sr_policy.remove_vpp_config()
pol_steering.add_vpp_config()
# log the sr steering policies
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# create packets
count = len(self.pg_packet_sizes)
# remove SR steering
pol_steering.remove_vpp_config()
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# remove SR Policies
self.sr_policy.remove_vpp_config()
# remove classifier SR steering
# classifier_steering.remove_vpp_config()
- self.logger.info(self.vapi.cli("show sr steering policies"))
+ self.logger.info(self.vapi.cli("show sr steering-policies"))
# remove SR Policies
self.sr_policy.remove_vpp_config()
tx_ip = tx_pkt.getlayer(IPv6)
- # expected segment-list
- seglist = self.sr_policy.segments
- # reverse list to get order as in SRH
- tx_seglist = seglist[::-1]
-
# get source address of SR Policy
sr_policy_source = self.sr_policy.source
# received ip.src should be equal to SR Policy source
self.assertEqual(rx_ip.src, sr_policy_source)
- # received ip.dst should be equal to expected sidlist[lastentry]
- self.assertEqual(rx_ip.dst, tx_seglist[-1])
- # rx'ed seglist should be equal to expected seglist
- self.assertEqual(rx_srh.addresses, tx_seglist)
- # segleft should be equal to size expected seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+
+ hit = None
+ for seglist in self.sr_policy.seg_lists:
+ # reverse list to get order as in SRH
+ tx_seglist = seglist[::-1]
+
+ # received ip.dst should be equal to sidlist[lastentry]
+ if rx_ip.dst == tx_seglist[-1]:
+ hit = True
+ # rx'ed seglist should be equal to seglist
+ self.assertEqual(rx_srh.addresses, tx_seglist)
+ # segleft should be equal to size seglist-1
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+ break
+
+ self.assertTrue(hit)
+
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
tx_ip = tx_pkt.getlayer(IP)
- # expected segment-list
- seglist = self.sr_policy.segments
- # reverse list to get order as in SRH
- tx_seglist = seglist[::-1]
-
# get source address of SR Policy
sr_policy_source = self.sr_policy.source
# received ip.src should be equal to SR Policy source
self.assertEqual(rx_ip.src, sr_policy_source)
- # received ip.dst should be equal to sidlist[lastentry]
- self.assertEqual(rx_ip.dst, tx_seglist[-1])
- # rx'ed seglist should be equal to seglist
- self.assertEqual(rx_srh.addresses, tx_seglist)
- # segleft should be equal to size seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+
+ hit = None
+ for seglist in self.sr_policy.seg_lists:
+ # reverse list to get order as in SRH
+ tx_seglist = seglist[::-1]
+
+ # received ip.dst should be equal to sidlist[lastentry]
+ if rx_ip.dst == tx_seglist[-1]:
+ hit = True
+ # rx'ed seglist should be equal to seglist
+ self.assertEqual(rx_srh.addresses, tx_seglist)
+ # segleft should be equal to size seglist-1
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+ break
+
+ self.assertTrue(hit)
+
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
# probably other ways to accomplish this are possible
tx_ip = IP(scapy.compat.raw(tx_ip))
- self.assertEqual(rx_srh.payload, tx_ip)
+ self.assertEqual(rx_pkt[IP], tx_ip)
self.logger.debug("packet verification: SUCCESS")
tx_ether = tx_pkt.getlayer(Ether)
# expected segment-list
- seglist = self.sr_policy.segments
+ seglist = self.sr_policy.seg_lists
# reverse list to get order as in SRH
tx_seglist = seglist[::-1]
# received ip.src should be equal to SR Policy source
self.assertEqual(rx_ip.src, sr_policy_source)
- # received ip.dst should be equal to sidlist[lastentry]
- self.assertEqual(rx_ip.dst, tx_seglist[-1])
- # rx'ed seglist should be equal to seglist
- self.assertEqual(rx_srh.addresses, tx_seglist)
- # segleft should be equal to size seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+
+ hit = None
+ for seglist in self.sr_policy.seg_lists:
+ # reverse list to get order as in SRH
+ tx_seglist = seglist[::-1]
+
+ # received ip.dst should be equal to sidlist[lastentry]
+ if rx_ip.dst == tx_seglist[-1]:
+ hit = True
+ # rx'ed seglist should be equal to seglist
+ self.assertEqual(rx_srh.addresses, tx_seglist)
+ # segleft should be equal to size seglist-1
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+ break
+
+ self.assertTrue(hit)
+
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
# nh should be "No Next Header" (143)
tx_ip2 = tx_pkt.getlayer(IPv6, 2)
tx_udp = tx_pkt[UDP]
- # expected segment-list (make copy of SR Policy segment list)
- seglist = self.sr_policy.segments[:]
- # expected seglist has initial dest addr as last segment
- seglist.append(tx_ip.dst)
- # reverse list to get order as in SRH
- tx_seglist = seglist[::-1]
-
# get source address of SR Policy
sr_policy_source = self.sr_policy.source
# rx'ed ip.src should be equal to tx'ed ip.src
self.assertEqual(rx_ip.src, tx_ip.src)
- # rx'ed ip.dst should be equal to sidlist[lastentry]
- self.assertEqual(rx_ip.dst, tx_seglist[-1])
- # rx'ed seglist should be equal to expected seglist
- self.assertEqual(rx_srh.addresses, tx_seglist)
- # segleft should be equal to size(expected seglist)-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+ hit = None
+ for s in self.sr_policy.seg_lists:
+ # expected segment-list (make copy of SR Policy segment list)
+ seglist = s[:]
+ # expected seglist has initial dest addr as last segment
+ seglist.append(tx_ip.dst)
+ # reverse list to get order as in SRH
+ tx_seglist = seglist[::-1]
+
+ # received ip.dst should be equal to sidlist[lastentry]
+ if rx_ip.dst == tx_seglist[-1]:
+ hit = True
+ # rx'ed seglist should be equal to seglist
+ self.assertEqual(rx_srh.addresses, tx_seglist)
+ # segleft should be equal to size seglist-1
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
+ break
+
+ self.assertTrue(hit)
+
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
self.weight = weight
self.fib_table = fib_table
self.segments = segments
- self.n_segments = len(segments)
# source not passed to API
# self.source = inet_pton(AF_INET6, source)
self.source = source
def add_vpp_config(self):
self._test.vapi.sr_policy_add(
- bsid=self.bsid,
+ bsid_addr=self.bsid,
weight=self.weight,
is_encap=self.is_encap,
is_spray=self.sr_type,
fib_table=self.fib_table,
- sids={"num_sids": self.n_segments, "sids": self.segments},
+ sids={
+ "num_sids": len(self.segments),
+ "sids": self._get_fixed_segments(self.segments),
+ "weight": 1,
+ },
)
self._configured = True
self.is_encap,
)
+ def _get_fixed_segments(self, segments):
+ segs = copy.copy(segments)
+ # note: array expect size is 16
+ for _ in range(16 - len(segments)):
+ segs.append("")
+ return segs
+
class VppSRv6PolicyV2(VppObject):
"""
sr_type,
weight,
fib_table,
- segments,
encap_src,
source,
):
self.sr_type = sr_type
self.weight = weight
self.fib_table = fib_table
- self.segments = segments
self.encap_src = encap_src
- self.n_segments = len(segments)
+
+ # list of segment list
+ self.seg_lists = []
# source not passed to API
# self.source = inet_pton(AF_INET6, source)
self.source = source
self._configured = False
- def add_vpp_config(self):
+ def add_vpp_config(self, segments=[]):
self._test.vapi.sr_policy_add_v2(
bsid_addr=self.bsid,
weight=self.weight,
fib_table=self.fib_table,
encap_src=self.encap_src,
sids={
- "num_sids": self.n_segments,
- "sids": self._get_fixed_segments(),
+ "num_sids": len(segments),
+ "sids": self._get_fixed_segments(segments),
"weight": 1,
},
)
+ self.seg_lists.append(segments)
self._configured = True
+ def mod_vpp_config(self, segments=[]):
+ # only ADD operation is supported
+ self._test.vapi.sr_policy_mod_v2(
+ bsid_addr=self.bsid,
+ weight=self.weight,
+ fib_table=self.fib_table,
+ operation=1,
+ sl_index=0xFFFFFFFF,
+ encap_src=self.encap_src,
+ sids={
+ "num_sids": len(segments),
+ "sids": self._get_fixed_segments(segments),
+ "weight": 1,
+ },
+ )
+ self.seg_lists.append(segments)
+
def remove_vpp_config(self):
self._test.vapi.sr_policy_del(self.bsid)
self._configured = False
def query_vpp_config(self):
- # no API to query SR Policies
- # use _configured flag for now
- return self._configured
+ match_counter = 0
+ policies = self._test.vapi.sr_policies_v2_dump()
+ for p in policies:
+ segments_matched = True
+ for i in range(p.num_sid_lists):
+ # transform sid_list from IPv6Address list to string list
+ p_sids = [
+ str(p) for p in p.sid_lists[i].sids[: p.sid_lists[i].num_sids]
+ ]
+ if p_sids != self.seg_lists[i]:
+ segments_matched = False
+
+ if (
+ str(p.bsid) == str(self.bsid)
+ and str(p.encap_src) == str(self.encap_src)
+ and p.type == self.sr_type
+ and p.is_encap == self.is_encap
+ and p.fib_table == self.fib_table
+ and p.num_sid_lists == len(self.seg_lists)
+ and segments_matched == True
+ ):
+ match_counter += 1
+ return match_counter == 1
def object_id(self):
return "%d;%s-><%s>;%d" % (
self.is_encap,
)
- def _get_fixed_segments(self):
- segs = copy.copy(self.segments)
+ def _get_fixed_segments(self, segments):
+ segs = copy.copy(segments)
# note: array expect size is 16
- for _ in range(16 - self.n_segments):
+ for _ in range(16 - len(segments)):
segs.append("")
return segs
self._configured = False
def query_vpp_config(self):
- # no API to query steering entries
- # use _configured flag for now
- return self._configured
+ match_counter = 0
+ steers = self._test.vapi.sr_steering_pol_dump()
+ for s in steers:
+ if s.traffic_type == self.traffic_type and str(s.bsid) == str(self.bsid):
+ if s.traffic_type == SRv6PolicySteeringTypes.SR_STEER_L2:
+ if s.sw_if_index == self.sw_if_index:
+ match_counter += 1
+ elif (
+ s.fib_table == self.table_id
+ and str(s.prefix.network_address) == str(self.prefix)
+ and s.prefix.prefixlen == self.mask_width
+ ):
+ match_counter += 1
+ return match_counter == 1
def object_id(self):
return "%d;%d;%s/%d->%s" % (