# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Route module encapsulates functions related to static routing and
related configurations on NGFW.
When retrieving routing, it is done from the engine context.
For example, retrieve all routing for an engine in context::
>>> engine = Engine('sg_vm')
>>> for route_node in engine.routing:
... print(route_node)
...
Routing(name=Interface 0,level=interface)
Routing(name=Interface 1,level=interface)
Routing(name=Interface 2,level=interface)
Routing(name=Tunnel Interface 2000,level=interface)
Routing(name=Tunnel Interface 2001,level=interface)
Routing nodes are nested, starting with the engine level.
Routing node nesting is made up of 'levels' and can be
represented as a tree::
engine (root)
|
--> interface
|
--> network
|
--> gateway
|
--> any
You can get a representation of the routing or antispoofing tree nodes
by calling as_tree::
>>> print(engine.routing.as_tree())
Routing(name=myfw,level=engine_cluster)
--Routing(name=Interface 0,level=interface)
----Routing(name=network-1.1.1.0/24,level=network)
------Routing(name=mypeering,level=gateway)
------Routing(name=mynetlink,level=gateway)
--------Routing(name=router-1.1.1.1,level=any)
------Routing(name=mystatic,level=gateway)
--Routing(name=Interface 1,level=interface)
----Routing(name=network-10.10.10.0/24,level=network)
------Routing(name=anotherpeering,level=gateway)
--Routing(name=Tunnel Interface 1000,level=interface)
----Routing(name=network-2.2.2.0/24,level=network)
--Routing(name=Tunnel Interface 1001,level=interface)
--Routing(name=Interface 2,level=interface)
----Routing(name=Network (IPv4),level=network)
------Routing(name=dynamic_netlink-myfw-Interface 2,level=gateway)
--------Routing(name=Any network,level=any)
If nested routes exist, you can iterate a given node to get specific
information::
>>> interface = engine.routing.get(1)
>>> for routes in interface:
... print(routes)
...
Routing(name=network-10.0.0.0/24,level=network)
...
>>> for networks in interface:
... networks
... for gateways in networks:
... print gateways, gateways.ip
...
Routing(name=network-172.18.1.0/24,level=network)
Routing(name=asus-wireless,level=gateway) 172.18.1.200
If BGP, OSPF or a Traffic Handler (netlink) need to be added to an interface
that has multiple IP addresses assigned and you want to bind to only one, you
can provide the ``network`` parameter to ``add_`` methods. The network can be
obtained for an interface::
>>> engine = Engine('sg_vm')
>>> interface0 = engine.routing.get(0)
>>> for network in interface0:
... network, network.ip
...
(Routing(name=network-172.18.1.0/24,level=network), '172.18.1.0/24')
Then add using::
>>> engine = Engine('sg_vm')
>>> interface0 = engine.routing.get(0)
>>> interface0.add_traffic_handler(StaticNetlink('foo'), network='172.18.1.0/24')
.. note:: If the ``network`` keyword is omitted and the interface has multiple
IP addresses assigned, this will bind OSPF, BGP or the Traffic Handler
to all address assigned.
Adding a basic static route can be done from the engine directly if it is a
simple source network to destination route::
engine.add_route(gateway='192.168.1.254/32', network='172.18.1.0/24')
The route gateway will be mapped to an interface with an address range in
the 192.168.1.x network automatically.
For more complex static routes such as ones that may use group elements, use
the routing node::
>>> engine = Engine('ve-1')
>>> interface0 = engine.routing.get(0)
>>> interface0.add_static_route(Router('tmprouter'), destination=[Group('routegroup')])
When a routing gateway is added to an IPv6 network, the gateway is validated before
adding. For example, if you have a single interface that has both an IPv4 and IPv6
address assigned, a static route using a Router gateway with only an IPv4 address will
only bind to the IPv4 network. In this case, you can optionally add both an IPv4 and
IPv6 to the router element, or run this operation for each network respectively.
.. seealso:: :meth:`.Routing.add_static_route`
.. note:: When changing are made to a routing node, i.e. adding OSPF, BGP, Netlink's, the
configuration is updated immediately without calling .update()
"""
import collections
from smc.base.model import SubElement, Element, ElementCache
from smc.base.util import element_resolver
from smc.api.exceptions import InterfaceNotFound, ModificationAborted
from smc.base.structs import SerializedIterable
def flush_parent_cache(node):
"""
Flush parent cache will recurse back up the tree and
wipe the cache from each parent node reference on the
given element. This allows the objects to be reused
and a clean way to force the object to update itself
if attributes or methods are referenced after update.
"""
if node._parent is None:
node._del_cache()
return
node._del_cache()
flush_parent_cache(node._parent)
[docs]
class RoutingTree(SubElement):
"""
RoutingTree is the base class for both Routing and Antispoofing nodes.
This provides a commmon API for operations that affect how routing table
and antispoofing operate.
"""
def __init__(self, data=None, **meta):
super(RoutingTree, self).__init__(**meta)
if data is not None:
self.data = ElementCache(data)
def __iter__(self):
for node in self.data[self.typeof]:
data = ElementCache(node)
yield (
self.__class__(
href=data.get_link("self"), type=self.__class__.__name__, data=node, parent=self
)
)
@property
def name(self):
"""
Interface name / ID for routing level
:return: name of routing node
:rtype: str
"""
return self.data.get("name")
@property
def nicid(self):
"""
NIC id for this interface
:return: nic identifier
:rtype: str
"""
return self.data.get("nic_id")
@property
def dynamic_nicid(self):
"""
NIC id for this dynamic interface
:return: nic identifier, if this is a DHCP interface
:rtype: str or None
"""
return self.data.get("dynamic_nicid")
@property
def ip(self):
"""
IP network / host for this route
:return: IP address of this routing level
:rtype: str
"""
return self.data.get("ip")
@property
def level(self):
"""
Routing nodes have multiple 'levels' where routes can
be nested. Most routes are placed at the interface level.
This setting can mostly be ignored, but provides an
informative view of how the route is nested.
:return: routing node level (interface,network,gateway,any)
:rtype: str
"""
return self.data.get("level")
@property
def related_element_type(self):
"""
.. versionadded:: 0.6.0
Requires SMC version >= 6.4
Related element type defines the 'type' of element at this
routing or antispoofing node level.
:rtype: str
"""
if "related_element_type" in self.data:
return self.data.get("related_element_type")
return (
None
if self.dynamic_nicid or (self.nicid and "." in self.nicid)
else Element.from_href(self.data.get("href")).typeof
) # pre-6.4
@property
def probe_test(self):
"""
probe test for a route.
possible values: ping, next_hop_reachability, not_enabled
:return: probe test value
:rtype: str
"""
return self.data.get("probe_test")
@probe_test.setter
def probe_test(self, value):
self.data["probe_test"] = value
@property
def probe_ecmp(self):
"""
the ECMP for multi path routing.
:rtype: int
"""
return self.data.get("probe_ecmp")
@probe_ecmp.setter
def probe_ecmp(self, value):
self.data["probe_ecmp"] = value
@property
def probe_metric(self):
"""
the probe metric.
:rtype: int
"""
return self.data.get("probe_metric")
@probe_metric.setter
def probe_metric(self, value):
self.data["probe_metric"] = value
@property
def probe_ipaddress(self):
"""
the probe ipaddress, when the probe test is Ping.
:rtype: str
"""
return self.data.get("probe_ipaddress")
@probe_ipaddress.setter
def probe_ipaddress(self, value):
self.data["probe_ipaddress"] = value
@property
def probe_retry_count(self):
"""
the probe retry counter.
:rtype: int
"""
return self.data.get("probe_retry_count")
@probe_retry_count.setter
def probe_retry_count(self, value):
self.data["probe_retry_count"] = value
@property
def probe_interval(self):
"""
the probe interval.
:rtype: int
"""
return self.data.get("probe_interval")
@probe_interval.setter
def probe_interval(self, value):
self.data["probe_interval"] = value
[docs]
def as_tree(self, level=0):
"""
Display the routing tree representation in string
format
:rtype: str
"""
ret = "--" * level + repr(self) + "\n"
for routing_node in self:
ret += routing_node.as_tree(level + 1)
return ret
[docs]
def get(self, interface_id):
"""
Obtain routing configuration for a specific interface by
ID.
.. note::
If interface is a VLAN, you must use a str to specify the
interface id, such as '3.13' (interface 3, VLAN 13)
:param str,int interface_id: interface identifier
:raises InterfaceNotFound: invalid interface for engine
:return: Routing element, or None if not found
:rtype: Routing
"""
for interface in self:
if interface.nicid == str(interface_id) or interface.dynamic_nicid == str(interface_id):
return interface
raise InterfaceNotFound(
"Specified interface {} does not exist on " "this engine.".format(interface_id)
)
[docs]
def delete(self):
super(RoutingTree, self).delete()
flush_parent_cache(self._parent)
[docs]
def update(self):
super(RoutingTree, self).update()
flush_parent_cache(self._parent)
[docs]
def all(self):
"""
Return all routes for this engine.
:return: current route entries as :class:`.Routing` element
:rtype: list
"""
return [node for node in self]
def __str__(self):
return "{}(name={},level={},type={})".format(
self.__class__.__name__, self.name, self.level, self.related_element_type
)
def __repr__(self):
return str(self)
[docs]
class Routing(RoutingTree):
"""
Routing represents the Engine routing configuration and provides the
ability to view and add features to routing nodes such as OSPF.
"""
typeof = "routing_node"
def __init__(self, data=None, **meta):
self._parent = meta.pop("parent", None)
super(Routing, self).__init__(data, **meta)
@property
def routing_node_element(self):
"""
A routing node element will reference the element used to represent
the node (i.e. router, host, network, netlink, bgp peering, etc).
Although the routing node already resolves the element and provides
the `ip` property to obtain the address/network, use this property
to obtain access to modifying the element itself::
>>> interface0 = engine.routing.get(0)
>>> for networks in interface0:
... for gateway in networks:
... gateway.routing_node_element
...
Router(name=router-1.1.1.1)
StaticNetlink(name=mystatic)
BGPPeering(name=anotherpeering)
BGPPeering(name=mypeering)
>>>
"""
return from_meta(self)
@property
def bgp_peerings(self):
"""
BGP Peerings applied to a routing node. This can be called from
the engine, interface or network level. Return is a tuple
of (interface, network, bgp_peering). This simplifies viewing
and removing BGP Peers from the routing table::
>>> for bgp in engine.routing.bgp_peerings:
... bgp
...
(Routing(name=Interface 0,level=interface,type=physical_interface),
Routing(name=network-1.1.1.0/24,level=network,type=network),
Routing(name=mypeering,level=gateway,type=bgp_peering))
(Routing(name=Interface 1,level=interface,type=physical_interface),
Routing(name=network-2.2.2.0/24,level=network,type=network),
Routing(name=mypeering,level=gateway,type=bgp_peering))
.. seealso:: :meth:`~netlinks` and :meth:`~ospf_areas` for obtaining
other routing element types
:rtype: tuple(Routing)
"""
return gateway_by_type(self, "bgp_peering")
@property
def netlinks(self):
"""
Netlinks applied to a routing node. This can be called
from the engine, interface or network level. Return is a
tuple of (interface, network, netlink). This simplifies
viewing and removing Netlinks from the routing table::
>>> interface = engine.routing.get(1)
>>> for static_netlink in interface.netlinks:
... interface, network, netlink = static_netlink
... netlink
... netlink.delete()
...
Routing(name=mylink,level=gateway,type=netlink)
.. seealso:: :meth:`~bgp_peerings` and :meth:`~ospf_areas` for obtaining
other routing element types
:rtype: tuple(Routing)
"""
return gateway_by_type(self, "netlink")
@property
def ospf_areas(self):
"""
OSPFv2 areas applied to a routing node. This can be called from
the engine, interface or network level. Return is a tuple
of (interface, network, bgp_peering). This simplifies viewing
and removing BGP Peers from the routing table::
>>> for ospf in engine.routing.ospf_areas:
... ospf
...
(Routing(name=Interface 0,level=interface,type=physical_interface),
Routing(name=network-1.1.1.0/24,level=network,type=network),
Routing(name=area10,level=gateway,type=ospfv2_area))
.. seealso:: :meth:`~bgp_peerings` and :meth:`~netlinks` for obtaining
other routing element types
:rtype: tuple(Routing)
"""
return gateway_by_type(self, "ospfv2_area")
[docs]
def add_traffic_handler(self, netlink, netlink_gw=None, network=None):
"""
Add a traffic handler to a routing node. A traffic handler can be
either a static netlink or a multilink traffic handler. If ``network``
is not specified and the interface has multiple IP addresses, the
traffic handler will be added to all ipv4 addresses.
Add a pre-defined netlink to the route table of interface 0::
engine = Engine('vm')
rnode = engine.routing.get(0)
rnode.add_traffic_handler(StaticNetlink('mynetlink'))
Add a pre-defined netlink only to a specific network on an interface
with multiple addresses. Specify a netlink_gw for the netlink::
rnode = engine.routing.get(0)
rnode.add_traffic_handler(
StaticNetlink('mynetlink'),
netlink_gw=[Router('myrtr'), Host('myhost')],
network='172.18.1.0/24')
:param StaticNetlink,Multilink netlink: netlink element
:param list(Element) netlink_gw: list of elements that should be destinations
for this netlink. Typically these may be of type host, router, group, server,
network or engine.
:param str network: if network specified, only add OSPF to this network on interface
:raises UpdateElementFailed: failure updating routing
:raises ModificationAborted: Change must be made at the interface level
:raises ElementNotFound: ospf area not found
:return: Status of whether the route table was updated
:rtype: bool
"""
routing_node_gateway = RoutingNodeGateway(
netlink, destinations=[] if not netlink_gw else netlink_gw
)
return self._add_gateway_node("netlink", routing_node_gateway, network)
[docs]
def add_ospf_area(
self,
ospf_area,
ospf_interface_setting=None,
network=None,
communication_mode="not_forced",
unicast_ref=None,
):
"""
Add OSPF Area to this routing node.
Communication mode specifies how the interface will interact with the
adjacent OSPF environment. Please see SMC API documentation for more
in depth information on each option.
If the interface has multiple networks nested below, all networks
will receive the OSPF area by default unless the ``network`` parameter
is specified. OSPF cannot be applied to IPv6 networks.
Example of adding an area to interface routing node::
area = OSPFArea('area0') #obtain area resource
#Set on routing interface 0
interface = engine.routing.get(0)
interface.add_ospf_area(area)
.. note:: If unicast is specified, you must also provide a unicast_ref
of element type Host to identify the remote host. If no
unicast_ref is provided, this is skipped
:param OSPFArea ospf_area: OSPF area instance or href
:param OSPFInterfaceSetting ospf_interface_setting: used to override the
OSPF settings for this interface (optional)
:param str network: if network specified, only add OSPF to this network
on interface
:param str communication_mode: not_forced|point_to_point|passive|unicast
:param Element unicast_ref: Element used as unicast gw (required for unicast)
:raises ModificationAborted: Change must be made at the interface level
:raises UpdateElementFailed: failure updating routing
:raises ElementNotFound: ospf area not found
:return: Status of whether the route table was updated
:rtype: bool
"""
communication_mode = communication_mode.lower()
destinations = [] if not ospf_interface_setting else [ospf_interface_setting]
if communication_mode == "unicast" and unicast_ref:
destinations.append(unicast_ref)
routing_node_gateway = RoutingNodeGateway(
ospf_area, communication_mode=communication_mode, destinations=destinations
)
return self._add_gateway_node("ospfv2_area", routing_node_gateway, network)
[docs]
def add_bgp_peering(self, bgp_peering, external_bgp_peer=None, network=None):
"""
Add a BGP configuration to this routing interface.
If the interface has multiple ip addresses, all networks will receive
the BGP peering by default unless the ``network`` parameter is
specified.
Example of adding BGP to an interface by ID::
interface = engine.routing.get(0)
interface.add_bgp_peering(
BGPPeering('mypeer'),
ExternalBGPPeer('neighbor'))
:param BGPPeering bgp_peering: BGP Peer element
:param ExternalBGPPeer,Engine external_bgp_peer: peer element or href
:param str network: if network specified, only add OSPF to this network
on interface
:raises ModificationAborted: Change must be made at the interface level
:raises UpdateElementFailed: failed to add BGP
:return: Status of whether the route table was updated
:rtype: bool
"""
destination = [external_bgp_peer] if external_bgp_peer else []
routing_node_gateway = RoutingNodeGateway(bgp_peering, destinations=destination)
return self._add_gateway_node("bgp_peering", routing_node_gateway, network)
[docs]
def add_static_route(self, gateway, destination, network=None):
"""
Add a static route to this route table. Destination can be any element
type supported in the routing table such as a Group of network members.
Since a static route gateway needs to be on the same network as the
interface, provide a value for `network` if an interface has multiple
addresses on different networks.
::
>>> engine = Engine('ve-1')
>>> itf = engine.routing.get(0)
>>> itf.add_static_route(
gateway=Router('tmprouter'),
destination=[Group('routegroup')])
:param Element gateway: gateway for this route (Router, Host)
:param Element destination: destination network/s for this route.
:type destination: list(Host, Router, ..)
:raises ModificationAborted: Change must be made at the interface level
:raises UpdateElementFailed: failure to update routing table
:return: Status of whether the route table was updated
:rtype: bool
"""
routing_node_gateway = RoutingNodeGateway(gateway, destinations=destination)
return self._add_gateway_node("router", routing_node_gateway, network)
[docs]
def add_dynamic_gateway(self, networks):
"""
A dynamic gateway object creates a router object that is
attached to a DHCP interface. You can associate networks with
this gateway address to identify networks for routing on this
interface.
::
route = engine.routing.get(0)
route.add_dynamic_gateway([Network('mynetwork')])
:param list Network: list of network elements to add to
this DHCP gateway
:raises ModificationAborted: Change must be made at the interface level
:raises UpdateElementFailed: failure to update routing table
:return: Status of whether the route table was updated
:rtype: bool
"""
routing_node_gateway = RoutingNodeGateway(
dynamic_classid="gateway", destinations=networks or []
)
return self._add_gateway_node("dynamic_netlink", routing_node_gateway)
def _add_gateway_node_on_tunnel(self, routing_node_gateway):
"""
Add a gateway node on a tunnel interface. Tunnel interface elements
are attached to the interface level and not directly nested under
the networks node.
:param RouteNodeGateway routing_node_gateway: routing node gateway instance
:return: Whether a change was made or not
:rtype: bool
"""
modified = False
peering = [
next_hop
for next_hop in self
if next_hop.routing_node_element == routing_node_gateway.routing_node_element
]
if not peering:
self.data.setdefault("routing_node", []).append(routing_node_gateway)
modified = True
# Have peering
else:
peers = [node.routing_node_element for peer in peering for node in peer]
for destination in routing_node_gateway.destinations:
if destination not in peers:
peering[0].data.setdefault("routing_node", []).append(
{"level": "any", "href": destination.href, "name": destination.name}
)
modified = True
if modified:
self.update()
return modified
def _add_gateway_node(self, gw_type, routing_node_gateway, network=None):
"""
Add a gateway node to existing routing tree. Gateways are only added if
they do not already exist. If they do exist, check the destinations of
the existing gateway and add destinations that are not already there.
A current limitation is that if a gateway doesn't exist and the
destinations specified do not have IP addresses that are valid, they
are still added (i.e. IPv4 gateway with IPv6 destination is considered
invalid).
:param Routing self: the routing node, should be the interface routing node
:param str gw_type: type of gateway, i.e. netlink, ospfv2_area, etc
:param RoutingNodeGateway route_node_gateway: gateway element
:param str network: network to bind to. If none, all networks
:return: Whether a change was made or not
:rtype: bool
"""
if self.level != "interface":
raise ModificationAborted(
"You must make this change from the "
"interface routing level. Current node: {}".format(self)
)
if self.related_element_type == "tunnel_interface":
return self._add_gateway_node_on_tunnel(routing_node_gateway)
# Find any existing gateways
routing_node = list(gateway_by_type(self, type=gw_type, on_network=network))
_networks = (
[netwk for netwk in self if netwk.ip == network] if network is not None else list(self)
)
# Routing Node Gateway to add as Element
gateway_element_type = routing_node_gateway.routing_node_element
modified = False
for network in _networks:
# Short circuit for dynamic interfaces
if getattr(network, "dynamic_classid", None):
network.data.setdefault("routing_node", []).append(routing_node_gateway)
modified = True
break
# Used for comparison to
this_network_node = network.routing_node_element
if routing_node and any(
netwk
for _intf, netwk, gw in routing_node
if netwk.routing_node_element == this_network_node
and gateway_element_type == gw.routing_node_element
):
# A gateway exists on this network
for gw in network:
if gw.routing_node_element == gateway_element_type:
existing_dests = [node.routing_node_element for node in gw]
for destination in routing_node_gateway.destinations:
is_valid_destination = False
if destination not in existing_dests:
dest_ipv4, dest_ipv6 = _which_ip_protocol(destination)
if len(network.ip.split(":")) > 1: # IPv6
if dest_ipv6:
is_valid_destination = True
else:
if dest_ipv4:
is_valid_destination = True
if is_valid_destination:
gw.data.setdefault("routing_node", []).append(
{
"level": "any",
"href": destination.href,
"name": destination.name,
}
)
modified = True
else: # Gateway doesn't exist
gw_ipv4, gw_ipv6 = _which_ip_protocol(gateway_element_type) # ipv4, ipv6 or both
if len(network.ip.split(":")) > 1:
if gw_ipv6:
network.data.setdefault("routing_node", []).append(routing_node_gateway)
modified = True
else: # IPv4
if gw_ipv4:
network.data.setdefault("routing_node", []).append(routing_node_gateway)
modified = True
if modified:
self.update()
return modified
[docs]
def remove_route_gateway(self, element, network=None):
"""
Remove a route element by href or Element. Use this if you want to
remove a netlink or a routing element such as BGP or OSPF. Removing
is done from within the routing interface context.
::
interface0 = engine.routing.get(0)
interface0.remove_route_gateway(StaticNetlink('mynetlink'))
Only from a specific network on a multi-address interface::
interface0.remove_route_gateway(
StaticNetlink('mynetlink'),
network='172.18.1.0/24')
:param str,Element element: element to remove from this routing node
:param str network: if network specified, only add OSPF to this
network on interface
:raises ModificationAborted: Change must be made at the interface level
:raises UpdateElementFailed: failure to update routing table
:return: Status of whether the entry was removed (i.e. or not found)
:rtype: bool
"""
if self.level not in ("interface",):
raise ModificationAborted(
"You must make this change from the "
"interface routing level. Current node: {}".format(self)
)
node_changed = False
element = element_resolver(element)
for network in self:
# Tunnel Interface binds gateways to the interface
if network.level == "gateway" and network.data.get("href") == element:
network.delete()
node_changed = True
break
for gateway in network:
if gateway.data.get("href") == element:
gateway.delete()
node_changed = True
return node_changed
class RoutingNodeGateway(Routing):
def __init__(self, element=None, level="gateway", **kwargs):
self.destinations = kwargs.pop("destinations", [])
self.data = ElementCache(kwargs)
self.data.update(level=level, routing_node=[])
if element:
self.data.update(href=element.href, name=element.name)
# related_element_type=element.typeof)
for destination in self.destinations:
self.data["routing_node"].append(
{"href": destination.href, "name": destination.name, "level": "any"}
)
[docs]
class Antispoofing(RoutingTree):
"""
Anti-spoofing is configured by default based on
interface networks directly attached. It is possible
to override these settings by adding additional
networks as valid source networks on a given
interface.
Antispoofing is nested similar to routes. Iterate the
antispoofing configuration::
for entry in engine.antispoofing.all():
print(entry)
"""
typeof = "antispoofing_node"
def __init__(self, data=None, **meta):
self._parent = meta.pop("parent", None)
super(Antispoofing, self).__init__(data, **meta)
@property
def autogenerated(self):
"""
Was the entry auto generated by a route entry or
added manually as an override
:rtype: bool
"""
return self.data.get("auto_generated") == "true"
@property
def validity(self):
"""
Enabled or disabled antispoofing entry
:return: validity of this entry (enable,disable,absolute)
:rtype: str
"""
return self.data.get("validity")
[docs]
def add(self, element):
"""
Add an entry to this antispoofing node level.
Entry can be either href or network elements specified
in :py:class:`smc.elements.network`
::
if0 = engine.antispoofing.get(0)
if0.add(Network('foonet'))
:param Element element: entry to add, i.e. Network('mynetwork'), Host(..)
:raises CreateElementFailed: failed adding entry
:raises ElementNotFound: element entry specified not in SMC
:return: whether entry was added
:rtype: bool
"""
if self.level == "interface":
for network in self:
if from_meta(network) == element:
return False
self.data["antispoofing_node"].append(
{
"antispoofing_node": [],
"auto_generated": "false",
"href": element.href,
"level": self.level,
"validity": "enable",
"name": element.name,
}
)
self.update()
return True
return False
def __len__(self):
return len(self.data.get("antispoofing_node", []))
[docs]
def remove(self, element):
"""
Remove a specific user added element from the antispoofing tables of
a given interface. This will not remove autogenerated or system level
entries.
:param Element element: element to remove
:return: remove element if it exists and return bool
:rtype: bool
"""
if self.level == "interface":
len_before_change = len(self)
_nodes = []
for network in self:
if from_meta(network) != element:
_nodes.append(network.data)
else:
if network.autogenerated: # Make sure it was user added
_nodes.append(network.data)
if len(_nodes) != len_before_change:
self.data["antispoofing_node"] = _nodes
self.update()
return True
return False
def from_meta(node):
"""
Helper method that reolves a routing node to element. Rather than doing
a lookup and fetch, the routing node provides the information to
build the element from meta alone.
:rtype: Element
"""
# Version SMC < 6.4
if "related_element_type" not in node.data:
return Element.from_href(node.data.get("href"))
# SMC Version >= 6.4 - more efficient because it builds the
# element by meta versus requiring a query
return Element.from_meta(
name=node.data.get("name"), type=node.related_element_type, href=node.data.get("href")
)
def route_level(root, level):
"""
Helper method to recurse the current node and return
the specified routing node level.
"""
def recurse(nodes):
for node in nodes:
if node.level == level:
routing_node.append(node)
else:
recurse(node)
routing_node = []
recurse(root)
return routing_node
def gateway_by_type(self, type=None, on_network=None): # @ReservedAssignment
"""
Return gateways for the specified node. You can also
specify type to find only gateways of a specific type.
Valid types are: bgp_peering, netlink, ospfv2_area.
:param RoutingNode self: the routing node to check
:param str type: bgp_peering, netlink, ospfv2_area
:param str on_network: if network is specified, should be CIDR and
specifies a filter to only return gateways on that network when
an interface has multiple
:return: tuple of RoutingNode(interface,network,gateway)
:rtype: list
"""
gateways = route_level(self, "gateway")
if not type:
for gw in gateways:
yield gw
else:
for node in gateways:
# TODO: Change to type == node.related_element_type when
# only supporting SMC >= 6.4
if type == node.routing_node_element.typeof:
# If the parent is level interface, this is a tunnel interface
# where the gateway is bound to interface versus network
parent = node._parent
if parent.level == "interface":
interface = parent
network = None
else:
network = parent
interface = network._parent
if on_network is not None:
if network and network.ip == on_network:
yield (interface, network, node)
else:
yield (interface, network, node)
def _which_ip_protocol(element):
"""
Validate the protocol addresses for the element. Most elements can
have an IPv4 or IPv6 address assigned on the same element. This
allows elements to be validated and placed on the right network.
:return: boolean tuple
:rtype: tuple(ipv4, ipv6)
"""
try:
if element.typeof in ("host", "router"):
return getattr(element, "address", False), getattr(element, "ipv6_address", False)
elif element.typeof == "netlink":
gateway = element.gateway
if gateway.typeof == "router":
return getattr(gateway, "address", False), getattr(gateway, "ipv6_address", False)
# It's an engine, return true
elif element.typeof == "network":
return getattr(element, "ipv4_network", False), getattr(element, "ipv6_network", False)
except AttributeError:
pass
# Always return true so that the calling function assumes the element
# is valid for the routing node. This could fail when submitting but
# we don't want to prevent adding elements yet since this could change
return True, True
def del_invalid_routes(engine, nicids):
"""
Helper method to run through and delete any routes that are tagged
as invalid or to_delete by a list of nicids. Since we could have a
list of routes, iterate from top level engine routing node to avoid
fetch exceptions. Route list should be a list of nicids as str.
:param list nicids: list of nicids
:raises DeleteElementFailed: delete element failed with reason
"""
nicids = map(str, nicids)
for interface in engine.routing:
if interface.nicid in nicids:
if getattr(interface, "to_delete", False): # Delete the invalid interface
interface.delete()
continue
for network in interface:
if getattr(network, "invalid", False) or getattr(network, "to_delete", False):
network.delete()
route = collections.namedtuple(
"Route", "route_network route_netmask route_gateway route_type dst_if src_if, values"
)
route.__new__.__defaults__ = (None,) * len(route._fields)
[docs]
class Route(SerializedIterable):
"""
Active routes obtained from a running engine.
Obtain routes from an engine reference::
>>> engine = Engine('sg_vm')
>>> for route in engine.routing_monitoring:
... route
:ivar str route_network: network for this route
:ivar int route_netmask: netmask for the route
:ivar str route_gateway: route gateway, may be None if it's a local network only
:ivar str route_type: status of the route
:ivar int dst_if: destination interface index
:ivar int src_if: source interface index
"""
def __init__(self, data):
routes = data.get("routing_monitoring_entry", [])
data = [{k: v for k, v in d.items() if k != "cluster_ref"} for d in routes]
super(Route, self).__init__(data, route)
policy_route = collections.namedtuple("PolicyRoute", "source destination gateway_ip comment")
policy_route.__new__.__defaults__ = (None,) * len(policy_route._fields)
[docs]
class PolicyRoute(SerializedIterable):
"""
An iterable providing an interface to policy based routing on the
engine.
You must call engine.udpate() after performing an add or delete::
>>> engine = Engine('myfw')
>>> engine.policy_route
PolicyRoute(items: 1)
>>> for rt in engine.policy_route:
... rt
...
PolicyRoute(source=u'172.18.1.0/24', destination=u'172.18.1.0/24',
gateway_ip=u'172.18.1.1', comment=None)
>>> engine.policy_route.create(source='172.18.2.0/24',
destination='192.168.3.0/24', gateway_ip='172.18.2.1')
>>> engine.update()
'http://172.18.1.151:8082/6.4/elements/single_fw/746'
>>> for rt in engine.policy_route:
... rt
...
PolicyRoute(source=u'172.18.1.0/24', destination=u'172.18.1.0/24',
gateway_ip=u'172.18.1.1', comment=None)
PolicyRoute(source=u'172.18.2.0/24', destination=u'192.168.3.0/24',
gateway_ip=u'172.18.2.1', comment=None)
>>> engine.policy_route.delete(source='172.18.2.0/24')
>>> engine.update()
'http://172.18.1.151:8082/6.4/elements/single_fw/746'
>>> for rt in engine.policy_route:
... rt
...
PolicyRoute(source=u'172.18.1.0/24', destination=u'172.18.1.0/24',
gateway_ip=u'172.18.1.1', comment=None)
:ivar str source: source network/cidr for the route
:ivar str destination: destination network/cidr for the route
:ivar str gateway_ip: gateway IP address, must be on source network
:ivar str comment: optional comment
"""
def __init__(self, engine):
data = engine.data.get("policy_route")
super(PolicyRoute, self).__init__(data, policy_route)
[docs]
def create(self, source, destination, gateway_ip, comment=None):
"""
Add a new policy route to the engine.
:param str source: network address with /cidr
:param str destination: network address with /cidr
:param str gateway: IP address, must be on source network
:param str comment: optional comment
"""
self.items.append(
dict(source=source, destination=destination, gateway_ip=gateway_ip, comment=comment)
)
[docs]
def delete(self, **kw):
"""
Delete a policy route from the engine. You can delete using a
single field or multiple fields for a more exact match.
Use a keyword argument to delete a route by any valid attribute.
:param kw: use valid Route keyword values to delete by exact match
"""
delete_by = []
for field, val in kw.items():
if val is not None:
delete_by.append(field)
self.items[:] = [
route
for route in self.items
if not all(route.get(field) == kw.get(field) for field in delete_by)
]