3 # The MIT License (MIT)
7 # Permission is hereby granted, free of charge, to any person obtaining a copy
8 # of this software and associated documentation files (the "Software"), to deal
9 # in the Software without restriction, including without limitation the rights
10 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 # copies of the Software, and to permit persons to whom the Software is
12 # furnished to do so, subject to the following conditions:
14 # The above copyright notice and this permission notice shall be included in all
15 # copies or substantial portions of the Software.
17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 # File included from https://github.com/ejordangottlieb/pyswmap
30 # There is still a great deal of work required on this module. Please
35 from ipaddress import (
45 class MapCalc(object):
47 def __init__(self,**bmr):
49 self.portranges = False
51 # Validate and set BMR and BMR derived values
52 self._check_bmr_values(bmr)
54 def _check_bmr_values(self,bmr):
55 # Assume these values have not been supplied. Validate later.
59 # Validate that a proper PSID Offset has been set
60 if 'psidoffset' not in bmr:
61 # Set Default PSID Offset of 6 if it is not set
64 self.psidoffset = self._psid_offset(bmr['psidoffset'])
66 # Validate that a proper IPv4 rule prefix is defined
67 if 'rulev4' not in bmr:
68 print("The rule IPv4 prefix has not been set")
71 self.rulev4 = self._ipv4_rule(bmr['rulev4'])
73 # Validate that a proper IPv6 rule prefix is defined
74 if 'rulev6' not in bmr:
75 print("The rule IPv6 prefix has not been set")
78 self.rulev6 = self._ipv6_rule(bmr['rulev6'])
80 # Check if EA length was passed
81 if 'ealen' not in bmr:
84 self.ealen = bmr['ealen']
85 self.ratio = self._calc_ratio(bmr['ealen'])
87 # Check if sharing ratio was passed or calculated by _calc_ratio
88 if 'ratio' not in bmr:
89 # Skip if we have already calculated ratio
94 # Check to see if supplied EA length contradicts supplied ratio
95 if ( bmr['ratio'] != self.ratio ):
96 eavalue = "EA value {}".format(self.ealen)
97 sharingratio = "sharing ratio {}".format(bmr['ratio'])
98 print("Supplied {} and {} are contradictory".format(
104 self.ratio = bmr['ratio']
105 self.ealen = self._calc_ea(bmr['ratio'])
107 # EA length or sharing ratio must be set
108 if not ( self.ealen or self.ratio):
109 print("The BMR must include an EA length or sharing ratio")
112 # Since we have not hit an exception we can calculate the port bits
113 self.portbits = self._calc_port_bits()
115 def _ipv4_rule(self,rulev4):
117 self.rulev4mask = ip_network(
122 print("Invalid IPv4 prefix {}".format(rulev4))
125 self.rulev4object = ip_network(rulev4)
129 def _ipv6_rule(self,rulev6):
131 self.rulev6mask = IPv6Network(
136 print("Invalid IPv6 prefix {}".format(rulev6))
141 def _psid_offset(self,psidoffset):
143 if psidoffset in range(0,PSIDOFFSET_MAX+1):
146 print("Invalid PSID Offset value: {}".format(psidoffset))
149 def _psid_range(self,x):
151 for i in range(0,x+1):
155 def _calc_port_bits(self):
156 portbits = 16 - self.psidoffset - self.psidbits
159 def _calc_ea(self,ratio):
160 if ratio not in ( self._psid_range(16) ):
161 print("Invalid ratio {}".format(ratio))
162 print("Ratio between 2 to the power of 0 thru 16")
168 self.psidbits = int(log(ratio,2))
169 ealen = self.psidbits + ( 32 - self.rulev4mask )
172 def _calc_ratio(self,ealen):
173 maskbits = 32 - self.rulev4mask
174 if ( ealen < maskbits ):
175 print("EA of {} incompatible with rule IPv4 prefix {}".format(
180 print("EA length must be at least {} bits".format(
186 self.psidbits = ealen - ( 32 - self.rulev4mask )
187 if ( self.psidbits > 16):
188 print("EA length of {} is too large".format(
192 print("EA should not exceed {} for rule IPv4 prefix {}".format(
198 ratio = 2**self.psidbits
201 def gen_psid(self,portnum):
202 if ( portnum < self.start_port() ):
203 print("port value is less than allowed by PSID Offset")
205 psid = (portnum & ((2**self.psidbits - 1) << self.portbits))
206 psid = psid >> self.portbits
209 def port_ranges(self):
210 return 2**self.psidoffset - 1
212 def start_port(self):
213 if self.psidoffset == 0: return 0
214 return 2**(16 - self.psidoffset)
216 def port_list(self,psid):
217 startrange = psid * (2**self.portbits) + self.start_port()
218 increment = (2**self.psidbits) * (2**self.portbits)
220 for port in range(startrange,startrange + 2**self.portbits):
221 if port >= 65536: continue
222 portlist.append(port)
223 for x in range(1,self.port_ranges()):
224 startrange += increment
225 for port in range(startrange,startrange + 2**self.portbits):
226 portlist.append(port)
229 def ipv4_index(self,ipv4addr):
230 if ip_address(ipv4addr) in ip_network(self.rulev4):
231 x = ip_address(ipv4addr)
232 y = ip_network(self.rulev4,strict=False).network_address
234 return ( int(x) - int(y) )
236 print("Error: IPv4 address {} not in Rule IPv4 subnet {}".format(
238 ip_network(self.rulev4,strict=False).network_address))
241 def _calc_ipv6bit_pos(self):
242 addroffset = 128 - (self.rulev6mask + ( self.ealen - self.psidbits))
243 psidshift = 128 - ( self.rulev6mask + self.ealen )
244 return [addroffset,psidshift]
246 def _append_map_eabits(self,ipv4index,addroffset,psidshift,psid):
247 rulev6base = IPv6Network(self.rulev6,strict=False).network_address
248 map_prefix = int(rulev6base) | ( ipv4index << addroffset )
249 map_fullprefix = map_prefix | ( psid << psidshift)
250 return map_fullprefix
253 def get_mapce_addr(self,ipv4addr,psid):
254 ipv4index = self.ipv4_index(ipv4addr)
255 (addroffset,psidshift) = self._calc_ipv6bit_pos()
256 map_fullprefix = self._append_map_eabits(ipv4index,
260 mapv4iid = map_fullprefix | ( int(self.ipv4addr) << 16 )
261 map_full_address = mapv4iid | psid
262 mapce_address = "{}".format(IPv6Address(map_full_address))
265 def get_mapce_prefix(self,ipv4addr,psid):
266 ipv4index = self.ipv4_index(ipv4addr)
267 (addroffset,psidshift) = self._calc_ipv6bit_pos()
268 map_fullprefix = self._append_map_eabits(ipv4index,
272 mapce_prefix = "{}/{}".format(
273 IPv6Address(map_fullprefix),
274 self.rulev6mask + self.ealen
278 def get_map_ipv4(self,mapce_address):
279 ipv4 = (int(IPv6Address(mapce_address)) & ( 0xffffffff << 16 )) >> 16
280 return ip_address(ipv4)
284 class DmrCalc(object):
286 def __init__(self,dmr):
288 # Validate and set BMR and BMR derived values
289 self.dmrprefix = self._check_dmr_prefix(dmr)
291 def embed_6052addr(self,ipv4addr):
294 ipv4addrint = int(ip_address(ipv4addr))
296 print("Invalid IPv4 address {}".format(ipv4addr))
299 if ( self.dmrprefix.prefixlen == 64 ):
300 ipv6int = ipv4addrint << 24
301 ipv6int += int(self.dmrprefix.network_address)
302 return IPv6Address(ipv6int)
304 if ( self.dmrprefix.prefixlen == 96 ):
305 ipv6int = ipv4addrint
306 ipv6int += int(self.dmrprefix.network_address)
307 return IPv6Address(ipv6int)
309 def _check_dmr_prefix(self,dmrprefix):
311 self.dmrmask = IPv6Network(
316 print("Invalid IPv6 prefix {}".format(prefix))
319 if self.dmrmask not in (32,40,48,56,64,96):
320 print("Invalid prefix mask /{}".format(self.dmrmask))
323 return IPv6Network(dmrprefix)
325 if __name__ == "__main__":
326 m = DmrCalc('fd80::/48')