4 object abstractions for representing SRv6 localSIDs in VPP
7 from vpp_object import VppObject
8 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
# Namespace class holding integer codes for SRv6 localSID behaviors.
# presumably these are the values supplied as `behavior` to VppSRv6LocalSID
# - confirm against callers.
12 class SRv6LocalSIDBehaviors:
13 # from src/vnet/srv6/sr.h
17 SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
23 SR_BEHAVIOR_END_UN_PERF = 10
24 SR_BEHAVIOR_END_UN = 11
# Sentinel: per its own note this value must stay last, so it has to remain
# greater than every other behavior code in this class.
25 SR_BEHAVIOR_LAST = 12 # Must always be the last one
# Integer codes for the SR policy type: DEFAULT (0), SPRAY (1) and TEF (2).
# presumably one of these is what the policy wrappers store as `sr_type`
# - confirm against callers.
29 # from src/vnet/srv6/sr.h
30 SR_POLICY_TYPE_DEFAULT = 0
31 SR_POLICY_TYPE_SPRAY = 1
32 SR_POLICY_TYPE_TEF = 2
# Namespace class for SR steering type codes.
# presumably these are the values given as `traffic_type` to VppSRv6Steering
# - confirm against callers.
35 class SRv6PolicySteeringTypes:
36 # from src/vnet/srv6/sr.h
# VppObject wrapper for one SRv6 localSID entry. The entry is installed and
# removed through the sr_localsid_add_del API call, and its presence is
# tracked with a local flag rather than by querying VPP.
42 class VppSRv6LocalSID(VppObject):
# Remember the localSID parameters on the instance; add_vpp_config and
# remove_vpp_config read them back when calling the API.
59 self.localsid = localsid
60 self.behavior = behavior
61 self.nh_addr = nh_addr
62 self.end_psp = end_psp
63 self.sw_if_index = sw_if_index
64 self.vlan_index = vlan_index
65 self.fib_table = fib_table
# Nothing has been pushed to VPP yet.
66 self._configured = False
# Call sr_localsid_add_del with the stored parameters and mark the object
# as configured.
68 def add_vpp_config(self):
69 self._test.vapi.sr_localsid_add_del(
70 localsid=self.localsid,
71 behavior=self.behavior,
75 sw_if_index=self.sw_if_index,
76 vlan_index=self.vlan_index,
77 fib_table=self.fib_table,
# Record locally that the entry is installed; query_vpp_config relies on
# this flag.
79 self._configured = True
# Call sr_localsid_add_del again with the stored parameters and clear the
# configured flag.
81 def remove_vpp_config(self):
82 self._test.vapi.sr_localsid_add_del(
83 localsid=self.localsid,
84 behavior=self.behavior,
88 sw_if_index=self.sw_if_index,
89 vlan_index=self.vlan_index,
90 fib_table=self.fib_table,
92 self._configured = False
# Report whether this object believes it is configured. The answer comes
# from the local flag only; VPP itself is not consulted.
94 def query_vpp_config(self):
95 # sr_localsids_dump API is disabled
96 # use _configured flag for now
97 return self._configured
# Text form is "<fib_table>;<localsid>,<behavior>"; fib_table and behavior
# are formatted with %d and therefore must be numeric.
100 return "%d;%s,%d" % (self.fib_table, self.localsid, self.behavior)
# VppObject wrapper for an SR policy. The policy is created with the
# sr_policy_add API call and deleted with sr_policy_del using self.bsid.
103 class VppSRv6Policy(VppObject):
109 self, test, bsid, is_encap, sr_type, weight, fib_table, segments, source
113 self.is_encap = is_encap
114 self.sr_type = sr_type
116 self.fib_table = fib_table
117 self.segments = segments
# The segment count is captured once here; later changes to self.segments
# are not reflected in n_segments.
118 self.n_segments = len(segments)
119 # source not passed to API
120 # self.source = inet_pton(AF_INET6, source)
122 self._configured = False
# Create the policy in VPP from the stored parameters and mark the object
# as configured.
124 def add_vpp_config(self):
125 self._test.vapi.sr_policy_add(
128 is_encap=self.is_encap,
# NOTE(review): sr_type is forwarded as the is_spray argument - confirm the
# API expects the policy type code in this field.
129 is_spray=self.sr_type,
130 fib_table=self.fib_table,
# The SID list is sent as a dict carrying the segment count and the
# segments themselves.
131 sids={"num_sids": self.n_segments, "sids": self.segments},
133 self._configured = True
# Delete the policy; only self.bsid is passed to sr_policy_del.
135 def remove_vpp_config(self):
136 self._test.vapi.sr_policy_del(self.bsid)
137 self._configured = False
# Report the local configured flag; VPP itself is not consulted.
139 def query_vpp_config(self):
140 # no API to query SR Policies
141 # use _configured flag for now
142 return self._configured
# The text form embeds the segments joined with commas between "<" and ">".
# str.join requires every segment to be a string.
145 return "%d;%s-><%s>;%d" % (
148 ",".join(self.segments),
# VppObject wrapper for an SR policy created through the sr_policy_add_v2
# API call. Besides the fields used for sr_policy_add it stores an encap_src
# value and sends a segment list produced by _get_fixed_segments().
153 class VppSRv6PolicyV2(VppObject):
172 self.is_encap = is_encap
173 self.sr_type = sr_type
175 self.fib_table = fib_table
176 self.segments = segments
177 self.encap_src = encap_src
# The segment count is captured once here; later changes to self.segments
# are not reflected in n_segments.
178 self.n_segments = len(segments)
180 # source not passed to API
181 # self.source = inet_pton(AF_INET6, source)
183 self._configured = False
# Create the policy in VPP from the stored parameters and mark the object
# as configured.
185 def add_vpp_config(self):
186 self._test.vapi.sr_policy_add_v2(
189 is_encap=self.is_encap,
191 fib_table=self.fib_table,
192 encap_src=self.encap_src,
# num_sids carries the real segment count, while the "sids" value comes from
# _get_fixed_segments() instead of self.segments directly.
194 "num_sids": self.n_segments,
195 "sids": self._get_fixed_segments(),
199 self._configured = True
# Delete the policy; only self.bsid is passed to sr_policy_del.
201 def remove_vpp_config(self):
202 self._test.vapi.sr_policy_del(self.bsid)
203 self._configured = False
# Report the local configured flag; VPP itself is not consulted.
205 def query_vpp_config(self):
206 # no API to query SR Policies
207 # use _configured flag for now
208 return self._configured
# The text form embeds the segments joined with commas between "<" and ">".
# str.join requires every segment to be a string.
211 return "%d;%s-><%s>;%d" % (
214 ",".join(self.segments),
# Build the segment list handed to the API by add_vpp_config.
218 def _get_fixed_segments(self):
# Work on a shallow copy so self.segments itself is not the object being
# modified afterwards.
# NOTE(review): this relies on the `copy` module - confirm it is imported at
# file level.
219 segs = copy.copy(self.segments)
220 # note: array expect size is 16
# One iteration per slot missing to reach 16 entries; presumably each
# iteration pads the copy - confirm. If n_segments exceeds 16 the range is
# empty and the loop does not run.
221 for _ in range(16 - self.n_segments):
226 class VppSRv6Steering(VppObject):
245 self.mask_width = mask_width
246 self.traffic_type = traffic_type
247 self.sr_policy_index = sr_policy_index
248 self.sw_if_index = sw_if_index
249 self.table_id = table_id
250 self._configured = False
# Install the steering entry through sr_steering_add_del using the stored
# policy index, table id, prefix and mask width, interface index and traffic
# type, then mark the object as configured.
252 def add_vpp_config(self):
253 self._test.vapi.sr_steering_add_del(
256 sr_policy_index=self.sr_policy_index,
257 table_id=self.table_id,
# The prefix is sent as a dict holding the address and its length.
258 prefix={"address": self.prefix, "len": self.mask_width},
259 sw_if_index=self.sw_if_index,
260 traffic_type=self.traffic_type,
# Record locally that the entry is installed; query_vpp_config reports this
# flag.
262 self._configured = True
# Call sr_steering_add_del with the same stored policy index, table id,
# prefix, interface index and traffic type that add_vpp_config sends, then
# clear the configured flag.
264 def remove_vpp_config(self):
265 self._test.vapi.sr_steering_add_del(
268 sr_policy_index=self.sr_policy_index,
269 table_id=self.table_id,
# The prefix is sent as a dict holding the address and its length.
270 prefix={"address": self.prefix, "len": self.mask_width},
271 sw_if_index=self.sw_if_index,
272 traffic_type=self.traffic_type,
# Record locally that the entry is no longer installed.
274 self._configured = False
# Report whether this object believes it is configured. The answer is the
# local flag set by add_vpp_config and cleared by remove_vpp_config; VPP
# itself is not consulted.
276 def query_vpp_config(self):
277 # no API to query steering entries
278 # use _configured flag for now
279 return self._configured
282 return "%d;%d;%s/%d->%s" % (