self._base = base
# Resources sorted via dependency (instances)
- self._resources = dict()
+ self._resources = dict()
self._deps = None
# Store resource requirements used for automatic instanciation
# instance -> attribute -> requirements
- self._instance_requirements = dict()
+ self._instance_requirements = dict()
self._dirty = set()
self._auto_commit = False
# class -> Requirements
- self._class_requirements = dict()
+ self._class_requirements = dict()
self._map_uuid_name = dict()
self._map_name_uuid = dict()
self._router.add_interface('vicn', manager = self)
ws_port = self.get('websocket_port')
- self._ws = self._router.add_interface('websocketserver',
+ self._ws = self._router.add_interface('websocketserver',
port = ws_port)
# Monitoring
def _broadcast(self, query):
if not self._ws:
return
- self._ws.execute(query)
+ self._ws.execute(query)
def _broadcast_packet(self, packet):
self._broadcast(packet.to_query())
if not query.object_name == 'interface':
return
- q = Query(ACTION_UPDATE, 'channel', filter = query.filter,
+ q = Query(ACTION_UPDATE, 'channel', filter = query.filter,
params = query.params)
q.reply = True
- self._ws.execute(q)
+ self._ws.execute(q)
return None
def _on_netmon_record(self, packet):
query = packet.to_query()
# Find channel related to query
- # NOTE: we update the channel twice, once for each interface...
+ # NOTE: we update the channel twice, once for each interface...
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
for interface in node.interfaces:
if interface.device_name == device_name:
if interface.channel:
- f = Filter.from_list([['id', '==',
+ f = Filter.from_list([['id', '==',
interface.channel._state.uuid._uuid]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
def _on_netmon_channel_record(self, packet):
query = packet.to_query()
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
device_name = device_names[0]
f = Filter.from_list([['id', '==', device_name]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
def _on_vpp_record(self, packet, pylink_id):
query = packet.to_query()
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
# We might want to check if the query has SUM(*)
f = Filter.from_list([['id', '==', pylink_id]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
asyncio.ensure_future(self._set_resource_state(resource,
ResourceState.CLEAN))
continue
-
+
self.commit_resource(resource)
def setup(self, commit=False):
if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource:
candidates = inheritors(cls)
if not candidates:
- log.error('Abstract resource with no candidates: %s',
+ log.error('Abstract resource with no candidates: %s',
cls.__name__)
return None
for delegate in candidates:
- if capabilities and (not '__capabilities__' in vars(delegate)
+ if capabilities and (not '__capabilities__' in vars(delegate)
or not capabilities.issubset(delegate.__capabilities__)):
continue
log.info("Abstract resource %s, delegated %s among %r" % \
return delegate
return None
else:
- if capabilities and (not '__capabilities__' in vars(delegate) or
+ if capabilities and (not '__capabilities__' in vars(delegate) or
not capabilities.issubset(delegate.__capabilities__)):
- log.error('Capabilities conflict for resource : %s',
+ log.error('Capabilities conflict for resource : %s',
cls.__name__)
raise VICNException
return cls
cls, attr_dict = resource_tuple
for instance in self.by_type(cls):
cur_attr_dict = instance._get_attribute_dict()
- common_keys = [k for k in cur_attr_dict.keys()
+ common_keys = [k for k in cur_attr_dict.keys()
if k in attr_dict.keys()]
if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys):
return instance
if not aggregates:
return None
assert len(aggregates) == 1
- return next(aggregates)
+ return next(aggregates)
def by_type(self, type):
return [r for r in self if isinstance(r, type)]
await self.wait_attr_init(resource, attribute)
return resource.get(attribute)
- async def attribute_set(self, resource, attribute_name, value,
+ async def attribute_set(self, resource, attribute_name, value,
blocking=True):
with await resource._state.write_lock:
# Add the current operation to the pending list
# NOTE: collections are unordered and can be updated concurrently
#self._attribute_set_pending_value(resource, attribute_name)
- resource._state.dirty[attribute_name].trigger(Operations.SET,
+ resource._state.dirty[attribute_name].trigger(Operations.SET,
value)
attr_state = resource._state.attr_state[attribute_name]
setattr(dep, dep_attr_name, dep_attr_value)
dep_attr_value_pfx = '{}:{}'.format(
- dep_attr_value.get_type(),
+ dep_attr_value.get_type(),
dep_attr_value.get_uuid())
- self.log(resource,
+ self.log(resource,
S_WAIT_DEP.format(dep_attr_value_pfx))
await wait_resource(dep_attr_value)
- self.log(resource,
+ self.log(resource,
S_WAIT_DEP_OK .format(dep_attr_value_pfx))
async def _resource_wait_predecessors(self, resource):
for before in befores:
if not before.managed:
continue
- before_pfx = '{}:{}'.format(before.get_type(),
+ before_pfx = '{}:{}'.format(before.get_type(),
before.get_uuid())
self.log(resource, S_WAIT.format(before_pfx))
await wait_resource(before)
for before in befores:
if not before.managed:
continue
- before_pfx = '{}:{}'.format(before.get_type(),
+ before_pfx = '{}:{}'.format(before.get_type(),
before.get_uuid())
self.log(resource, S_WAIT.format(before_pfx))
await wait_resource_init(before)
await self._resource_wait_attributes(resource)
await self._resource_wait_predecessors(resource)
await self._resource_wait_subresources(resource)
-
+
def _task_resource_action(self, resource, action):
"""Perform action: __get__, __create__, __delete__ on the full class
hierarchy.
It is important to centralize state change since some states are
associated with Events().
"""
- resource._state.attr_state[attribute_name] = state
if state in [
AttributeState.INITIALIZED,
AttributeState.CLEAN,
AttributeState.DIRTY
]:
resource._state.attr_init[attribute_name].set()
+ elif state == AttributeState.RESET:
+ resource._state.attr_init[attribute_name].clear()
else:
raise RuntimeError("Inconsistent resource state {}".format(state))
resource._state.attr_clean[attribute_name].set()
elif state in [
AttributeState.INITIALIZED,
- AttributeState.DIRTY
+ AttributeState.DIRTY,
+ AttributeState.RESET
]:
resource._state.attr_clean[attribute_name].clear()
else:
raise RuntimeError
+ resource._state.attr_state[attribute_name] = AttributeState.UNINITIALIZED \
+ if state == AttributeState.RESET else state
+
async def _set_attribute_state(self, resource, attribute_name, state):
"""Sets the attribute state (lock version)
"""
while new_state != AttributeState.CLEAN:
#with await resource._state.attr_lock[attribute.name]:
state = resource._state.attr_state[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Current state is {}'.format(state))
# AttributeState.ERROR
- if resource._state.attr_change_success == False:
- log.error('Attribute error {} for resource {}'.format(
- resource.get_uuid(), attribute.name))
+ if resource._state.attr_change_success[attribute.name] == False:
e = resource._state.attr_change_value[attribute.name]
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(e, LxdNotFound) or isinstance(e, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ log.error('Attribute error {} for resource {}'.format(
+ resource.get_uuid(), attribute.name))
+ import traceback; traceback.print_tb(e.__traceback__)
+ log.error('Failed with exception: {}'.format(e))
+ import os; os._exit(1)
- # Signal update errors to the parent resource
- resource._state.attr_change_event[attribute.name].set()
+ # Signal update errors to the parent resource
+ resource._state.attr_change_event[attribute.name].set()
if state == AttributeState.UNINITIALIZED:
pending_state = AttributeState.PENDING_INIT
AttributeState.PENDING_UPDATE
]:
# Nothing to do
- pending_state = None
+ pending_state = None
elif state == AttributeState.CLEAN:
return
else:
raise RuntimeError
if pending_state is None:
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Nothing to do. Waiting for event...')
await resource._state.attr_change_event[attribute.name].wait()
resource._state.attr_change_event[attribute.name].clear()
task = EmptyTask()
else:
try:
- task = self._task_attribute_op(resource, attribute,
+ task = self._task_attribute_op(resource, attribute,
Operations.SET)
except Exception as e:
log.warning('No attribute setter attribute {}'.format(
self.attr_log(resource, attribute,
'Trigger {} -> {}. Waiting task completion'.format(
state, pending_state))
- self.schedule(task)
+ self.schedule(task)
await resource._state.attr_change_event[attribute.name].wait()
resource._state.attr_change_event[attribute.name].clear()
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Completed {} -> {}. Success = {}'.format(
state, pending_state,
resource._state.attr_change_success[attribute.name]))
if pending_state == AttributeState.PENDING_INIT:
if resource._state.attr_change_success[attribute.name] == True:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'INIT success. Value = {}'.format(attrs))
found = self._process_attr_dict(resource, attribute, attrs)
if not found:
log.error('Attribute missing return attrs: {}'.format(
attrs))
- found = self._process_attr_dict(resource, attribute,
+ found = self._process_attr_dict(resource, attribute,
attrs)
new_state = AttributeState.INITIALIZED
else:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
- 'INIT gave no value. Value = {}'.format(attrs))
- new_state = AttributeState.INITIALIZED
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ self.attr_log(resource, attribute,
+ 'INIT gave no value. Value = {}'.format(attrs))
+ new_state = AttributeState.INITIALIZED
elif pending_state == AttributeState.PENDING_UPDATE:
if resource._state.attr_change_success[attribute.name] == True:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs))
if attrs != NEVER_SET:
# None could be interpreted as the return value. Also,
new_state = AttributeState.CLEAN
else:
- log.error('Attribute error {} for resource {}'.format(
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ log.error('Attribute error {} for resource {}'.format(
resource.get_uuid(), attribute.name))
- e = resource._state.attr_change_value[attribute.name]
- new_state = AttributeState.ERROR
+ e = resource._state.attr_change_value[attribute.name]
+ new_state = AttributeState.ERROR
else:
raise RuntimeError
# Setting attribute state
- await self._set_attribute_state(resource, attribute.name,
+ await self._set_attribute_state(resource, attribute.name,
new_state)
#--------------------------------------------------------------------------
ws = self._router.add_interface('websocketclient', address=ip,
hook=self._on_netmon_record)
-
+
node = resource.node
for interface in node.interfaces:
if not interface.monitored:
continue
channel_id = interface.channel._state.uuid._uuid
-
- update_vpp = functools.partial(self._on_vpp_record,
+
+ update_vpp = functools.partial(self._on_vpp_record,
pylink_id = channel_id)
- ws_vpp = self._router.add_interface('websocketclient',
+ ws_vpp = self._router.add_interface('websocketclient',
address=ip, hook=update_vpp)
aggregate_interfaces = list()
for _interface in node.interfaces:
if not _interface.get_type() == 'dpdkdevice' and \
_interface.monitored:
- aggregate_interfaces.append('"' +
+ aggregate_interfaces.append('"' +
_interface.device_name + '"')
q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
interface.channel.already_subscribed = True
else:
- q_str = Q_SUB_IF.format(interface.device_name)
+ q_str = Q_SUB_IF.format(interface.device_name)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
self._router._flow_table.add(packet, None, ws)
ip = ns.node.bridge.ip_address # host_interface.ip_address
ws_ns = self._router.add_interface('websocketclient', address = ip,
- port = ns.control_port,
+ port = ns.control_port,
hook = self._on_ns_record)
ws = self._router.add_interface('websocketclient', address = ip,
hook = self._on_netmon_channel_record)
# We also need to subscribe on the node for the tap interfaces
# for individual bandwidth monitoring
tap = ns._sta_taps[station]
- q_str = Q_SUB_EMULATOR.format(tap.device_name)
+ q_str = Q_SUB_EMULATOR.format(tap.device_name)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
self._router._flow_table.add(packet, None, ws)
# Monitor all FSM one by one and inform about errors.
futs = list()
attrs = list()
- for attr in resource.iter_attributes():
+ for attr in resource.iter_attributes():
if resource.is_local_attribute(attr.name):
continue
if attr.key:
resource._state.change_success = all(
resource._state.attr_change_success[attr.name]
for attr in attrs)
- self.log(resource,
+ self.log(resource,
'All attributes FSM terminated with success={}'.format(
resource._state.change_success))
if resource._state.change_success:
- ret = [ resource._state.attr_change_value[attr.name]
+ ret = [ resource._state.attr_change_value[attr.name]
for attr in attrs]
- return ret
+ return ret
else:
raise NotImplementedError('At least one attribute failed')
if not futs:
self.log(resource, 'No key attribute to update')
- return None
+ return None
await asyncio.gather(*futs)
resource._state.change_success = all(
resource._state.attr_change_success[attr.name]
for attr in attrs)
- self.log(resource,
+ self.log(resource,
'KEY attributes FSM terminated with success={}'.format(
resource._state.change_success))
def log(self, resource, msg=None):
resource._state.log.append(msg)
-
+
# Display on screen
#pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
#print(pfx, msg)
pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
self.log(resource, 'Starting FSM...')
-
+
# When a resource is managed, it will get automatically monitored by
# adding the netmon resource on it.
from vicn.resource.node import Node
if resource.get_type() == 'lxccontainer':
- self.log(resource,
+ self.log(resource,
'Associating monitoring to lxc container resource...')
instance = self.create('netmon', node=resource)
self.commit_resource(instance)
- # FIXME
+ # FIXME
elif resource.get_type() == 'physical' and resource.managed and \
len(self.by_type_str('emulatedchannel')) > 0:
- self.log(resource,
+ self.log(resource,
'Associating monitoring to physical node resource...')
instance = self.create('netmon', node=resource)
self.commit_resource(instance)
elif state == ResourceState.DELETED:
raise NotImplementedError
# Nothing to do unless explicitely requested
- pending_state = None
+ pending_state = None
elif state in [
ResourceState.PENDING_DEPS,
self.log(resource, 'CREATE failed: {}'.format(e))
e = resource._state.change_value
import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Failed with exception {}'.format(e))
+ import os; os._exit(1)
elif pending_state == ResourceState.PENDING_UPDATE:
if resource._state.change_success == True:
e = resource._state.change_value
resource._state.write_lock.release()
import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Failed with exception {}'.format(e))
+ import os; os._exit(1)
elif pending_state == ResourceState.PENDING_DELETE:
raise NotImplementedError
CMD_SET_CAPACITY='\n'.join([
'tc qdisc del dev {netdevice.device_name} root || true',
'tc qdisc add dev {netdevice.device_name} root handle 1: tbf rate '
- '{netdevice.capacity}Mbit burst {burst}kb latency 70ms'
+ '{netdevice.capacity}Mbit burst {burst}kb latency 70ms',
'tc qdisc add dev {netdevice.device_name} parent 1:1 codel',
])
CMD_GET_PCI_ADDRESS='ethtool -i {netdevice.device_name} | ' \
addr, _, prefix = words[1].partition("/")
if prefix == '':
prefix = 128 if addrtype == "ipv6" else 32
- info["ip-addresses"].append({"ip-address-type": addrtype,
+ info["ip-addresses"].append({"ip-address-type": addrtype,
"ip-address": addr, "prefix": int(prefix)})
yield info
device_name = Attribute(String, description = 'Name of the NetDevice',
default = lambda x : x._default_device_name(),
max_size = MAX_DEVICE_NAME_SIZE)
- capacity = Attribute(Integer,
+ capacity = Attribute(Integer,
description = 'Capacity for interface shaping (Mb/s)')
mac_address = Attribute(String, description = 'Mac address of the device')
ip_address = Attribute(String, description = 'IP address of the device')
- pci_address = Attribute(String,
+ pci_address = Attribute(String,
description = 'PCI bus address of the device',
ro = True)
promiscuous = Attribute(Bool, description = 'Promiscuous', default = False)
# Merge into parse_ip_link
def parse(rv):
assert rv is not None
-
+
nds = parse_ip_link(rv.stdout)
# This will raise an exception is the interface does not exist
nd = nds[self.device_name]
attrs['mac_address'] = nd['hardware-address']
# We assume a single IPv4 address for now...
- ips = [ip for ip in nd['ip-addresses']
+ ips = [ip for ip in nd['ip-addresses']
if ip['ip-address-type'] == 'ipv4']
if len(ips) >= 1:
if len(ips) > 1:
def _set_ip_address(self):
if self.ip_address is None:
# Unset IP
- return BashTask(self.node, CMD_FLUSH_IP,
+ return BashTask(self.node, CMD_FLUSH_IP,
{'device_name': self.device_name})
- return BashTask(self.node, CMD_SET_IP_ADDRESS,
+ return BashTask(self.node, CMD_SET_IP_ADDRESS,
{'netdevice': self})
@task
def _set_promiscuous(self):
on_off = 'on' if self.promiscuous else 'off'
- return BashTask(self.node, CMD_SET_PROMISC,
- {'netdevice': self, 'on_off' : on_off})
+ return BashTask(self.node, CMD_SET_PROMISC,
+ {'netdevice': self, 'on_off' : on_off})
@task
def _get_up(self):
# http://unix.stackexchange.com/questions/100785/bucket-size-in-tbf
MBPS = 1000000
- KBPS = 1024
+ KBPS = 1024
BYTES = 8
HZ = 250
def parse(rv):
lines = rv.stdout.splitlines()
return (int(lines[0][-1]) + int(lines[1][-1]) > 0)
- return BashTask(self.node, CMD_GET_RP_FILTER, {'netdevice' :self},
+ return BashTask(self.node, CMD_GET_RP_FILTER, {'netdevice' :self},
parse = parse)
def _set_rp_filter(self):
rnd = ''.join(random.choice(string.ascii_uppercase + string.digits)
for _ in range(3))
return 'unk{}'.format(rnd)
-
+
def _remote_interface(self):
if not self.channel:
return None
if remote_node_name:
return remote_node_name
else:
- return AddressManager().get('device_name', self,
- prefix = self.prefix, scope = self.prefix)
+ return AddressManager().get('device_name', self,
+ prefix = self.prefix, scope = self.prefix)
#------------------------------------------------------------------------------
super().__init__(*args, **kwargs)
def _get_offload(self):
- return BashTask(self.node, CMD_GET_OFFLOAD, {'netdevice': self},
+ return BashTask(self.node, CMD_GET_OFFLOAD, {'netdevice': self},
parse = lambda rv : rv.stdout.strip() == 'on')
def _set_offload(self):
max_len = MAX_DEVICE_NAME_SIZE - len(self.node.name) - 1
device_name = self.device_name[:max_len]
- return NetDevice(node = host,
+ return NetDevice(node = host,
device_name = '{}-{}'.format(self.node.name, device_name),
managed = False)