+ if self.is_local or self.is_unreach or self.is_prohibit:
+ self._test.vapi.ip_add_del_route(
+ self.dest_addr,
+ self.dest_addr_len,
+ inet_pton(AF_INET6, "::"),
+ 0xffffffff,
+ is_local=self.is_local,
+ is_unreach=self.is_unreach,
+ is_prohibit=self.is_prohibit,
+ is_add=0,
+ table_id=self.table_id,
+ is_ipv6=self.is_ip6)
+ else:
+ for path in self.paths:
+ self._test.vapi.ip_add_del_route(
+ self.dest_addr,
+ self.dest_addr_len,
+ path.nh_addr,
+ path.nh_itf,
+ table_id=self.table_id,
+ next_hop_table_id=path.nh_table_id,
+ next_hop_via_label=path.nh_via_label,
+ next_hop_id=path.next_hop_id,
+ is_add=0,
+ is_udp_encap=path.is_udp_encap,
+ is_ipv6=self.is_ip6,
+ is_dvr=path.is_dvr)
+
+ def query_vpp_config(self):
+ return find_route(self._test,
+ self.dest_addr_p,
+ self.dest_addr_len,
+ self.table_id,
+ inet=AF_INET6 if self.is_ip6 == 1 else AF_INET)
+
+ def __str__(self):
+ return self.object_id()
+
+ def object_id(self):
+ return ("%d:%s/%d"
+ % (self.table_id,
+ self.dest_addr_p,
+ self.dest_addr_len))
+
+
+class VppIpMRoute(VppObject):
+ """
+ IP Multicast Route
+ """
+
+ def __init__(self, test, src_addr, grp_addr,
+ grp_addr_len, e_flags, paths, table_id=0,
+ rpf_id=0, is_ip6=0):
+ self._test = test
+ self.paths = paths
+ self.grp_addr_len = grp_addr_len
+ self.table_id = table_id
+ self.e_flags = e_flags
+ self.is_ip6 = is_ip6
+ self.rpf_id = rpf_id
+
+ if is_ip6:
+ self.grp_addr = inet_pton(AF_INET6, grp_addr)
+ self.src_addr = inet_pton(AF_INET6, src_addr)
+ else:
+ self.grp_addr = inet_pton(AF_INET, grp_addr)
+ self.src_addr = inet_pton(AF_INET, src_addr)
+
+ def add_vpp_config(self):
+ for path in self.paths:
+ self._test.vapi.ip_mroute_add_del(self.src_addr,
+ self.grp_addr,
+ self.grp_addr_len,
+ self.e_flags,
+ path.proto,
+ path.nh_itf,
+ path.nh_addr,
+ path.nh_i_flags,
+ bier_imp=path.bier_imp,
+ rpf_id=self.rpf_id,
+ table_id=self.table_id,
+ is_ipv6=self.is_ip6)
+ self._test.registry.register(self, self._test.logger)
+
+ def remove_vpp_config(self):
+ for path in self.paths:
+ self._test.vapi.ip_mroute_add_del(self.src_addr,
+ self.grp_addr,
+ self.grp_addr_len,
+ self.e_flags,
+ path.proto,
+ path.nh_itf,
+ path.nh_addr,
+ path.nh_i_flags,
+ table_id=self.table_id,
+ bier_imp=path.bier_imp,
+ is_add=0,
+ is_ipv6=self.is_ip6)
+
+ def update_entry_flags(self, flags):
+ self.e_flags = flags
+ self._test.vapi.ip_mroute_add_del(self.src_addr,
+ self.grp_addr,
+ self.grp_addr_len,
+ self.e_flags,
+ 0,
+ 0xffffffff,
+ "",
+ 0,
+ table_id=self.table_id,
+ is_ipv6=self.is_ip6)
+
+ def update_rpf_id(self, rpf_id):
+ self.rpf_id = rpf_id
+ self._test.vapi.ip_mroute_add_del(self.src_addr,
+ self.grp_addr,
+ self.grp_addr_len,
+ self.e_flags,
+ 0,
+ 0xffffffff,
+ "",
+ 0,
+ rpf_id=self.rpf_id,
+ table_id=self.table_id,
+ is_ipv6=self.is_ip6)
+
+ def update_path_flags(self, itf, flags):