4 object abstractions for representing SRv6 localSIDs in VPP
7 from vpp_object import *
8 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
11 class SRv6LocalSIDBehaviors():
12 # from src/vnet/srv6/sr.h
16 SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
22 SR_BEHAVIOR_LAST = 10 # Must always be the last one
class SRv6PolicyType:
    """Numeric SR policy type codes.

    The values are copied from src/vnet/srv6/sr.h.
    """

    # enum-style numbering: DEFAULT is 0, SPRAY is 1
    (SR_POLICY_TYPE_DEFAULT,
     SR_POLICY_TYPE_SPRAY) = range(2)
31 class SRv6PolicySteeringTypes():
32 # from src/vnet/srv6/sr.h
38 class VppSRv6LocalSID(VppObject):
43 def __init__(self, test, localsid_addr, behavior, nh_addr, end_psp,
44 sw_if_index, vlan_index, fib_table):
46 self.localsid_addr = localsid_addr
47 # keep binary format in _localsid_addr
48 self._localsid_addr = inet_pton(AF_INET6, self.localsid_addr)
49 self.behavior = behavior
50 self.nh_addr = nh_addr
51 # keep binary format in _nh_addr
54 self._nh_addr = inet_pton(AF_INET6, nh_addr)
57 # API expects 16 octets (128 bits)
58 # last 4 octets are used for IPv4
59 # --> prepend 12 octets
60 self._nh_addr = ('\x00' * 12) + inet_pton(AF_INET, nh_addr)
61 self.end_psp = end_psp
62 self.sw_if_index = sw_if_index
63 self.vlan_index = vlan_index
64 self.fib_table = fib_table
65 self._configured = False
67 def add_vpp_config(self):
68 self._test.vapi.sr_localsid_add_del(
74 sw_if_index=self.sw_if_index,
75 vlan_index=self.vlan_index,
76 fib_table=self.fib_table)
77 self._configured = True
79 def remove_vpp_config(self):
80 self._test.vapi.sr_localsid_add_del(
86 sw_if_index=self.sw_if_index,
87 vlan_index=self.vlan_index,
88 fib_table=self.fib_table)
89 self._configured = False
def query_vpp_config(self):
    """Report whether this localSID is currently marked as configured.

    The sr_localsids_dump API is disabled, so for now the answer comes
    from the locally tracked _configured flag.
    """
    configured = self._configured
    return configured
97 return self.object_id()
106 class VppSRv6Policy(VppObject):
111 def __init__(self, test, bsid,
112 is_encap, sr_type, weight, fib_table,
116 # keep binary format in _bsid
117 self._bsid = inet_pton(AF_INET6, bsid)
118 self.is_encap = is_encap
119 self.sr_type = sr_type
121 self.fib_table = fib_table
122 self.segments = segments
123 # keep binary format in _segments
126 self._segments.extend(inet_pton(AF_INET6, seg))
127 self.n_segments = len(segments)
128 # source not passed to API
129 # self.source = inet_pton(AF_INET6, source)
131 self._configured = False
133 def add_vpp_config(self):
134 self._test.vapi.sr_policy_add(
142 self._configured = True
144 def remove_vpp_config(self):
145 self._test.vapi.sr_policy_del(
147 self._configured = False
def query_vpp_config(self):
    """Report whether this SR policy is currently marked as configured.

    There is no API to query SR Policies, so for now the answer comes
    from the locally tracked _configured flag.
    """
    configured = self._configured
    return configured
155 return self.object_id()
158 return ("%d;%s-><%s>;%d"
161 ','.join(self.segments),
165 class VppSRv6Steering(VppObject):
170 def __init__(self, test,
180 # keep binary format in _bsid
181 self._bsid = inet_pton(AF_INET6, bsid)
183 # keep binary format in _prefix
186 self._prefix = inet_pton(AF_INET6, prefix)
189 # API expects 16 octets (128 bits)
190 # last 4 octets are used for IPv4
191 # --> prepend 12 octets
192 self._prefix = ('\x00' * 12) + inet_pton(AF_INET, prefix)
193 self.mask_width = mask_width
194 self.traffic_type = traffic_type
195 self.sr_policy_index = sr_policy_index
196 self.sw_if_index = sw_if_index
197 self.table_id = table_id
198 self._configured = False
200 def add_vpp_config(self):
201 self._test.vapi.sr_steering_add_del(
204 self.sr_policy_index,
210 self._configured = True
212 def remove_vpp_config(self):
213 self._test.vapi.sr_steering_add_del(
216 self.sr_policy_index,
222 self._configured = False
def query_vpp_config(self):
    """Report whether this steering entry is currently marked as configured.

    There is no API to query steering entries, so for now the answer
    comes from the locally tracked _configured flag.
    """
    configured = self._configured
    return configured
230 return self.object_id()
233 return ("%d;%d;%s/%d->%s"