4 object abstractions for representing SRv6 localSIDs in VPP
7 from vpp_object import *
8 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
11 class SRv6LocalSIDBehaviors():
12 # from src/vnet/srv6/sr.h
16 SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
22 SR_BEHAVIOR_LAST = 10 # Must always be the last one
25 class SRv6PolicyType():
26 # from src/vnet/srv6/sr.h
27 SR_POLICY_TYPE_DEFAULT = 0
28 SR_POLICY_TYPE_SPRAY = 1
31 class SRv6PolicySteeringTypes():
32 # from src/vnet/srv6/sr.h
38 class VppSRv6LocalSID(VppObject):
43 def __init__(self, test, localsid, behavior, nh_addr4, nh_addr6,
44 end_psp, sw_if_index, vlan_index, fib_table):
46 self.localsid = localsid
47 # keep binary format in _localsid
48 self.localsid["addr"] = inet_pton(AF_INET6, self.localsid["addr"])
49 self.behavior = behavior
50 self.nh_addr4 = inet_pton(AF_INET, nh_addr4)
51 self.nh_addr6 = inet_pton(AF_INET6, nh_addr6)
52 self.end_psp = end_psp
53 self.sw_if_index = sw_if_index
54 self.vlan_index = vlan_index
55 self.fib_table = fib_table
56 self._configured = False
58 def add_vpp_config(self):
59 self._test.vapi.sr_localsid_add_del(
66 sw_if_index=self.sw_if_index,
67 vlan_index=self.vlan_index,
68 fib_table=self.fib_table)
69 self._configured = True
71 def remove_vpp_config(self):
72 self._test.vapi.sr_localsid_add_del(
79 sw_if_index=self.sw_if_index,
80 vlan_index=self.vlan_index,
81 fib_table=self.fib_table)
82 self._configured = False
84 def query_vpp_config(self):
85 # sr_localsids_dump API is disabled
86 # use _configured flag for now
87 return self._configured
90 return self.object_id()
99 class VppSRv6Policy(VppObject):
104 def __init__(self, test, bsid,
105 is_encap, sr_type, weight, fib_table,
109 # keep binary format in _bsid
110 self._bsid = inet_pton(AF_INET6, bsid)
111 self.is_encap = is_encap
112 self.sr_type = sr_type
114 self.fib_table = fib_table
115 self.segments = segments
116 # keep binary format in _segments
119 self._segments.extend(inet_pton(AF_INET6, seg))
120 self.n_segments = len(segments)
121 # source not passed to API
122 # self.source = inet_pton(AF_INET6, source)
124 self._configured = False
126 def add_vpp_config(self):
127 self._test.vapi.sr_policy_add(
135 self._configured = True
137 def remove_vpp_config(self):
138 self._test.vapi.sr_policy_del(
140 self._configured = False
142 def query_vpp_config(self):
143 # no API to query SR Policies
144 # use _configured flag for now
145 return self._configured
148 return self.object_id()
151 return ("%d;%s-><%s>;%d"
154 ','.join(self.segments),
158 class VppSRv6Steering(VppObject):
163 def __init__(self, test,
173 # keep binary format in _bsid
174 self._bsid = inet_pton(AF_INET6, bsid)
176 # keep binary format in _prefix
179 self._prefix = inet_pton(AF_INET6, prefix)
182 # API expects 16 octets (128 bits)
183 # last 4 octets are used for IPv4
184 # --> prepend 12 octets
185 self._prefix = ('\x00' * 12) + inet_pton(AF_INET, prefix)
186 self.mask_width = mask_width
187 self.traffic_type = traffic_type
188 self.sr_policy_index = sr_policy_index
189 self.sw_if_index = sw_if_index
190 self.table_id = table_id
191 self._configured = False
193 def add_vpp_config(self):
194 self._test.vapi.sr_steering_add_del(
197 self.sr_policy_index,
203 self._configured = True
205 def remove_vpp_config(self):
206 self._test.vapi.sr_steering_add_del(
209 self.sr_policy_index,
215 self._configured = False
217 def query_vpp_config(self):
218 # no API to query steering entries
219 # use _configured flag for now
220 return self._configured
223 return self.object_id()
226 return ("%d;%d;%s/%d->%s"