4 object abstractions for representing SRv6 localSIDs in VPP
7 from vpp_object import VppObject
8 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
11 class SRv6LocalSIDBehaviors():
12 # from src/vnet/srv6/sr.h
16 SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
22 SR_BEHAVIOR_LAST = 10 # Must always be the last one
25 class SRv6PolicyType():
26 # from src/vnet/srv6/sr.h
27 SR_POLICY_TYPE_DEFAULT = 0
28 SR_POLICY_TYPE_SPRAY = 1
31 class SRv6PolicySteeringTypes():
32 # from src/vnet/srv6/sr.h
38 class VppSRv6LocalSID(VppObject):
43 def __init__(self, test, localsid, behavior, nh_addr,
44 end_psp, sw_if_index, vlan_index, fib_table):
46 self.localsid = localsid
47 self.behavior = behavior
48 self.nh_addr = nh_addr
49 self.end_psp = end_psp
50 self.sw_if_index = sw_if_index
51 self.vlan_index = vlan_index
52 self.fib_table = fib_table
53 self._configured = False
55 def add_vpp_config(self):
56 self._test.vapi.sr_localsid_add_del(
57 localsid=self.localsid,
58 behavior=self.behavior,
62 sw_if_index=self.sw_if_index,
63 vlan_index=self.vlan_index,
64 fib_table=self.fib_table)
65 self._configured = True
67 def remove_vpp_config(self):
68 self._test.vapi.sr_localsid_add_del(
69 localsid=self.localsid,
70 behavior=self.behavior,
74 sw_if_index=self.sw_if_index,
75 vlan_index=self.vlan_index,
76 fib_table=self.fib_table)
77 self._configured = False
79 def query_vpp_config(self):
80 # sr_localsids_dump API is disabled
81 # use _configured flag for now
82 return self._configured
91 class VppSRv6Policy(VppObject):
96 def __init__(self, test, bsid,
97 is_encap, sr_type, weight, fib_table,
101 self.is_encap = is_encap
102 self.sr_type = sr_type
104 self.fib_table = fib_table
105 self.segments = segments
106 self.n_segments = len(segments)
107 # source not passed to API
108 # self.source = inet_pton(AF_INET6, source)
110 self._configured = False
112 def add_vpp_config(self):
113 self._test.vapi.sr_policy_add(
116 is_encap=self.is_encap,
117 is_spray=self.sr_type,
118 fib_table=self.fib_table,
119 sids={'num_sids': self.n_segments, 'sids': self.segments})
120 self._configured = True
122 def remove_vpp_config(self):
123 self._test.vapi.sr_policy_del(
125 self._configured = False
127 def query_vpp_config(self):
128 # no API to query SR Policies
129 # use _configured flag for now
130 return self._configured
133 return ("%d;%s-><%s>;%d"
136 ','.join(self.segments),
140 class VppSRv6Steering(VppObject):
145 def __init__(self, test,
156 self.mask_width = mask_width
157 self.traffic_type = traffic_type
158 self.sr_policy_index = sr_policy_index
159 self.sw_if_index = sw_if_index
160 self.table_id = table_id
161 self._configured = False
163 def add_vpp_config(self):
164 self._test.vapi.sr_steering_add_del(
167 sr_policy_index=self.sr_policy_index,
168 table_id=self.table_id,
169 prefix={'address': self.prefix, 'len': self.mask_width},
170 sw_if_index=self.sw_if_index,
171 traffic_type=self.traffic_type)
172 self._configured = True
174 def remove_vpp_config(self):
175 self._test.vapi.sr_steering_add_del(
178 sr_policy_index=self.sr_policy_index,
179 table_id=self.table_id,
180 prefix={'address': self.prefix, 'len': self.mask_width},
181 sw_if_index=self.sw_if_index,
182 traffic_type=self.traffic_type)
183 self._configured = False
185 def query_vpp_config(self):
186 # no API to query steering entries
187 # use _configured flag for now
188 return self._configured
191 return ("%d;%d;%s/%d->%s"