4 object abstractions for representing SRv6 localSIDs, SR policies and steering entries in VPP
7 from vpp_object import VppObject
8 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
11 class SRv6LocalSIDBehaviors():
# Integer codes for SRv6 localSID behaviors, copied from the header named
# in the comment below. Only part of the member list is visible here.
12 # from src/vnet/srv6/sr.h
16 SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
22 SR_BEHAVIOR_LAST = 10 # Must always be the last one
25 class SRv6PolicyType():
# Integer codes for the two SR policy types (default and spray), copied
# from the header named in the comment below.
26 # from src/vnet/srv6/sr.h
27 SR_POLICY_TYPE_DEFAULT = 0
28 SR_POLICY_TYPE_SPRAY = 1
31 class SRv6PolicySteeringTypes():
# Holds the SR policy steering type codes.
# NOTE(review): the members themselves are not visible in this excerpt.
32 # from src/vnet/srv6/sr.h
38 class VppSRv6LocalSID(VppObject):
# Test-framework object describing one SRv6 localSID. It is pushed to and
# withdrawn from VPP with the sr_localsid_add_del API call.
#
# Constructor arguments, as used by the visible code:
#   localsid  -- dict; its "addr" entry is a textual IPv6 address
#   nh_addr4  -- textual IPv4 address, stored packed in self.nh_addr4
#   nh_addr6  -- textual IPv6 address, stored packed in self.nh_addr6
#   behavior, end_psp, sw_if_index, vlan_index, fib_table -- stored as given
#                (behavior is presumably a SRv6LocalSIDBehaviors value --
#                TODO confirm against callers)
# inet_pton raises an error when an address string is not valid for the
# requested address family.
43 def __init__(self, test, localsid, behavior, nh_addr4, nh_addr6,
44 end_psp, sw_if_index, vlan_index, fib_table):
46 self.localsid = localsid
47 # keep binary format in localsid["addr"]; self.localsid is the caller's
# dict, so the caller's "addr" entry is overwritten with the packed bytes
48 self.localsid["addr"] = inet_pton(AF_INET6, self.localsid["addr"])
49 self.behavior = behavior
50 self.nh_addr4 = inet_pton(AF_INET, nh_addr4)
51 self.nh_addr6 = inet_pton(AF_INET6, nh_addr6)
52 self.end_psp = end_psp
53 self.sw_if_index = sw_if_index
54 self.vlan_index = vlan_index
55 self.fib_table = fib_table
# nothing has been sent to VPP yet
56 self._configured = False
# Call sr_localsid_add_del with this object's sw_if_index, vlan_index and
# fib_table (further arguments are on lines not shown here), then record
# the object as configured.
58 def add_vpp_config(self):
59 self._test.vapi.sr_localsid_add_del(
66 sw_if_index=self.sw_if_index,
67 vlan_index=self.vlan_index,
68 fib_table=self.fib_table)
69 self._configured = True
# Call sr_localsid_add_del again with the same sw_if_index, vlan_index and
# fib_table, then record the object as no longer configured.
# NOTE(review): the argument that selects add vs. delete is not visible in
# this excerpt.
71 def remove_vpp_config(self):
72 self._test.vapi.sr_localsid_add_del(
79 sw_if_index=self.sw_if_index,
80 vlan_index=self.vlan_index,
81 fib_table=self.fib_table)
82 self._configured = False
# Report whether this localSID is believed to be present in VPP. VPP is not
# queried; the result is the flag maintained by add_vpp_config and
# remove_vpp_config.
84 def query_vpp_config(self):
85 # sr_localsids_dump API is disabled
86 # use _configured flag for now
87 return self._configured
96 class VppSRv6Policy(VppObject):
# Test-framework object describing one SR policy. It is created in VPP with
# sr_policy_add and removed with sr_policy_del.
#
# Constructor arguments, as used by the visible code (part of the signature
# is on lines not shown here):
#   bsid      -- textual IPv6 address, stored packed in self._bsid
#   is_encap, sr_type, fib_table -- stored as given
#   segments  -- sized collection of textual IPv6 addresses; kept as given in
#                self.segments, with its length in self.n_segments
# inet_pton raises an error when an address string is not a valid IPv6
# address.
101 def __init__(self, test, bsid,
102 is_encap, sr_type, weight, fib_table,
106 # keep binary format in _bsid
107 self._bsid = inet_pton(AF_INET6, bsid)
108 self.is_encap = is_encap
109 self.sr_type = sr_type
111 self.fib_table = fib_table
112 self.segments = segments
113 # keep binary format in _segments
# NOTE(review): the creation of self._segments and the loop that binds
# `seg` are on lines not shown here; presumably one packed address is added
# per entry of `segments` -- confirm.
116 self._segments.extend(inet_pton(AF_INET6, seg))
117 self.n_segments = len(segments)
118 # source not passed to API
119 # self.source = inet_pton(AF_INET6, source)
# nothing has been sent to VPP yet
121 self._configured = False
# Call sr_policy_add (its arguments are on lines not shown here), then
# record the policy as configured.
123 def add_vpp_config(self):
124 self._test.vapi.sr_policy_add(
132 self._configured = True
# Call sr_policy_del (its arguments are on lines not shown here), then
# record the policy as no longer configured.
134 def remove_vpp_config(self):
135 self._test.vapi.sr_policy_del(
137 self._configured = False
# Report whether this policy is believed to be present in VPP. VPP is not
# queried; the result is the flag maintained by add_vpp_config and
# remove_vpp_config.
139 def query_vpp_config(self):
140 # no API to query SR Policies
141 # use _configured flag for now
142 return self._configured
# The header of the method below is not visible in this excerpt. It returns
# a formatted string that includes the textual segments joined with commas.
145 return ("%d;%s-><%s>;%d"
148 ','.join(self.segments),
152 class VppSRv6Steering(VppObject):
# Store the parameters of one SR steering entry. Most of the signature and
# the condition choosing between the two _prefix assignments below are on
# lines not shown here.
#   bsid    -- textual IPv6 address, stored packed in self._bsid
#   prefix  -- textual IPv6 or IPv4 address, stored in self._prefix as
#              16 octets (an IPv4 address is left-padded with 12 octets)
#   mask_width, traffic_type, sr_policy_index, sw_if_index, table_id --
#              stored as given
# inet_pton raises an error when an address string is not valid for the
# requested address family.
157 def __init__(self, test,
167 # keep binary format in _bsid
168 self._bsid = inet_pton(AF_INET6, bsid)
170 # keep binary format in _prefix
173 self._prefix = inet_pton(AF_INET6, prefix)
176 # API expects 16 octets (128 bits)
177 # last 4 octets are used for IPv4
178 # --> prepend 12 octets
# NOTE(review): on Python 3 the literal '\x00' * 12 is a text string while
# inet_pton returns bytes, so this concatenation raises TypeError there;
# b'\x00' * 12 works on both Python 2 and 3 -- confirm which interpreter
# runs these tests.
179 self._prefix = ('\x00' * 12) + inet_pton(AF_INET, prefix)
180 self.mask_width = mask_width
181 self.traffic_type = traffic_type
182 self.sr_policy_index = sr_policy_index
183 self.sw_if_index = sw_if_index
184 self.table_id = table_id
# nothing has been sent to VPP yet
185 self._configured = False
# Call sr_steering_add_del, passing self.sr_policy_index among arguments
# that are partly on lines not shown here, then record the steering entry
# as configured.
187 def add_vpp_config(self):
188 self._test.vapi.sr_steering_add_del(
191 self.sr_policy_index,
197 self._configured = True
# Call sr_steering_add_del again, passing self.sr_policy_index among
# arguments that are partly on lines not shown here, then record the
# steering entry as no longer configured.
# NOTE(review): the argument that selects add vs. delete is not visible in
# this excerpt.
199 def remove_vpp_config(self):
200 self._test.vapi.sr_steering_add_del(
203 self.sr_policy_index,
209 self._configured = False
# Report whether this steering entry is believed to be present in VPP. VPP
# is not queried; the result is the flag maintained by add_vpp_config and
# remove_vpp_config.
211 def query_vpp_config(self):
212 # no API to query steering entries
213 # use _configured flag for now
214 return self._configured
217 return ("%d;%d;%s/%d->%s"